/*
 * $Id: DiskdIOStrategy.cc,v 1.2 2005/01/03 16:08:26 robertc Exp $
 *
 * DEBUG: section 79 Squid-side DISKD I/O functions.
 * AUTHOR: Duane Wessels
 *
 * SQUID Web Proxy Cache http://www.squid-cache.org/
 * ----------------------------------------------------------
 *
 * Squid is the result of efforts by numerous individuals from
 * the Internet community; see the CONTRIBUTORS file for full
 * details. Many organizations have provided support for Squid's
 * development; see the SPONSORS file for full details. Squid is
 * Copyrighted (C) 2001 by the Regents of the University of
 * California; see the COPYRIGHT file for full details. Squid
 * incorporates software developed and/or copyrighted by other
 * sources; see the CREDITS file for full details.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
 *
 * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
 */

#include "squid.h"

#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/shm.h>
#include "DiskdIOStrategy.h"
#include "ConfigOption.h"
#include "DiskIO/DiskFile.h"
#include "DiskdFile.h"
#include "diomsg.h"
/* for statfs */
#include "Store.h"

diskd_stats_t diskd_stats;

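/*
 * Each DiskdIOStrategy gets a unique instance ID so that it can derive
 * IPC keys that do not collide with other diskd cache_dirs.  SysV
 * msgsnd()/msgrcv() take the message size excluding the leading mtyp_t
 * field, hence msg_snd_rcv_sz below.
 */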
size_t DiskdIOStrategy::nextInstanceID(0);
const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);

size_t
DiskdIOStrategy::newInstance()
{
    return ++nextInstanceID;
}

bool
DiskdIOStrategy::shedLoad()
{
    /*
     * Fail on open() if there are too many requests queued.
     */

    if (away > magic1) {
        debug(79, 3) ("storeDiskdIO::shedLoad: Shedding, too many requests away\n");

        return true;
    }

    return false;
}

int
DiskdIOStrategy::load()
{
    /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
    /* the parse function guarantees magic2 is positive */
    return away * 1000 / magic2;
}

void
DiskdIOStrategy::openFailed()
{
    diskd_stats.open_fail_queue_len++;
}

DiskFile::Pointer
DiskdIOStrategy::newFile(char const *path)
{
    if (shedLoad()) {
        openFailed();
        return NULL;
    }

    return new DiskdFile(path, this);
}

DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0), smsgid(-1), rmsgid(-1), wfd(-1), instanceID(newInstance())
{}

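/*
 * Queue an unlink request to the diskd helper: the path is copied into a
 * shared memory block and an _MQD_UNLINK message referencing that block is
 * sent.  When the request queue is already saturated (shedLoad()), we fall
 * back to removing the file right away via unlinkd, truncate() or unlink(),
 * depending on build options.
 */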
void
DiskdIOStrategy::unlinkFile(char const *path)
{
    if (shedLoad()) {
        /* Damn, we need to issue a sync unlink here :( */
        debug(79, 2) ("storeDiskUnlink: Out of queue space, sync unlink\n");
#if USE_UNLINKD

        unlinkdUnlink(path);
#elif USE_TRUNCATE

        truncate(path, 0);
#else

        unlink(path);
#endif

        return;
    }

    /* We can attempt a diskd unlink */
    int x;

    off_t shm_offset;

    char *buf;

    buf = (char *)shm.get(&shm_offset);

    xstrncpy(buf, path, SHMBUF_BLKSZ);

    x = send(_MQD_UNLINK,
             0,
             (StoreIOState::Pointer)NULL,
             0,
             0,
             shm_offset);

    if (x < 0) {
        debug(79, 1) ("storeDiskdSend UNLINK: %s\n", xstrerror());
        ::unlink(buf);		/* XXX EWW! */
        // shm.put (shm_offset);
    }

    diskd_stats.unlink.ops++;
}

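/*
 * Start the diskd helper for this strategy.  An IPC key is derived from our
 * PID and instance ID; key and key+1 become the send and receive message
 * queues, and key+2 is used by SharedMemory::init() for the shared buffer
 * segment.  The three keys are passed to the spawned diskd process on its
 * command line so that it can attach to the same IPC objects.
 */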
void
DiskdIOStrategy::init()
{
    int x;
    int rfd;
    int ikey;
    const char *args[5];
    char skey1[32];
    char skey2[32];
    char skey3[32];

    ikey = (getpid() << 10) + (instanceID << 2);
    ikey &= 0x7fffffff;
    smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);

    if (smsgid < 0) {
        debug(50, 0) ("storeDiskdInit: msgget: %s\n", xstrerror());
        fatal("msgget failed");
    }

    rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);

    if (rmsgid < 0) {
        debug(50, 0) ("storeDiskdInit: msgget: %s\n", xstrerror());
        fatal("msgget failed");
    }

    shm.init(ikey, magic2);
    snprintf(skey1, 32, "%d", ikey);
    snprintf(skey2, 32, "%d", ikey + 1);
    snprintf(skey3, 32, "%d", ikey + 2);
    args[0] = "diskd";
    args[1] = skey1;
    args[2] = skey2;
    args[3] = skey3;
    args[4] = NULL;
    x = ipcCreate(IPC_STREAM,
                  Config.Program.diskd,
                  args,
                  "diskd",
                  &rfd,
                  &wfd);

    if (x < 0)
        fatalf("execl: %s", Config.Program.diskd);

    if (rfd != wfd)
        comm_close(rfd);

    fd_note(wfd, "squid -> diskd");

    commSetTimeout(wfd, -1, NULL, NULL);

    commSetNonBlocking(wfd);

    comm_quick_poll_required();
}

/*
 * SHM manipulation routines
 */
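/*
 * The shared segment is divided into nbufs fixed-size blocks of
 * SHMBUF_BLKSZ bytes.  inuse_map is a bitmap with one bit per block:
 * get() scans for a clear bit, marks it and returns the block's address,
 * while put() clears the bit for the block at the given offset.
 */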
void
SharedMemory::put(off_t offset)
{
    int i;
    assert(offset >= 0);
    assert(offset < nbufs * SHMBUF_BLKSZ);
    i = offset / SHMBUF_BLKSZ;
    assert(i < nbufs);
    assert(CBIT_TEST(inuse_map, i));
    CBIT_CLR(inuse_map, i);
    --diskd_stats.shmbuf_count;
}

void *
SharedMemory::get(off_t * shm_offset)
{
    char *aBuf = NULL;
    int i;

    for (i = 0; i < nbufs; i++) {
        if (CBIT_TEST(inuse_map, i))
            continue;

        CBIT_SET(inuse_map, i);

        *shm_offset = i * SHMBUF_BLKSZ;

        aBuf = buf + (*shm_offset);

        break;
    }

    assert(aBuf);
    assert(aBuf >= buf);
    assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
    diskd_stats.shmbuf_count++;

    if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count)
        diskd_stats.max_shmuse = diskd_stats.shmbuf_count;

    return aBuf;
}

void
SharedMemory::init(int ikey, int magic2)
{
    nbufs = (int)(magic2 * 1.3);
    id = shmget((key_t) (ikey + 2),
                nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);

    if (id < 0) {
        debug(50, 0) ("storeDiskdInit: shmget: %s\n", xstrerror());
        fatal("shmget failed");
    }

    buf = (char *)shmat(id, NULL, 0);

    if (buf == (void *) -1) {
        debug(50, 0) ("storeDiskdInit: shmat: %s\n", xstrerror());
        fatal("shmat failed");
    }

    inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
    diskd_stats.shmbuf_count += nbufs;

    for (int i = 0; i < nbufs; i++) {
        CBIT_SET(inuse_map, i);
        put(i * SHMBUF_BLKSZ);
    }
}

void
DiskdIOStrategy::unlinkDone(diomsg * M)
{
    debug(79, 3) ("storeDiskdUnlinkDone: file %s status %d\n", shm.buf + M->shm_offset,
                  M->status);
    statCounter.syscalls.disk.unlinks++;

    if (M->status < 0)
        diskd_stats.unlink.fail++;
    else
        diskd_stats.unlink.success++;
}

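/*
 * Dispatch one reply from diskd.  New-style messages carry a DiskdFile as
 * their callback data and are completed via DiskdFile::completed(); of the
 * old-style message types only _MQD_UNLINK is still expected here.
 */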
void
DiskdIOStrategy::handle(diomsg * M)
{
    if (!cbdataReferenceValid(M->callback_data)) {
        /* I.e. already closed file
         * - say when we have an error opening after
         * a read was already queued
         */
        debug(79, 3) ("storeDiskdHandle: Invalid callback_data %p\n",
                      M->callback_data);
        cbdataReferenceDone(M->callback_data);
        return;
    }

    if (M->newstyle) {
        DiskdFile *theFile = (DiskdFile *)M->callback_data;
        theFile->RefCountDereference();
        theFile->completed(M);
    } else
        switch (M->mtype) {

        case _MQD_OPEN:

        case _MQD_CREATE:

        case _MQD_CLOSE:

        case _MQD_READ:

        case _MQD_WRITE:
            assert(0);
            break;

        case _MQD_UNLINK:
            unlinkDone(M);
            break;

        default:
            assert(0);
            break;
        }

    cbdataReferenceDone(M->callback_data);
}

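/*
 * Queue one request message for diskd with msgsnd(IPC_NOWAIT) and count it
 * as "away".  If more than magic2 (Q2) requests are outstanding, the sender
 * blocks in a select() backoff loop and drains completions through
 * Store::Root().callback() until the queue shrinks.
 */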
int
DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, int size, int offset, off_t shm_offset, RefCountable_ *requestor)
{
    int x;
    diomsg M;
    static int send_errors = 0;
    static int last_seq_no = 0;
    static int seq_no = 0;
    M.mtype = mtype;
    M.callback_data = cbdataReference(theFile);
    theFile->RefCountReference();
    M.requestor = requestor;

    if (requestor)
        requestor->RefCountReference();

    M.size = size;

    M.offset = offset;

    M.status = -1;

    M.shm_offset = (int) shm_offset;

    M.id = id;

    M.seq_no = ++seq_no;

    M.newstyle = true;

    if (M.seq_no < last_seq_no)
        debug(79, 1) ("WARNING: sequencing out of order\n");

    debugs(79, 9, "sending with " << smsgid << " " << &M << " " << diomsg::msg_snd_rcv_sz << " " << IPC_NOWAIT);

    x = msgsnd(smsgid, &M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);

    last_seq_no = M.seq_no;

    if (0 == x) {
        diskd_stats.sent_count++;
        away++;
    } else {
        debug(79, 1) ("storeDiskdSend: msgsnd: %s\n", xstrerror());
        cbdataReferenceDone(M.callback_data);
        assert(++send_errors < 100);
        shm.put(shm_offset);
    }

    /*
     * We have to drain the queue here if necessary. If we don't,
     * then we can have a lot of messages in the queue (probably
     * up to 2*magic1) and we can run out of shared memory buffers.
     */
    /*
     * Note that we call Store::Root().callback (for all SDs), rather
     * than callback for just this SD, so that while
     * we're "blocking" on this SD we can also handle callbacks
     * from other SDs that might be ready.
     */
    while (away > magic2) {

        struct timeval delay = {0, 1};

        select(0, NULL, NULL, NULL, &delay);
        Store::Root().callback();

        if (delay.tv_usec < 1000000)
            delay.tv_usec <<= 1;
    }

    return x;
}

int
DiskdIOStrategy::send(int mtype, int id, StoreIOState::Pointer sio, int size, int offset, off_t shm_offset)
{
    int x;
    diomsg M;
    static int send_errors = 0;
    static int last_seq_no = 0;
    static int seq_no = 0;
    M.mtype = mtype;
    M.callback_data = cbdataReference(sio.getRaw());
    M.size = size;
    M.offset = offset;
    M.status = -1;
    M.shm_offset = (int) shm_offset;
    M.id = id;
    M.seq_no = ++seq_no;
    M.newstyle = false;

    if (M.seq_no < last_seq_no)
        debug(79, 1) ("WARNING: sequencing out of order\n");

    x = msgsnd(smsgid, &M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);

    last_seq_no = M.seq_no;

    if (0 == x) {
        diskd_stats.sent_count++;
        away++;
    } else {
        debug(79, 1) ("storeDiskdSend: msgsnd: %s\n", xstrerror());
        cbdataReferenceDone(M.callback_data);
        assert(++send_errors < 100);
    }

    /*
     * We have to drain the queue here if necessary. If we don't,
     * then we can have a lot of messages in the queue (probably
     * up to 2*magic1) and we can run out of shared memory buffers.
     */
    /*
     * Note that we call Store::Root().callback (for all SDs), rather
     * than callback for just this SD, so that while
     * we're "blocking" on this SD we can also handle callbacks
     * from other SDs that might be ready.
     */
    while (away > magic2) {

        struct timeval delay = {0, 1};

        select(0, NULL, NULL, NULL, &delay);
        Store::Root().callback();

        if (delay.tv_usec < 1000000)
            delay.tv_usec <<= 1;
    }

    return x;
}

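/* cache_dir option handling for the Q1 and Q2 (magic1/magic2) parameters */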
ConfigOption *
DiskdIOStrategy::getOptionTree() const
{
    ConfigOptionVector *result = new ConfigOptionVector;
    result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump));
    result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump));
    return result;
}

bool
DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int reconfiguring)
{
    if (strcmp(name, "Q1") != 0)
        return false;

    int old_magic1 = magic1;

    magic1 = atoi(value);

    if (!reconfiguring)
        return true;

    if (old_magic1 < magic1) {
        /*
         * This is because shm.nbufs is computed at startup, when
         * we call shmget().  We can't increase the Q1/Q2 parameters
         * beyond their initial values because then we might have
         * more "Q2 messages" than shared memory chunks, and this
         * will cause an assertion in storeDiskdShmGet().
         */
        /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
        debug(3, 1) ("WARNING: cannot increase cache_dir Q1 value while Squid is running.\n");
        magic1 = old_magic1;
        return true;
    }

    if (old_magic1 != magic1)
        debug(3, 1) ("cache_dir new Q1 value '%d'\n",
                     magic1);

    return true;
}

void
DiskdIOStrategy::optionQ1Dump(StoreEntry * e) const
{
    storeAppendPrintf(e, " Q1=%d", magic1);
}

bool
DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int reconfiguring)
{
    if (strcmp(name, "Q2") != 0)
        return false;

    int old_magic2 = magic2;

    magic2 = atoi(value);

    if (!reconfiguring)
        return true;

    if (old_magic2 < magic2) {
        /* See comments in Q1 function above */
        debug(3, 1) ("WARNING: cannot increase cache_dir Q2 value while Squid is running.\n");
        magic2 = old_magic2;
        return true;
    }

    if (old_magic2 != magic2)
        debug(3, 1) ("cache_dir new Q2 value '%d'\n",
                     magic2);

    return true;
}

void
DiskdIOStrategy::optionQ2Dump(StoreEntry * e) const
{
    storeAppendPrintf(e, " Q2=%d", magic2);
}

/*
 * Sync any pending data. We just sit around and read the queue
 * until the data has finished writing.
 */
void
DiskdIOStrategy::sync()
{
    static time_t lastmsg = 0;

    while (away > 0) {
        if (squid_curtime > lastmsg) {
            debug(47, 1) ("storeDiskdDirSync: %d messages away\n",
                          away);
            lastmsg = squid_curtime;
        }

        callback();
    }
}

/*
 * Handle callbacks. If we have more than magic2 requests away, we block
 * until the queue is below magic2. Otherwise, we simply return when we
 * don't get a message.
 */
int
DiskdIOStrategy::callback()
{
    diomsg M;
    int x;
    int retval = 0;

    if (away >= magic2) {
        diskd_stats.block_queue_len++;
        retval = 1;
        /* We might not have anything to do, but our queue
         * is full.. */
    }

    if (diskd_stats.sent_count - diskd_stats.recv_count >
            diskd_stats.max_away) {
        diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count;
    }

    while (1) {
#ifdef ALWAYS_ZERO_BUFFERS
        memset(&M, '\0', sizeof(M));
#endif

        x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);

        if (x < 0)
            break;
        else if (x != diomsg::msg_snd_rcv_sz) {
            debug(47, 1) ("storeDiskdDirCallback: msgrcv returned %d\n",
                          x);
            break;
        }

        diskd_stats.recv_count++;
        --away;
        handle(&M);
        retval = 1;		/* Return that we've actually done some work */

        if (M.shm_offset > -1)
            shm.put((off_t) M.shm_offset);
    }

    return retval;
}

void
DiskdIOStrategy::statfs(StoreEntry & sentry) const
{
    storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
}