]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/DiskDaemon/DiskdIOStrategy.cc
Source Maintenance: enforce #include statement block ordering
[thirdparty/squid.git] / src / DiskIO / DiskDaemon / DiskdIOStrategy.cc
CommitLineData
b9ae18aa 1/*
b9ae18aa 2 * DEBUG: section 79 Squid-side DISKD I/O functions.
3 * AUTHOR: Duane Wessels
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
26ac0430 21 *
b9ae18aa 22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26ac0430 26 *
b9ae18aa 27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
32 */
33
582c2af2 34#include "squid.h"
d841c88d 35#include "comm/Loops.h"
b9ae18aa 36#include "ConfigOption.h"
582c2af2 37#include "DiskdIOStrategy.h"
b9ae18aa 38#include "DiskIO/DiskFile.h"
39#include "DiskdFile.h"
40#include "diomsg.h"
c4ad1349 41#include "fd.h"
b9ae18aa 42#include "Store.h"
e4f1fdae 43#include "StatCounters.h"
4d5904f7 44#include "SquidConfig.h"
96097880 45#include "SquidIpc.h"
985c86bc 46#include "SquidTime.h"
b6b42084 47#include "unlinkd.h"
b9ae18aa 48
d440d5a4 49#if HAVE_SYS_IPC_H
582c2af2 50#include <sys/ipc.h>
d440d5a4
AJ
51#endif
52#if HAVE_SYS_MSG_H
582c2af2 53#include <sys/msg.h>
d440d5a4
AJ
54#endif
55#if HAVE_SYS_SHM_H
582c2af2 56#include <sys/shm.h>
d440d5a4 57#endif
21d845b1
FC
58#if HAVE_ERRNO_H
59#include <errno.h>
60#endif
582c2af2 61
/* Global diskd statistics; updated by the functions in this file. */
b9ae18aa 62diskd_stats_t diskd_stats;
63
/* Last instance number handed out by DiskdIOStrategy::newInstance(). */
64size_t DiskdIOStrategy::nextInstanceID (0);
/* Byte count passed to msgsnd()/msgrcv(): sizeof(diomsg) less one mtyp_t. */
65const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
66
67size_t
68DiskdIOStrategy::newInstance()
69{
70 return ++nextInstanceID;
71}
72
73bool
74DiskdIOStrategy::shedLoad()
75{
76 /*
77 * Fail on open() if there are too many requests queued.
78 */
79
80 if (away > magic1) {
bf8fe701 81 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
b9ae18aa 82
83 return true;
84 }
85
86 return false;
87}
88
89int
90DiskdIOStrategy::load()
91{
92 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
93 /* the parse function guarantees magic2 is positivie */
94 return away * 1000 / magic2;
95}
96
97void
98DiskdIOStrategy::openFailed()
99{
cb4185f1 100 ++diskd_stats.open_fail_queue_len;
b9ae18aa 101}
102
103DiskFile::Pointer
104DiskdIOStrategy::newFile(char const *path)
105{
106 if (shedLoad()) {
107 openFailed();
108 return NULL;
109 }
110
111 return new DiskdFile (path, this);
112}
113
114DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0) , smsgid(-1), rmsgid(-1), wfd(-1) , instanceID(newInstance())
115{}
116
c521ad17
DK
117bool
118DiskdIOStrategy::unlinkdUseful() const
119{
120 return true;
121}
122
b9ae18aa 123void
124DiskdIOStrategy::unlinkFile(char const *path)
125{
126 if (shedLoad()) {
127 /* Damn, we need to issue a sync unlink here :( */
bf8fe701 128 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
b9ae18aa 129 unlinkdUnlink(path);
b9ae18aa 130 return;
131 }
132
133 /* We can attempt a diskd unlink */
134 int x;
135
ee139403 136 ssize_t shm_offset;
b9ae18aa 137
138 char *buf;
139
140 buf = (char *)shm.get(&shm_offset);
141
142 xstrncpy(buf, path, SHMBUF_BLKSZ);
143
144 x = send(_MQD_UNLINK,
145 0,
146 (StoreIOState::Pointer )NULL,
147 0,
148 0,
149 shm_offset);
150
151 if (x < 0) {
e0236918 152 debugs(79, DBG_IMPORTANT, "storeDiskdSend UNLINK: " << xstrerror());
b9ae18aa 153 ::unlink(buf); /* XXX EWW! */
154 // shm.put (shm_offset);
155 }
156
cb4185f1 157 ++diskd_stats.unlink.ops;
b9ae18aa 158}
159
160void
161DiskdIOStrategy::init()
162{
b5d712b5 163 int pid;
164 void * hIpc;
b9ae18aa 165 int rfd;
166 int ikey;
167 const char *args[5];
168 char skey1[32];
169 char skey2[32];
170 char skey3[32];
b7ac5457 171 Ip::Address localhost;
b9ae18aa 172
173 ikey = (getpid() << 10) + (instanceID << 2);
174 ikey &= 0x7fffffff;
175 smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
176
177 if (smsgid < 0) {
fa84c01d 178 debugs(50, DBG_CRITICAL, "storeDiskdInit: msgget: " << xstrerror());
b9ae18aa 179 fatal("msgget failed");
180 }
181
182 rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
183
184 if (rmsgid < 0) {
fa84c01d 185 debugs(50, DBG_CRITICAL, "storeDiskdInit: msgget: " << xstrerror());
b9ae18aa 186 fatal("msgget failed");
187 }
188
189 shm.init(ikey, magic2);
190 snprintf(skey1, 32, "%d", ikey);
191 snprintf(skey2, 32, "%d", ikey + 1);
192 snprintf(skey3, 32, "%d", ikey + 2);
193 args[0] = "diskd";
194 args[1] = skey1;
195 args[2] = skey2;
196 args[3] = skey3;
197 args[4] = NULL;
4dd643d5 198 localhost.setLocalhost();
b5d712b5 199 pid = ipcCreate(IPC_STREAM,
200 Config.Program.diskd,
201 args,
202 "diskd",
cc192b50 203 localhost,
b5d712b5 204 &rfd,
205 &wfd,
206 &hIpc);
207
208 if (pid < 0)
b9ae18aa 209 fatalf("execl: %s", Config.Program.diskd);
210
211 if (rfd != wfd)
212 comm_close(rfd);
213
214 fd_note(wfd, "squid -> diskd");
215
933dd095 216 commUnsetFdTimeout(wfd);
b9ae18aa 217 commSetNonBlocking(wfd);
d841c88d 218 Comm::QuickPollRequired();
b9ae18aa 219}
220
221/*
222 * SHM manipulation routines
223 */
224void
ee139403 225SharedMemory::put(ssize_t offset)
b9ae18aa 226{
227 int i;
228 assert(offset >= 0);
229 assert(offset < nbufs * SHMBUF_BLKSZ);
230 i = offset / SHMBUF_BLKSZ;
231 assert(i < nbufs);
232 assert(CBIT_TEST(inuse_map, i));
233 CBIT_CLR(inuse_map, i);
234 --diskd_stats.shmbuf_count;
235}
236
237void *
238
ee139403 239SharedMemory::get(ssize_t * shm_offset)
b9ae18aa 240{
241 char *aBuf = NULL;
242 int i;
243
cb4185f1 244 for (i = 0; i < nbufs; ++i) {
b9ae18aa 245 if (CBIT_TEST(inuse_map, i))
246 continue;
247
248 CBIT_SET(inuse_map, i);
249
250 *shm_offset = i * SHMBUF_BLKSZ;
251
252 aBuf = buf + (*shm_offset);
253
254 break;
255 }
256
257 assert(aBuf);
258 assert(aBuf >= buf);
259 assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
cb4185f1 260 ++diskd_stats.shmbuf_count;
b9ae18aa 261
262 if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count)
263 diskd_stats.max_shmuse = diskd_stats.shmbuf_count;
264
265 return aBuf;
266}
267
268void
269SharedMemory::init(int ikey, int magic2)
270{
271 nbufs = (int)(magic2 * 1.3);
272 id = shmget((key_t) (ikey + 2),
273 nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
274
275 if (id < 0) {
fa84c01d 276 debugs(50, DBG_CRITICAL, "storeDiskdInit: shmget: " << xstrerror());
b9ae18aa 277 fatal("shmget failed");
278 }
279
280 buf = (char *)shmat(id, NULL, 0);
281
282 if (buf == (void *) -1) {
fa84c01d 283 debugs(50, DBG_CRITICAL, "storeDiskdInit: shmat: " << xstrerror());
b9ae18aa 284 fatal("shmat failed");
285 }
286
287 inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
288 diskd_stats.shmbuf_count += nbufs;
289
cb4185f1 290 for (int i = 0; i < nbufs; ++i) {
b9ae18aa 291 CBIT_SET(inuse_map, i);
292 put (i * SHMBUF_BLKSZ);
293 }
294}
295
296void
297DiskdIOStrategy::unlinkDone(diomsg * M)
298{
bf8fe701 299 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
e4f1fdae 300 ++statCounter.syscalls.disk.unlinks;
b9ae18aa 301
302 if (M->status < 0)
cb4185f1 303 ++diskd_stats.unlink.fail;
b9ae18aa 304 else
cb4185f1 305 ++diskd_stats.unlink.success;
b9ae18aa 306}
307
308void
309DiskdIOStrategy::handle(diomsg * M)
310{
311 if (!cbdataReferenceValid (M->callback_data)) {
312 /* I.e. already closed file
313 * - say when we have a error opening after
314 * a read was already queued
315 */
26ac0430 316 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
b9ae18aa 317 cbdataReferenceDone (M->callback_data);
318 return;
319 }
320
a1ad81aa 321 /* set errno passed from diskd. makes debugging more meaningful */
322 if (M->status < 0)
323 errno = -M->status;
324
b9ae18aa 325 if (M->newstyle) {
326 DiskdFile *theFile = (DiskdFile *)M->callback_data;
8bf217bd 327 theFile->unlock();
b9ae18aa 328 theFile->completed (M);
329 } else
330 switch (M->mtype) {
331
332 case _MQD_OPEN:
333
334 case _MQD_CREATE:
335
336 case _MQD_CLOSE:
337
338 case _MQD_READ:
339
340 case _MQD_WRITE:
341 assert (0);
342 break;
343
344 case _MQD_UNLINK:
345 unlinkDone(M);
346 break;
347
348 default:
349 assert(0);
350 break;
351 }
352
353 cbdataReferenceDone (M->callback_data);
354}
355
356int
8bf217bd 357DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
b9ae18aa 358{
b9ae18aa 359 diomsg M;
b9ae18aa 360 M.callback_data = cbdataReference(theFile);
8bf217bd 361 theFile->lock();
b9ae18aa 362 M.requestor = requestor;
f30dcf2a 363 M.newstyle = true;
b9ae18aa 364
365 if (requestor)
8bf217bd 366 requestor->lock();
b9ae18aa 367
f30dcf2a 368 return SEND(&M, mtype, id, size, offset, shm_offset);
b9ae18aa 369}
370
371int
63be0a78 372DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
b9ae18aa 373{
b9ae18aa 374 diomsg M;
f30dcf2a 375 M.callback_data = cbdataReference(sio.getRaw());
376 M.newstyle = false;
377
378 return SEND(&M, mtype, id, size, offset, shm_offset);
379}
380
381int
ee139403 382DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
f30dcf2a 383{
b9ae18aa 384 static int send_errors = 0;
385 static int last_seq_no = 0;
386 static int seq_no = 0;
f30dcf2a 387 int x;
388
389 M->mtype = mtype;
390 M->size = size;
391 M->offset = offset;
392 M->status = -1;
393 M->shm_offset = (int) shm_offset;
394 M->id = id;
395 M->seq_no = ++seq_no;
b9ae18aa 396
f30dcf2a 397 if (M->seq_no < last_seq_no)
e0236918 398 debugs(79, DBG_IMPORTANT, "WARNING: sequencing out of order");
b9ae18aa 399
f30dcf2a 400 x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
b9ae18aa 401
f30dcf2a 402 last_seq_no = M->seq_no;
b9ae18aa 403
404 if (0 == x) {
cb4185f1
FC
405 ++diskd_stats.sent_count;
406 ++away;
b9ae18aa 407 } else {
e0236918 408 debugs(79, DBG_IMPORTANT, "storeDiskdSend: msgsnd: " << xstrerror());
f30dcf2a 409 cbdataReferenceDone(M->callback_data);
7f56277d
AJ
410 ++send_errors;
411 assert(send_errors < 100);
6a786390 412 if (shm_offset > -1)
413 shm.put(shm_offset);
b9ae18aa 414 }
415
416 /*
417 * We have to drain the queue here if necessary. If we don't,
418 * then we can have a lot of messages in the queue (probably
419 * up to 2*magic1) and we can run out of shared memory buffers.
420 */
421 /*
c8f4eac4 422 * Note that we call Store::Root().callbackk (for all SDs), rather
423 * than callback for just this SD, so that while
b9ae18aa 424 * we're "blocking" on this SD we can also handle callbacks
425 * from other SDs that might be ready.
426 */
b9ae18aa 427
9a518127 428 struct timeval delay = {0, 1};
b9ae18aa 429
9a518127 430 while (away > magic2) {
b9ae18aa 431 select(0, NULL, NULL, NULL, &delay);
c8f4eac4 432 Store::Root().callback();
b9ae18aa 433
434 if (delay.tv_usec < 1000000)
435 delay.tv_usec <<= 1;
436 }
437
438 return x;
439}
440
441ConfigOption *
442DiskdIOStrategy::getOptionTree() const
443{
444 ConfigOptionVector *result = new ConfigOptionVector;
445 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump));
446 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump));
447 return result;
448}
449
450bool
350e2aec 451DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
b9ae18aa 452{
453 if (strcmp(name, "Q1") != 0)
454 return false;
455
456 int old_magic1 = magic1;
457
458 magic1 = atoi(value);
459
350e2aec 460 if (!isaReconfig)
b9ae18aa 461 return true;
462
463 if (old_magic1 < magic1) {
464 /*
465 * This is because shm.nbufs is computed at startup, when
466 * we call shmget(). We can't increase the Q1/Q2 parameters
467 * beyond their initial values because then we might have
468 * more "Q2 messages" than shared memory chunks, and this
469 * will cause an assertion in storeDiskdShmGet().
470 */
471 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
e0236918 472 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
b9ae18aa 473 magic1 = old_magic1;
474 return true;
475 }
476
477 if (old_magic1 != magic1)
e0236918 478 debugs(3, DBG_IMPORTANT, "cache_dir new Q1 value '" << magic1 << "'");
b9ae18aa 479
480 return true;
481}
482
483void
484DiskdIOStrategy::optionQ1Dump(StoreEntry * e) const
485{
486 storeAppendPrintf(e, " Q1=%d", magic1);
487}
488
489bool
350e2aec 490DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
b9ae18aa 491{
492 if (strcmp(name, "Q2") != 0)
493 return false;
494
495 int old_magic2 = magic2;
496
497 magic2 = atoi(value);
498
350e2aec 499 if (!isaReconfig)
b9ae18aa 500 return true;
501
502 if (old_magic2 < magic2) {
503 /* See comments in Q1 function above */
e0236918 504 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
b9ae18aa 505 magic2 = old_magic2;
506 return true;
507 }
508
509 if (old_magic2 != magic2)
e0236918 510 debugs(3, DBG_IMPORTANT, "cache_dir new Q2 value '" << magic2 << "'");
b9ae18aa 511
512 return true;
513}
514
515void
516DiskdIOStrategy::optionQ2Dump(StoreEntry * e) const
517{
518 storeAppendPrintf(e, " Q2=%d", magic2);
519}
520
521/*
522 * Sync any pending data. We just sit around and read the queue
523 * until the data has finished writing.
524 */
525void
526DiskdIOStrategy::sync()
527{
528 static time_t lastmsg = 0;
529
530 while (away > 0) {
531 if (squid_curtime > lastmsg) {
e0236918 532 debugs(47, DBG_IMPORTANT, "storeDiskdDirSync: " << away << " messages away");
b9ae18aa 533 lastmsg = squid_curtime;
534 }
535
536 callback();
537 }
538}
539
b9ae18aa 540/*
541 * Handle callbacks. If we have more than magic2 requests away, we block
542 * until the queue is below magic2. Otherwise, we simply return when we
543 * don't get a message.
544 */
545
546int
547DiskdIOStrategy::callback()
548{
549 diomsg M;
550 int x;
551 int retval = 0;
552
553 if (away >= magic2) {
cb4185f1 554 ++diskd_stats.block_queue_len;
b9ae18aa 555 retval = 1;
556 /* We might not have anything to do, but our queue
557 * is full.. */
558 }
559
560 if (diskd_stats.sent_count - diskd_stats.recv_count >
561 diskd_stats.max_away) {
562 diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count;
563 }
564
565 while (1) {
566#ifdef ALWAYS_ZERO_BUFFERS
567 memset(&M, '\0', sizeof(M));
568#endif
569
570 x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
571
572 if (x < 0)
573 break;
574 else if (x != diomsg::msg_snd_rcv_sz) {
e0236918 575 debugs(47, DBG_IMPORTANT, "storeDiskdDirCallback: msgget returns " << x);
b9ae18aa 576 break;
577 }
578
cb4185f1 579 ++diskd_stats.recv_count;
b9ae18aa 580 --away;
581 handle(&M);
582 retval = 1; /* Return that we've actually done some work */
583
584 if (M.shm_offset > -1)
585 shm.put ((off_t) M.shm_offset);
586 }
587
588 return retval;
589}
590
591void
592DiskdIOStrategy::statfs(StoreEntry & sentry)const
593{
594 storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
595}