]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/DiskDaemon/DiskdIOStrategy.cc
Cleanup: zap CVS Id tags
[thirdparty/squid.git] / src / DiskIO / DiskDaemon / DiskdIOStrategy.cc
CommitLineData
b9ae18aa 1/*
262a0e14 2 * $Id$
b9ae18aa 3 *
4 * DEBUG: section 79 Squid-side DISKD I/O functions.
5 * AUTHOR: Duane Wessels
6 *
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
9 *
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
26ac0430 23 *
b9ae18aa 24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
26ac0430 28 *
b9ae18aa 29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
34 */
35
36#include "squid.h"
37
38#include <sys/ipc.h>
39#include <sys/msg.h>
40#include <sys/shm.h>
41#include "DiskdIOStrategy.h"
42#include "ConfigOption.h"
43#include "DiskIO/DiskFile.h"
44#include "DiskdFile.h"
45#include "diomsg.h"
46/* for statfs */
47#include "Store.h"
985c86bc 48#include "SquidTime.h"
b9ae18aa 49
50diskd_stats_t diskd_stats;
51
52size_t DiskdIOStrategy::nextInstanceID (0);
53const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
54
55size_t
56DiskdIOStrategy::newInstance()
57{
58 return ++nextInstanceID;
59}
60
61bool
62DiskdIOStrategy::shedLoad()
63{
64 /*
65 * Fail on open() if there are too many requests queued.
66 */
67
68 if (away > magic1) {
bf8fe701 69 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
b9ae18aa 70
71 return true;
72 }
73
74 return false;
75}
76
77int
78DiskdIOStrategy::load()
79{
80 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
81 /* the parse function guarantees magic2 is positivie */
82 return away * 1000 / magic2;
83}
84
85void
86DiskdIOStrategy::openFailed()
87{
88 diskd_stats.open_fail_queue_len++;
89}
90
91DiskFile::Pointer
92DiskdIOStrategy::newFile(char const *path)
93{
94 if (shedLoad()) {
95 openFailed();
96 return NULL;
97 }
98
99 return new DiskdFile (path, this);
100}
101
102DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0) , smsgid(-1), rmsgid(-1), wfd(-1) , instanceID(newInstance())
103{}
104
105void
106DiskdIOStrategy::unlinkFile(char const *path)
107{
108 if (shedLoad()) {
109 /* Damn, we need to issue a sync unlink here :( */
bf8fe701 110 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
b9ae18aa 111#if USE_UNLINKD
112
113 unlinkdUnlink(path);
b9ae18aa 114#else
115
116 unlink(path);
117#endif
118
119 return;
120 }
121
122 /* We can attempt a diskd unlink */
123 int x;
124
ee139403 125 ssize_t shm_offset;
b9ae18aa 126
127 char *buf;
128
129 buf = (char *)shm.get(&shm_offset);
130
131 xstrncpy(buf, path, SHMBUF_BLKSZ);
132
133 x = send(_MQD_UNLINK,
134 0,
135 (StoreIOState::Pointer )NULL,
136 0,
137 0,
138 shm_offset);
139
140 if (x < 0) {
bf8fe701 141 debugs(79, 1, "storeDiskdSend UNLINK: " << xstrerror());
b9ae18aa 142 ::unlink(buf); /* XXX EWW! */
143 // shm.put (shm_offset);
144 }
145
146 diskd_stats.unlink.ops++;
147}
148
149void
150DiskdIOStrategy::init()
151{
b5d712b5 152 int pid;
153 void * hIpc;
b9ae18aa 154 int rfd;
155 int ikey;
156 const char *args[5];
157 char skey1[32];
158 char skey2[32];
159 char skey3[32];
f7fbb7a5 160 IpAddress localhost;
b9ae18aa 161
162 ikey = (getpid() << 10) + (instanceID << 2);
163 ikey &= 0x7fffffff;
164 smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
165
166 if (smsgid < 0) {
bf8fe701 167 debugs(50, 0, "storeDiskdInit: msgget: " << xstrerror());
b9ae18aa 168 fatal("msgget failed");
169 }
170
171 rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
172
173 if (rmsgid < 0) {
bf8fe701 174 debugs(50, 0, "storeDiskdInit: msgget: " << xstrerror());
b9ae18aa 175 fatal("msgget failed");
176 }
177
178 shm.init(ikey, magic2);
179 snprintf(skey1, 32, "%d", ikey);
180 snprintf(skey2, 32, "%d", ikey + 1);
181 snprintf(skey3, 32, "%d", ikey + 2);
182 args[0] = "diskd";
183 args[1] = skey1;
184 args[2] = skey2;
185 args[3] = skey3;
186 args[4] = NULL;
cc192b50 187 localhost.SetLocalhost();
b5d712b5 188 pid = ipcCreate(IPC_STREAM,
189 Config.Program.diskd,
190 args,
191 "diskd",
cc192b50 192 localhost,
b5d712b5 193 &rfd,
194 &wfd,
195 &hIpc);
196
197 if (pid < 0)
b9ae18aa 198 fatalf("execl: %s", Config.Program.diskd);
199
200 if (rfd != wfd)
201 comm_close(rfd);
202
203 fd_note(wfd, "squid -> diskd");
204
205 commSetTimeout(wfd, -1, NULL, NULL);
206
207 commSetNonBlocking(wfd);
208
209 comm_quick_poll_required();
210}
211
212/*
213 * SHM manipulation routines
214 */
215void
ee139403 216SharedMemory::put(ssize_t offset)
b9ae18aa 217{
218 int i;
219 assert(offset >= 0);
220 assert(offset < nbufs * SHMBUF_BLKSZ);
221 i = offset / SHMBUF_BLKSZ;
222 assert(i < nbufs);
223 assert(CBIT_TEST(inuse_map, i));
224 CBIT_CLR(inuse_map, i);
225 --diskd_stats.shmbuf_count;
226}
227
228void *
229
ee139403 230SharedMemory::get(ssize_t * shm_offset)
b9ae18aa 231{
232 char *aBuf = NULL;
233 int i;
234
235 for (i = 0; i < nbufs; i++) {
236 if (CBIT_TEST(inuse_map, i))
237 continue;
238
239 CBIT_SET(inuse_map, i);
240
241 *shm_offset = i * SHMBUF_BLKSZ;
242
243 aBuf = buf + (*shm_offset);
244
245 break;
246 }
247
248 assert(aBuf);
249 assert(aBuf >= buf);
250 assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
251 diskd_stats.shmbuf_count++;
252
253 if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count)
254 diskd_stats.max_shmuse = diskd_stats.shmbuf_count;
255
256 return aBuf;
257}
258
259void
260SharedMemory::init(int ikey, int magic2)
261{
262 nbufs = (int)(magic2 * 1.3);
263 id = shmget((key_t) (ikey + 2),
264 nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
265
266 if (id < 0) {
bf8fe701 267 debugs(50, 0, "storeDiskdInit: shmget: " << xstrerror());
b9ae18aa 268 fatal("shmget failed");
269 }
270
271 buf = (char *)shmat(id, NULL, 0);
272
273 if (buf == (void *) -1) {
bf8fe701 274 debugs(50, 0, "storeDiskdInit: shmat: " << xstrerror());
b9ae18aa 275 fatal("shmat failed");
276 }
277
278 inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
279 diskd_stats.shmbuf_count += nbufs;
280
281 for (int i = 0; i < nbufs; i++) {
282 CBIT_SET(inuse_map, i);
283 put (i * SHMBUF_BLKSZ);
284 }
285}
286
287void
288DiskdIOStrategy::unlinkDone(diomsg * M)
289{
bf8fe701 290 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
b9ae18aa 291 statCounter.syscalls.disk.unlinks++;
292
293 if (M->status < 0)
294 diskd_stats.unlink.fail++;
295 else
296 diskd_stats.unlink.success++;
297}
298
299void
300DiskdIOStrategy::handle(diomsg * M)
301{
302 if (!cbdataReferenceValid (M->callback_data)) {
303 /* I.e. already closed file
304 * - say when we have a error opening after
305 * a read was already queued
306 */
26ac0430 307 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
b9ae18aa 308 cbdataReferenceDone (M->callback_data);
309 return;
310 }
311
312
a1ad81aa 313 /* set errno passed from diskd. makes debugging more meaningful */
314 if (M->status < 0)
315 errno = -M->status;
316
b9ae18aa 317 if (M->newstyle) {
318 DiskdFile *theFile = (DiskdFile *)M->callback_data;
319 theFile->RefCountDereference();
320 theFile->completed (M);
321 } else
322 switch (M->mtype) {
323
324 case _MQD_OPEN:
325
326 case _MQD_CREATE:
327
328 case _MQD_CLOSE:
329
330 case _MQD_READ:
331
332 case _MQD_WRITE:
333 assert (0);
334 break;
335
336 case _MQD_UNLINK:
337 unlinkDone(M);
338 break;
339
340 default:
341 assert(0);
342 break;
343 }
344
345 cbdataReferenceDone (M->callback_data);
346}
347
348int
ee139403 349DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, RefCountable_ *requestor)
b9ae18aa 350{
b9ae18aa 351 diomsg M;
b9ae18aa 352 M.callback_data = cbdataReference(theFile);
353 theFile->RefCountReference();
354 M.requestor = requestor;
f30dcf2a 355 M.newstyle = true;
b9ae18aa 356
357 if (requestor)
358 requestor->RefCountReference();
359
f30dcf2a 360 return SEND(&M, mtype, id, size, offset, shm_offset);
b9ae18aa 361}
362
363int
63be0a78 364DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
b9ae18aa 365{
b9ae18aa 366 diomsg M;
f30dcf2a 367 M.callback_data = cbdataReference(sio.getRaw());
368 M.newstyle = false;
369
370 return SEND(&M, mtype, id, size, offset, shm_offset);
371}
372
373int
ee139403 374DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
f30dcf2a 375{
b9ae18aa 376 static int send_errors = 0;
377 static int last_seq_no = 0;
378 static int seq_no = 0;
f30dcf2a 379 int x;
380
381 M->mtype = mtype;
382 M->size = size;
383 M->offset = offset;
384 M->status = -1;
385 M->shm_offset = (int) shm_offset;
386 M->id = id;
387 M->seq_no = ++seq_no;
b9ae18aa 388
f30dcf2a 389 if (M->seq_no < last_seq_no)
bf8fe701 390 debugs(79, 1, "WARNING: sequencing out of order");
b9ae18aa 391
f30dcf2a 392 x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
b9ae18aa 393
f30dcf2a 394 last_seq_no = M->seq_no;
b9ae18aa 395
396 if (0 == x) {
397 diskd_stats.sent_count++;
398 away++;
399 } else {
bf8fe701 400 debugs(79, 1, "storeDiskdSend: msgsnd: " << xstrerror());
f30dcf2a 401 cbdataReferenceDone(M->callback_data);
b9ae18aa 402 assert(++send_errors < 100);
6a786390 403 if (shm_offset > -1)
404 shm.put(shm_offset);
b9ae18aa 405 }
406
407 /*
408 * We have to drain the queue here if necessary. If we don't,
409 * then we can have a lot of messages in the queue (probably
410 * up to 2*magic1) and we can run out of shared memory buffers.
411 */
412 /*
c8f4eac4 413 * Note that we call Store::Root().callbackk (for all SDs), rather
414 * than callback for just this SD, so that while
b9ae18aa 415 * we're "blocking" on this SD we can also handle callbacks
416 * from other SDs that might be ready.
417 */
b9ae18aa 418
9a518127 419 struct timeval delay = {0, 1};
b9ae18aa 420
9a518127 421 while (away > magic2) {
b9ae18aa 422 select(0, NULL, NULL, NULL, &delay);
c8f4eac4 423 Store::Root().callback();
b9ae18aa 424
425 if (delay.tv_usec < 1000000)
426 delay.tv_usec <<= 1;
427 }
428
429 return x;
430}
431
432ConfigOption *
433DiskdIOStrategy::getOptionTree() const
434{
435 ConfigOptionVector *result = new ConfigOptionVector;
436 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump));
437 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump));
438 return result;
439}
440
441bool
442DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int reconfiguring)
443{
444 if (strcmp(name, "Q1") != 0)
445 return false;
446
447 int old_magic1 = magic1;
448
449 magic1 = atoi(value);
450
451 if (!reconfiguring)
452 return true;
453
454 if (old_magic1 < magic1) {
455 /*
456 * This is because shm.nbufs is computed at startup, when
457 * we call shmget(). We can't increase the Q1/Q2 parameters
458 * beyond their initial values because then we might have
459 * more "Q2 messages" than shared memory chunks, and this
460 * will cause an assertion in storeDiskdShmGet().
461 */
462 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
bf8fe701 463 debugs(3, 1, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
b9ae18aa 464 magic1 = old_magic1;
465 return true;
466 }
467
468 if (old_magic1 != magic1)
bf8fe701 469 debugs(3, 1, "cache_dir new Q1 value '" << magic1 << "'");
b9ae18aa 470
471 return true;
472}
473
474void
475DiskdIOStrategy::optionQ1Dump(StoreEntry * e) const
476{
477 storeAppendPrintf(e, " Q1=%d", magic1);
478}
479
480bool
481DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int reconfiguring)
482{
483 if (strcmp(name, "Q2") != 0)
484 return false;
485
486 int old_magic2 = magic2;
487
488 magic2 = atoi(value);
489
490 if (!reconfiguring)
491 return true;
492
493 if (old_magic2 < magic2) {
494 /* See comments in Q1 function above */
bf8fe701 495 debugs(3, 1, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
b9ae18aa 496 magic2 = old_magic2;
497 return true;
498 }
499
500 if (old_magic2 != magic2)
bf8fe701 501 debugs(3, 1, "cache_dir new Q2 value '" << magic2 << "'");
b9ae18aa 502
503 return true;
504}
505
506void
507DiskdIOStrategy::optionQ2Dump(StoreEntry * e) const
508{
509 storeAppendPrintf(e, " Q2=%d", magic2);
510}
511
512/*
513 * Sync any pending data. We just sit around and read the queue
514 * until the data has finished writing.
515 */
516void
517DiskdIOStrategy::sync()
518{
519 static time_t lastmsg = 0;
520
521 while (away > 0) {
522 if (squid_curtime > lastmsg) {
bf8fe701 523 debugs(47, 1, "storeDiskdDirSync: " << away << " messages away");
b9ae18aa 524 lastmsg = squid_curtime;
525 }
526
527 callback();
528 }
529}
530
531
532/*
533 * Handle callbacks. If we have more than magic2 requests away, we block
534 * until the queue is below magic2. Otherwise, we simply return when we
535 * don't get a message.
536 */
537
538int
539DiskdIOStrategy::callback()
540{
541 diomsg M;
542 int x;
543 int retval = 0;
544
545 if (away >= magic2) {
546 diskd_stats.block_queue_len++;
547 retval = 1;
548 /* We might not have anything to do, but our queue
549 * is full.. */
550 }
551
552 if (diskd_stats.sent_count - diskd_stats.recv_count >
553 diskd_stats.max_away) {
554 diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count;
555 }
556
557 while (1) {
558#ifdef ALWAYS_ZERO_BUFFERS
559 memset(&M, '\0', sizeof(M));
560#endif
561
562 x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
563
564 if (x < 0)
565 break;
566 else if (x != diomsg::msg_snd_rcv_sz) {
bf8fe701 567 debugs(47, 1, "storeDiskdDirCallback: msgget returns " << x);
b9ae18aa 568 break;
569 }
570
571 diskd_stats.recv_count++;
572 --away;
573 handle(&M);
574 retval = 1; /* Return that we've actually done some work */
575
576 if (M.shm_offset > -1)
577 shm.put ((off_t) M.shm_offset);
578 }
579
580 return retval;
581}
582
583void
584DiskdIOStrategy::statfs(StoreEntry & sentry)const
585{
586 storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
587}