]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/DiskDaemon/DiskdIOStrategy.cc
Sync store meta assignments with Squid-2.
[thirdparty/squid.git] / src / DiskIO / DiskDaemon / DiskdIOStrategy.cc
CommitLineData
b9ae18aa 1
2/*
ee139403 3 * $Id: DiskdIOStrategy.cc,v 1.11 2007/08/16 23:32:28 hno Exp $
b9ae18aa 4 *
5 * DEBUG: section 79 Squid-side DISKD I/O functions.
6 * AUTHOR: Duane Wessels
7 *
8 * SQUID Web Proxy Cache http://www.squid-cache.org/
9 * ----------------------------------------------------------
10 *
11 * Squid is the result of efforts by numerous individuals from
12 * the Internet community; see the CONTRIBUTORS file for full
13 * details. Many organizations have provided support for Squid's
14 * development; see the SPONSORS file for full details. Squid is
15 * Copyrighted (C) 2001 by the Regents of the University of
16 * California; see the COPYRIGHT file for full details. Squid
17 * incorporates software developed and/or copyrighted by other
18 * sources; see the CREDITS file for full details.
19 *
20 * This program is free software; you can redistribute it and/or modify
21 * it under the terms of the GNU General Public License as published by
22 * the Free Software Foundation; either version 2 of the License, or
23 * (at your option) any later version.
24 *
25 * This program is distributed in the hope that it will be useful,
26 * but WITHOUT ANY WARRANTY; without even the implied warranty of
27 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * GNU General Public License for more details.
29 *
30 * You should have received a copy of the GNU General Public License
31 * along with this program; if not, write to the Free Software
32 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 *
34 * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
35 */
36
37#include "squid.h"
38
39#include <sys/ipc.h>
40#include <sys/msg.h>
41#include <sys/shm.h>
42#include "DiskdIOStrategy.h"
43#include "ConfigOption.h"
44#include "DiskIO/DiskFile.h"
45#include "DiskdFile.h"
46#include "diomsg.h"
47/* for statfs */
48#include "Store.h"
985c86bc 49#include "SquidTime.h"
b9ae18aa 50
51diskd_stats_t diskd_stats;
52
53size_t DiskdIOStrategy::nextInstanceID (0);
54const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
55
56size_t
57DiskdIOStrategy::newInstance()
58{
59 return ++nextInstanceID;
60}
61
62bool
63DiskdIOStrategy::shedLoad()
64{
65 /*
66 * Fail on open() if there are too many requests queued.
67 */
68
69 if (away > magic1) {
bf8fe701 70 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
b9ae18aa 71
72 return true;
73 }
74
75 return false;
76}
77
78int
79DiskdIOStrategy::load()
80{
81 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
82 /* the parse function guarantees magic2 is positivie */
83 return away * 1000 / magic2;
84}
85
86void
87DiskdIOStrategy::openFailed()
88{
89 diskd_stats.open_fail_queue_len++;
90}
91
92DiskFile::Pointer
93DiskdIOStrategy::newFile(char const *path)
94{
95 if (shedLoad()) {
96 openFailed();
97 return NULL;
98 }
99
100 return new DiskdFile (path, this);
101}
102
103DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0) , smsgid(-1), rmsgid(-1), wfd(-1) , instanceID(newInstance())
104{}
105
106void
107DiskdIOStrategy::unlinkFile(char const *path)
108{
109 if (shedLoad()) {
110 /* Damn, we need to issue a sync unlink here :( */
bf8fe701 111 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
b9ae18aa 112#if USE_UNLINKD
113
114 unlinkdUnlink(path);
b9ae18aa 115#else
116
117 unlink(path);
118#endif
119
120 return;
121 }
122
123 /* We can attempt a diskd unlink */
124 int x;
125
ee139403 126 ssize_t shm_offset;
b9ae18aa 127
128 char *buf;
129
130 buf = (char *)shm.get(&shm_offset);
131
132 xstrncpy(buf, path, SHMBUF_BLKSZ);
133
134 x = send(_MQD_UNLINK,
135 0,
136 (StoreIOState::Pointer )NULL,
137 0,
138 0,
139 shm_offset);
140
141 if (x < 0) {
bf8fe701 142 debugs(79, 1, "storeDiskdSend UNLINK: " << xstrerror());
b9ae18aa 143 ::unlink(buf); /* XXX EWW! */
144 // shm.put (shm_offset);
145 }
146
147 diskd_stats.unlink.ops++;
148}
149
150void
151DiskdIOStrategy::init()
152{
b5d712b5 153 int pid;
154 void * hIpc;
b9ae18aa 155 int rfd;
156 int ikey;
157 const char *args[5];
158 char skey1[32];
159 char skey2[32];
160 char skey3[32];
161
162 ikey = (getpid() << 10) + (instanceID << 2);
163 ikey &= 0x7fffffff;
164 smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
165
166 if (smsgid < 0) {
bf8fe701 167 debugs(50, 0, "storeDiskdInit: msgget: " << xstrerror());
b9ae18aa 168 fatal("msgget failed");
169 }
170
171 rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
172
173 if (rmsgid < 0) {
bf8fe701 174 debugs(50, 0, "storeDiskdInit: msgget: " << xstrerror());
b9ae18aa 175 fatal("msgget failed");
176 }
177
178 shm.init(ikey, magic2);
179 snprintf(skey1, 32, "%d", ikey);
180 snprintf(skey2, 32, "%d", ikey + 1);
181 snprintf(skey3, 32, "%d", ikey + 2);
182 args[0] = "diskd";
183 args[1] = skey1;
184 args[2] = skey2;
185 args[3] = skey3;
186 args[4] = NULL;
b5d712b5 187 pid = ipcCreate(IPC_STREAM,
188 Config.Program.diskd,
189 args,
190 "diskd",
191 &rfd,
192 &wfd,
193 &hIpc);
194
195 if (pid < 0)
b9ae18aa 196 fatalf("execl: %s", Config.Program.diskd);
197
198 if (rfd != wfd)
199 comm_close(rfd);
200
201 fd_note(wfd, "squid -> diskd");
202
203 commSetTimeout(wfd, -1, NULL, NULL);
204
205 commSetNonBlocking(wfd);
206
207 comm_quick_poll_required();
208}
209
210/*
211 * SHM manipulation routines
212 */
213void
ee139403 214SharedMemory::put(ssize_t offset)
b9ae18aa 215{
216 int i;
217 assert(offset >= 0);
218 assert(offset < nbufs * SHMBUF_BLKSZ);
219 i = offset / SHMBUF_BLKSZ;
220 assert(i < nbufs);
221 assert(CBIT_TEST(inuse_map, i));
222 CBIT_CLR(inuse_map, i);
223 --diskd_stats.shmbuf_count;
224}
225
226void *
227
ee139403 228SharedMemory::get(ssize_t * shm_offset)
b9ae18aa 229{
230 char *aBuf = NULL;
231 int i;
232
233 for (i = 0; i < nbufs; i++) {
234 if (CBIT_TEST(inuse_map, i))
235 continue;
236
237 CBIT_SET(inuse_map, i);
238
239 *shm_offset = i * SHMBUF_BLKSZ;
240
241 aBuf = buf + (*shm_offset);
242
243 break;
244 }
245
246 assert(aBuf);
247 assert(aBuf >= buf);
248 assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
249 diskd_stats.shmbuf_count++;
250
251 if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count)
252 diskd_stats.max_shmuse = diskd_stats.shmbuf_count;
253
254 return aBuf;
255}
256
257void
258SharedMemory::init(int ikey, int magic2)
259{
260 nbufs = (int)(magic2 * 1.3);
261 id = shmget((key_t) (ikey + 2),
262 nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
263
264 if (id < 0) {
bf8fe701 265 debugs(50, 0, "storeDiskdInit: shmget: " << xstrerror());
b9ae18aa 266 fatal("shmget failed");
267 }
268
269 buf = (char *)shmat(id, NULL, 0);
270
271 if (buf == (void *) -1) {
bf8fe701 272 debugs(50, 0, "storeDiskdInit: shmat: " << xstrerror());
b9ae18aa 273 fatal("shmat failed");
274 }
275
276 inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
277 diskd_stats.shmbuf_count += nbufs;
278
279 for (int i = 0; i < nbufs; i++) {
280 CBIT_SET(inuse_map, i);
281 put (i * SHMBUF_BLKSZ);
282 }
283}
284
285void
286DiskdIOStrategy::unlinkDone(diomsg * M)
287{
bf8fe701 288 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
b9ae18aa 289 statCounter.syscalls.disk.unlinks++;
290
291 if (M->status < 0)
292 diskd_stats.unlink.fail++;
293 else
294 diskd_stats.unlink.success++;
295}
296
297void
298DiskdIOStrategy::handle(diomsg * M)
299{
300 if (!cbdataReferenceValid (M->callback_data)) {
301 /* I.e. already closed file
302 * - say when we have a error opening after
303 * a read was already queued
304 */
bf8fe701 305 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
b9ae18aa 306 cbdataReferenceDone (M->callback_data);
307 return;
308 }
309
310
a1ad81aa 311 /* set errno passed from diskd. makes debugging more meaningful */
312 if (M->status < 0)
313 errno = -M->status;
314
b9ae18aa 315 if (M->newstyle) {
316 DiskdFile *theFile = (DiskdFile *)M->callback_data;
317 theFile->RefCountDereference();
318 theFile->completed (M);
319 } else
320 switch (M->mtype) {
321
322 case _MQD_OPEN:
323
324 case _MQD_CREATE:
325
326 case _MQD_CLOSE:
327
328 case _MQD_READ:
329
330 case _MQD_WRITE:
331 assert (0);
332 break;
333
334 case _MQD_UNLINK:
335 unlinkDone(M);
336 break;
337
338 default:
339 assert(0);
340 break;
341 }
342
343 cbdataReferenceDone (M->callback_data);
344}
345
346int
ee139403 347DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, RefCountable_ *requestor)
b9ae18aa 348{
b9ae18aa 349 diomsg M;
b9ae18aa 350 M.callback_data = cbdataReference(theFile);
351 theFile->RefCountReference();
352 M.requestor = requestor;
f30dcf2a 353 M.newstyle = true;
b9ae18aa 354
355 if (requestor)
356 requestor->RefCountReference();
357
f30dcf2a 358 return SEND(&M, mtype, id, size, offset, shm_offset);
b9ae18aa 359}
360
361int
ee139403 362DiskdIOStrategy::send(int mtype, int id, StoreIOState::Pointer sio, size_t size, off_t offset, ssize_t shm_offset)
b9ae18aa 363{
b9ae18aa 364 diomsg M;
f30dcf2a 365 M.callback_data = cbdataReference(sio.getRaw());
366 M.newstyle = false;
367
368 return SEND(&M, mtype, id, size, offset, shm_offset);
369}
370
371int
ee139403 372DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
f30dcf2a 373{
b9ae18aa 374 static int send_errors = 0;
375 static int last_seq_no = 0;
376 static int seq_no = 0;
f30dcf2a 377 int x;
378
379 M->mtype = mtype;
380 M->size = size;
381 M->offset = offset;
382 M->status = -1;
383 M->shm_offset = (int) shm_offset;
384 M->id = id;
385 M->seq_no = ++seq_no;
b9ae18aa 386
f30dcf2a 387 if (M->seq_no < last_seq_no)
bf8fe701 388 debugs(79, 1, "WARNING: sequencing out of order");
b9ae18aa 389
f30dcf2a 390 x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
b9ae18aa 391
f30dcf2a 392 last_seq_no = M->seq_no;
b9ae18aa 393
394 if (0 == x) {
395 diskd_stats.sent_count++;
396 away++;
397 } else {
bf8fe701 398 debugs(79, 1, "storeDiskdSend: msgsnd: " << xstrerror());
f30dcf2a 399 cbdataReferenceDone(M->callback_data);
b9ae18aa 400 assert(++send_errors < 100);
6a786390 401 if (shm_offset > -1)
402 shm.put(shm_offset);
b9ae18aa 403 }
404
405 /*
406 * We have to drain the queue here if necessary. If we don't,
407 * then we can have a lot of messages in the queue (probably
408 * up to 2*magic1) and we can run out of shared memory buffers.
409 */
410 /*
c8f4eac4 411 * Note that we call Store::Root().callbackk (for all SDs), rather
412 * than callback for just this SD, so that while
b9ae18aa 413 * we're "blocking" on this SD we can also handle callbacks
414 * from other SDs that might be ready.
415 */
b9ae18aa 416
9a518127 417 struct timeval delay = {0, 1};
b9ae18aa 418
9a518127 419 while (away > magic2) {
b9ae18aa 420 select(0, NULL, NULL, NULL, &delay);
c8f4eac4 421 Store::Root().callback();
b9ae18aa 422
423 if (delay.tv_usec < 1000000)
424 delay.tv_usec <<= 1;
425 }
426
427 return x;
428}
429
430ConfigOption *
431DiskdIOStrategy::getOptionTree() const
432{
433 ConfigOptionVector *result = new ConfigOptionVector;
434 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump));
435 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump));
436 return result;
437}
438
439bool
440DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int reconfiguring)
441{
442 if (strcmp(name, "Q1") != 0)
443 return false;
444
445 int old_magic1 = magic1;
446
447 magic1 = atoi(value);
448
449 if (!reconfiguring)
450 return true;
451
452 if (old_magic1 < magic1) {
453 /*
454 * This is because shm.nbufs is computed at startup, when
455 * we call shmget(). We can't increase the Q1/Q2 parameters
456 * beyond their initial values because then we might have
457 * more "Q2 messages" than shared memory chunks, and this
458 * will cause an assertion in storeDiskdShmGet().
459 */
460 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
bf8fe701 461 debugs(3, 1, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
b9ae18aa 462 magic1 = old_magic1;
463 return true;
464 }
465
466 if (old_magic1 != magic1)
bf8fe701 467 debugs(3, 1, "cache_dir new Q1 value '" << magic1 << "'");
b9ae18aa 468
469 return true;
470}
471
472void
473DiskdIOStrategy::optionQ1Dump(StoreEntry * e) const
474{
475 storeAppendPrintf(e, " Q1=%d", magic1);
476}
477
478bool
479DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int reconfiguring)
480{
481 if (strcmp(name, "Q2") != 0)
482 return false;
483
484 int old_magic2 = magic2;
485
486 magic2 = atoi(value);
487
488 if (!reconfiguring)
489 return true;
490
491 if (old_magic2 < magic2) {
492 /* See comments in Q1 function above */
bf8fe701 493 debugs(3, 1, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
b9ae18aa 494 magic2 = old_magic2;
495 return true;
496 }
497
498 if (old_magic2 != magic2)
bf8fe701 499 debugs(3, 1, "cache_dir new Q2 value '" << magic2 << "'");
b9ae18aa 500
501 return true;
502}
503
504void
505DiskdIOStrategy::optionQ2Dump(StoreEntry * e) const
506{
507 storeAppendPrintf(e, " Q2=%d", magic2);
508}
509
510/*
511 * Sync any pending data. We just sit around and read the queue
512 * until the data has finished writing.
513 */
514void
515DiskdIOStrategy::sync()
516{
517 static time_t lastmsg = 0;
518
519 while (away > 0) {
520 if (squid_curtime > lastmsg) {
bf8fe701 521 debugs(47, 1, "storeDiskdDirSync: " << away << " messages away");
b9ae18aa 522 lastmsg = squid_curtime;
523 }
524
525 callback();
526 }
527}
528
529
530/*
531 * Handle callbacks. If we have more than magic2 requests away, we block
532 * until the queue is below magic2. Otherwise, we simply return when we
533 * don't get a message.
534 */
535
536int
537DiskdIOStrategy::callback()
538{
539 diomsg M;
540 int x;
541 int retval = 0;
542
543 if (away >= magic2) {
544 diskd_stats.block_queue_len++;
545 retval = 1;
546 /* We might not have anything to do, but our queue
547 * is full.. */
548 }
549
550 if (diskd_stats.sent_count - diskd_stats.recv_count >
551 diskd_stats.max_away) {
552 diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count;
553 }
554
555 while (1) {
556#ifdef ALWAYS_ZERO_BUFFERS
557 memset(&M, '\0', sizeof(M));
558#endif
559
560 x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
561
562 if (x < 0)
563 break;
564 else if (x != diomsg::msg_snd_rcv_sz) {
bf8fe701 565 debugs(47, 1, "storeDiskdDirCallback: msgget returns " << x);
b9ae18aa 566 break;
567 }
568
569 diskd_stats.recv_count++;
570 --away;
571 handle(&M);
572 retval = 1; /* Return that we've actually done some work */
573
574 if (M.shm_offset > -1)
575 shm.put ((off_t) M.shm_offset);
576 }
577
578 return retval;
579}
580
581void
582DiskdIOStrategy::statfs(StoreEntry & sentry)const
583{
584 storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
585}