]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/DiskDaemon/DiskdIOStrategy.cc
Boilerplate: update copyright blurbs on src/
[thirdparty/squid.git] / src / DiskIO / DiskDaemon / DiskdIOStrategy.cc
CommitLineData
b9ae18aa 1/*
bbc27441 2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
b9ae18aa 3 *
bbc27441
AJ
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
b9ae18aa 7 */
8
bbc27441
AJ
9/* DEBUG: section 79 Squid-side DISKD I/O functions. */
10
582c2af2 11#include "squid.h"
d841c88d 12#include "comm/Loops.h"
b9ae18aa 13#include "ConfigOption.h"
602d9612
A
14#include "diomsg.h"
15#include "DiskdFile.h"
582c2af2 16#include "DiskdIOStrategy.h"
b9ae18aa 17#include "DiskIO/DiskFile.h"
c4ad1349 18#include "fd.h"
4d5904f7 19#include "SquidConfig.h"
96097880 20#include "SquidIpc.h"
985c86bc 21#include "SquidTime.h"
602d9612
A
22#include "StatCounters.h"
23#include "Store.h"
b6b42084 24#include "unlinkd.h"
b9ae18aa 25
1a30fdf5 26#include <cerrno>
d440d5a4 27#if HAVE_SYS_IPC_H
582c2af2 28#include <sys/ipc.h>
d440d5a4
AJ
29#endif
30#if HAVE_SYS_MSG_H
582c2af2 31#include <sys/msg.h>
d440d5a4
AJ
32#endif
33#if HAVE_SYS_SHM_H
582c2af2 34#include <sys/shm.h>
d440d5a4 35#endif
582c2af2 36
b9ae18aa 37diskd_stats_t diskd_stats;
38
39size_t DiskdIOStrategy::nextInstanceID (0);
40const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
41
42size_t
43DiskdIOStrategy::newInstance()
44{
45 return ++nextInstanceID;
46}
47
48bool
49DiskdIOStrategy::shedLoad()
50{
51 /*
52 * Fail on open() if there are too many requests queued.
53 */
54
55 if (away > magic1) {
bf8fe701 56 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
b9ae18aa 57
58 return true;
59 }
60
61 return false;
62}
63
64int
65DiskdIOStrategy::load()
66{
67 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
68 /* the parse function guarantees magic2 is positivie */
69 return away * 1000 / magic2;
70}
71
72void
73DiskdIOStrategy::openFailed()
74{
cb4185f1 75 ++diskd_stats.open_fail_queue_len;
b9ae18aa 76}
77
78DiskFile::Pointer
79DiskdIOStrategy::newFile(char const *path)
80{
81 if (shedLoad()) {
82 openFailed();
83 return NULL;
84 }
85
86 return new DiskdFile (path, this);
87}
88
89DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0) , smsgid(-1), rmsgid(-1), wfd(-1) , instanceID(newInstance())
90{}
91
c521ad17
DK
92bool
93DiskdIOStrategy::unlinkdUseful() const
94{
95 return true;
96}
97
b9ae18aa 98void
99DiskdIOStrategy::unlinkFile(char const *path)
100{
101 if (shedLoad()) {
102 /* Damn, we need to issue a sync unlink here :( */
bf8fe701 103 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
b9ae18aa 104 unlinkdUnlink(path);
b9ae18aa 105 return;
106 }
107
108 /* We can attempt a diskd unlink */
109 int x;
110
ee139403 111 ssize_t shm_offset;
b9ae18aa 112
113 char *buf;
114
115 buf = (char *)shm.get(&shm_offset);
116
117 xstrncpy(buf, path, SHMBUF_BLKSZ);
118
119 x = send(_MQD_UNLINK,
120 0,
121 (StoreIOState::Pointer )NULL,
122 0,
123 0,
124 shm_offset);
125
126 if (x < 0) {
e0236918 127 debugs(79, DBG_IMPORTANT, "storeDiskdSend UNLINK: " << xstrerror());
b9ae18aa 128 ::unlink(buf); /* XXX EWW! */
129 // shm.put (shm_offset);
130 }
131
cb4185f1 132 ++diskd_stats.unlink.ops;
b9ae18aa 133}
134
135void
136DiskdIOStrategy::init()
137{
b5d712b5 138 int pid;
139 void * hIpc;
b9ae18aa 140 int rfd;
141 int ikey;
142 const char *args[5];
143 char skey1[32];
144 char skey2[32];
145 char skey3[32];
b7ac5457 146 Ip::Address localhost;
b9ae18aa 147
148 ikey = (getpid() << 10) + (instanceID << 2);
149 ikey &= 0x7fffffff;
150 smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
151
152 if (smsgid < 0) {
fa84c01d 153 debugs(50, DBG_CRITICAL, "storeDiskdInit: msgget: " << xstrerror());
b9ae18aa 154 fatal("msgget failed");
155 }
156
157 rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
158
159 if (rmsgid < 0) {
fa84c01d 160 debugs(50, DBG_CRITICAL, "storeDiskdInit: msgget: " << xstrerror());
b9ae18aa 161 fatal("msgget failed");
162 }
163
164 shm.init(ikey, magic2);
165 snprintf(skey1, 32, "%d", ikey);
166 snprintf(skey2, 32, "%d", ikey + 1);
167 snprintf(skey3, 32, "%d", ikey + 2);
168 args[0] = "diskd";
169 args[1] = skey1;
170 args[2] = skey2;
171 args[3] = skey3;
172 args[4] = NULL;
4dd643d5 173 localhost.setLocalhost();
b5d712b5 174 pid = ipcCreate(IPC_STREAM,
175 Config.Program.diskd,
176 args,
177 "diskd",
cc192b50 178 localhost,
b5d712b5 179 &rfd,
180 &wfd,
181 &hIpc);
182
183 if (pid < 0)
b9ae18aa 184 fatalf("execl: %s", Config.Program.diskd);
185
186 if (rfd != wfd)
187 comm_close(rfd);
188
189 fd_note(wfd, "squid -> diskd");
190
933dd095 191 commUnsetFdTimeout(wfd);
b9ae18aa 192 commSetNonBlocking(wfd);
d841c88d 193 Comm::QuickPollRequired();
b9ae18aa 194}
195
196/*
197 * SHM manipulation routines
198 */
199void
ee139403 200SharedMemory::put(ssize_t offset)
b9ae18aa 201{
202 int i;
203 assert(offset >= 0);
204 assert(offset < nbufs * SHMBUF_BLKSZ);
205 i = offset / SHMBUF_BLKSZ;
206 assert(i < nbufs);
207 assert(CBIT_TEST(inuse_map, i));
208 CBIT_CLR(inuse_map, i);
209 --diskd_stats.shmbuf_count;
210}
211
212void *
213
ee139403 214SharedMemory::get(ssize_t * shm_offset)
b9ae18aa 215{
216 char *aBuf = NULL;
217 int i;
218
cb4185f1 219 for (i = 0; i < nbufs; ++i) {
b9ae18aa 220 if (CBIT_TEST(inuse_map, i))
221 continue;
222
223 CBIT_SET(inuse_map, i);
224
225 *shm_offset = i * SHMBUF_BLKSZ;
226
227 aBuf = buf + (*shm_offset);
228
229 break;
230 }
231
232 assert(aBuf);
233 assert(aBuf >= buf);
234 assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
cb4185f1 235 ++diskd_stats.shmbuf_count;
b9ae18aa 236
237 if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count)
238 diskd_stats.max_shmuse = diskd_stats.shmbuf_count;
239
240 return aBuf;
241}
242
243void
244SharedMemory::init(int ikey, int magic2)
245{
246 nbufs = (int)(magic2 * 1.3);
247 id = shmget((key_t) (ikey + 2),
248 nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
249
250 if (id < 0) {
fa84c01d 251 debugs(50, DBG_CRITICAL, "storeDiskdInit: shmget: " << xstrerror());
b9ae18aa 252 fatal("shmget failed");
253 }
254
255 buf = (char *)shmat(id, NULL, 0);
256
257 if (buf == (void *) -1) {
fa84c01d 258 debugs(50, DBG_CRITICAL, "storeDiskdInit: shmat: " << xstrerror());
b9ae18aa 259 fatal("shmat failed");
260 }
261
262 inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
263 diskd_stats.shmbuf_count += nbufs;
264
cb4185f1 265 for (int i = 0; i < nbufs; ++i) {
b9ae18aa 266 CBIT_SET(inuse_map, i);
267 put (i * SHMBUF_BLKSZ);
268 }
269}
270
271void
272DiskdIOStrategy::unlinkDone(diomsg * M)
273{
bf8fe701 274 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
e4f1fdae 275 ++statCounter.syscalls.disk.unlinks;
b9ae18aa 276
277 if (M->status < 0)
cb4185f1 278 ++diskd_stats.unlink.fail;
b9ae18aa 279 else
cb4185f1 280 ++diskd_stats.unlink.success;
b9ae18aa 281}
282
283void
284DiskdIOStrategy::handle(diomsg * M)
285{
286 if (!cbdataReferenceValid (M->callback_data)) {
287 /* I.e. already closed file
288 * - say when we have a error opening after
289 * a read was already queued
290 */
26ac0430 291 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
b9ae18aa 292 cbdataReferenceDone (M->callback_data);
293 return;
294 }
295
a1ad81aa 296 /* set errno passed from diskd. makes debugging more meaningful */
297 if (M->status < 0)
298 errno = -M->status;
299
b9ae18aa 300 if (M->newstyle) {
301 DiskdFile *theFile = (DiskdFile *)M->callback_data;
8bf217bd 302 theFile->unlock();
b9ae18aa 303 theFile->completed (M);
304 } else
305 switch (M->mtype) {
306
307 case _MQD_OPEN:
308
309 case _MQD_CREATE:
310
311 case _MQD_CLOSE:
312
313 case _MQD_READ:
314
315 case _MQD_WRITE:
316 assert (0);
317 break;
318
319 case _MQD_UNLINK:
320 unlinkDone(M);
321 break;
322
323 default:
324 assert(0);
325 break;
326 }
327
328 cbdataReferenceDone (M->callback_data);
329}
330
331int
8bf217bd 332DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
b9ae18aa 333{
b9ae18aa 334 diomsg M;
b9ae18aa 335 M.callback_data = cbdataReference(theFile);
8bf217bd 336 theFile->lock();
b9ae18aa 337 M.requestor = requestor;
f30dcf2a 338 M.newstyle = true;
b9ae18aa 339
340 if (requestor)
8bf217bd 341 requestor->lock();
b9ae18aa 342
f30dcf2a 343 return SEND(&M, mtype, id, size, offset, shm_offset);
b9ae18aa 344}
345
346int
63be0a78 347DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
b9ae18aa 348{
b9ae18aa 349 diomsg M;
f30dcf2a 350 M.callback_data = cbdataReference(sio.getRaw());
351 M.newstyle = false;
352
353 return SEND(&M, mtype, id, size, offset, shm_offset);
354}
355
356int
ee139403 357DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
f30dcf2a 358{
b9ae18aa 359 static int send_errors = 0;
360 static int last_seq_no = 0;
361 static int seq_no = 0;
f30dcf2a 362 int x;
363
364 M->mtype = mtype;
365 M->size = size;
366 M->offset = offset;
367 M->status = -1;
368 M->shm_offset = (int) shm_offset;
369 M->id = id;
370 M->seq_no = ++seq_no;
b9ae18aa 371
f30dcf2a 372 if (M->seq_no < last_seq_no)
e0236918 373 debugs(79, DBG_IMPORTANT, "WARNING: sequencing out of order");
b9ae18aa 374
f30dcf2a 375 x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
b9ae18aa 376
f30dcf2a 377 last_seq_no = M->seq_no;
b9ae18aa 378
379 if (0 == x) {
cb4185f1
FC
380 ++diskd_stats.sent_count;
381 ++away;
b9ae18aa 382 } else {
e0236918 383 debugs(79, DBG_IMPORTANT, "storeDiskdSend: msgsnd: " << xstrerror());
f30dcf2a 384 cbdataReferenceDone(M->callback_data);
7f56277d
AJ
385 ++send_errors;
386 assert(send_errors < 100);
6a786390 387 if (shm_offset > -1)
388 shm.put(shm_offset);
b9ae18aa 389 }
390
391 /*
392 * We have to drain the queue here if necessary. If we don't,
393 * then we can have a lot of messages in the queue (probably
394 * up to 2*magic1) and we can run out of shared memory buffers.
395 */
396 /*
c8f4eac4 397 * Note that we call Store::Root().callbackk (for all SDs), rather
398 * than callback for just this SD, so that while
b9ae18aa 399 * we're "blocking" on this SD we can also handle callbacks
400 * from other SDs that might be ready.
401 */
b9ae18aa 402
9a518127 403 struct timeval delay = {0, 1};
b9ae18aa 404
9a518127 405 while (away > magic2) {
b9ae18aa 406 select(0, NULL, NULL, NULL, &delay);
c8f4eac4 407 Store::Root().callback();
b9ae18aa 408
409 if (delay.tv_usec < 1000000)
410 delay.tv_usec <<= 1;
411 }
412
413 return x;
414}
415
416ConfigOption *
417DiskdIOStrategy::getOptionTree() const
418{
419 ConfigOptionVector *result = new ConfigOptionVector;
420 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump));
421 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump));
422 return result;
423}
424
425bool
350e2aec 426DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
b9ae18aa 427{
428 if (strcmp(name, "Q1") != 0)
429 return false;
430
431 int old_magic1 = magic1;
432
433 magic1 = atoi(value);
434
350e2aec 435 if (!isaReconfig)
b9ae18aa 436 return true;
437
438 if (old_magic1 < magic1) {
439 /*
440 * This is because shm.nbufs is computed at startup, when
441 * we call shmget(). We can't increase the Q1/Q2 parameters
442 * beyond their initial values because then we might have
443 * more "Q2 messages" than shared memory chunks, and this
444 * will cause an assertion in storeDiskdShmGet().
445 */
446 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
e0236918 447 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
b9ae18aa 448 magic1 = old_magic1;
449 return true;
450 }
451
452 if (old_magic1 != magic1)
e0236918 453 debugs(3, DBG_IMPORTANT, "cache_dir new Q1 value '" << magic1 << "'");
b9ae18aa 454
455 return true;
456}
457
458void
459DiskdIOStrategy::optionQ1Dump(StoreEntry * e) const
460{
461 storeAppendPrintf(e, " Q1=%d", magic1);
462}
463
464bool
350e2aec 465DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
b9ae18aa 466{
467 if (strcmp(name, "Q2") != 0)
468 return false;
469
470 int old_magic2 = magic2;
471
472 magic2 = atoi(value);
473
350e2aec 474 if (!isaReconfig)
b9ae18aa 475 return true;
476
477 if (old_magic2 < magic2) {
478 /* See comments in Q1 function above */
e0236918 479 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
b9ae18aa 480 magic2 = old_magic2;
481 return true;
482 }
483
484 if (old_magic2 != magic2)
e0236918 485 debugs(3, DBG_IMPORTANT, "cache_dir new Q2 value '" << magic2 << "'");
b9ae18aa 486
487 return true;
488}
489
490void
491DiskdIOStrategy::optionQ2Dump(StoreEntry * e) const
492{
493 storeAppendPrintf(e, " Q2=%d", magic2);
494}
495
496/*
497 * Sync any pending data. We just sit around and read the queue
498 * until the data has finished writing.
499 */
500void
501DiskdIOStrategy::sync()
502{
503 static time_t lastmsg = 0;
504
505 while (away > 0) {
506 if (squid_curtime > lastmsg) {
e0236918 507 debugs(47, DBG_IMPORTANT, "storeDiskdDirSync: " << away << " messages away");
b9ae18aa 508 lastmsg = squid_curtime;
509 }
510
511 callback();
512 }
513}
514
b9ae18aa 515/*
516 * Handle callbacks. If we have more than magic2 requests away, we block
517 * until the queue is below magic2. Otherwise, we simply return when we
518 * don't get a message.
519 */
520
521int
522DiskdIOStrategy::callback()
523{
524 diomsg M;
525 int x;
526 int retval = 0;
527
528 if (away >= magic2) {
cb4185f1 529 ++diskd_stats.block_queue_len;
b9ae18aa 530 retval = 1;
531 /* We might not have anything to do, but our queue
532 * is full.. */
533 }
534
535 if (diskd_stats.sent_count - diskd_stats.recv_count >
536 diskd_stats.max_away) {
537 diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count;
538 }
539
540 while (1) {
541#ifdef ALWAYS_ZERO_BUFFERS
542 memset(&M, '\0', sizeof(M));
543#endif
544
545 x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
546
547 if (x < 0)
548 break;
549 else if (x != diomsg::msg_snd_rcv_sz) {
e0236918 550 debugs(47, DBG_IMPORTANT, "storeDiskdDirCallback: msgget returns " << x);
b9ae18aa 551 break;
552 }
553
cb4185f1 554 ++diskd_stats.recv_count;
b9ae18aa 555 --away;
556 handle(&M);
557 retval = 1; /* Return that we've actually done some work */
558
559 if (M.shm_offset > -1)
560 shm.put ((off_t) M.shm_offset);
561 }
562
563 return retval;
564}
565
566void
567DiskdIOStrategy::statfs(StoreEntry & sentry)const
568{
569 storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
570}