]> git.ipfire.org Git - thirdparty/squid.git/blob - src/DiskIO/DiskDaemon/DiskdIOStrategy.cc
Moved SquidConfig definition from structs.h to own header file.
[thirdparty/squid.git] / src / DiskIO / DiskDaemon / DiskdIOStrategy.cc
1 /*
2 * DEBUG: section 79 Squid-side DISKD I/O functions.
3 * AUTHOR: Duane Wessels
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
32 */
33
34 #include "squid.h"
35 #include "comm/Loops.h"
36 #include "ConfigOption.h"
37 #include "DiskdIOStrategy.h"
38 #include "DiskIO/DiskFile.h"
39 #include "DiskdFile.h"
40 #include "diomsg.h"
41 #include "fd.h"
42 #include "Store.h"
43 #include "StatCounters.h"
44 #include "SquidConfig.h"
45 #include "SquidIpc.h"
46 #include "SquidTime.h"
47 #include "unlinkd.h"
48
49 #include <sys/ipc.h>
50 #include <sys/msg.h>
51 #include <sys/shm.h>
52 #if HAVE_ERRNO_H
53 #include <errno.h>
54 #endif
55
56 diskd_stats_t diskd_stats;
57
58 size_t DiskdIOStrategy::nextInstanceID (0);
59 const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
60
61 size_t
62 DiskdIOStrategy::newInstance()
63 {
64 return ++nextInstanceID;
65 }
66
67 bool
68 DiskdIOStrategy::shedLoad()
69 {
70 /*
71 * Fail on open() if there are too many requests queued.
72 */
73
74 if (away > magic1) {
75 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
76
77 return true;
78 }
79
80 return false;
81 }
82
83 int
84 DiskdIOStrategy::load()
85 {
86 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
87 /* the parse function guarantees magic2 is positivie */
88 return away * 1000 / magic2;
89 }
90
91 void
92 DiskdIOStrategy::openFailed()
93 {
94 ++diskd_stats.open_fail_queue_len;
95 }
96
97 DiskFile::Pointer
98 DiskdIOStrategy::newFile(char const *path)
99 {
100 if (shedLoad()) {
101 openFailed();
102 return NULL;
103 }
104
105 return new DiskdFile (path, this);
106 }
107
108 DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0) , smsgid(-1), rmsgid(-1), wfd(-1) , instanceID(newInstance())
109 {}
110
111 bool
112 DiskdIOStrategy::unlinkdUseful() const
113 {
114 return true;
115 }
116
117 void
118 DiskdIOStrategy::unlinkFile(char const *path)
119 {
120 if (shedLoad()) {
121 /* Damn, we need to issue a sync unlink here :( */
122 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
123 unlinkdUnlink(path);
124 return;
125 }
126
127 /* We can attempt a diskd unlink */
128 int x;
129
130 ssize_t shm_offset;
131
132 char *buf;
133
134 buf = (char *)shm.get(&shm_offset);
135
136 xstrncpy(buf, path, SHMBUF_BLKSZ);
137
138 x = send(_MQD_UNLINK,
139 0,
140 (StoreIOState::Pointer )NULL,
141 0,
142 0,
143 shm_offset);
144
145 if (x < 0) {
146 debugs(79, DBG_IMPORTANT, "storeDiskdSend UNLINK: " << xstrerror());
147 ::unlink(buf); /* XXX EWW! */
148 // shm.put (shm_offset);
149 }
150
151 ++diskd_stats.unlink.ops;
152 }
153
154 void
155 DiskdIOStrategy::init()
156 {
157 int pid;
158 void * hIpc;
159 int rfd;
160 int ikey;
161 const char *args[5];
162 char skey1[32];
163 char skey2[32];
164 char skey3[32];
165 Ip::Address localhost;
166
167 ikey = (getpid() << 10) + (instanceID << 2);
168 ikey &= 0x7fffffff;
169 smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
170
171 if (smsgid < 0) {
172 debugs(50, DBG_CRITICAL, "storeDiskdInit: msgget: " << xstrerror());
173 fatal("msgget failed");
174 }
175
176 rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
177
178 if (rmsgid < 0) {
179 debugs(50, DBG_CRITICAL, "storeDiskdInit: msgget: " << xstrerror());
180 fatal("msgget failed");
181 }
182
183 shm.init(ikey, magic2);
184 snprintf(skey1, 32, "%d", ikey);
185 snprintf(skey2, 32, "%d", ikey + 1);
186 snprintf(skey3, 32, "%d", ikey + 2);
187 args[0] = "diskd";
188 args[1] = skey1;
189 args[2] = skey2;
190 args[3] = skey3;
191 args[4] = NULL;
192 localhost.SetLocalhost();
193 pid = ipcCreate(IPC_STREAM,
194 Config.Program.diskd,
195 args,
196 "diskd",
197 localhost,
198 &rfd,
199 &wfd,
200 &hIpc);
201
202 if (pid < 0)
203 fatalf("execl: %s", Config.Program.diskd);
204
205 if (rfd != wfd)
206 comm_close(rfd);
207
208 fd_note(wfd, "squid -> diskd");
209
210 commUnsetFdTimeout(wfd);
211 commSetNonBlocking(wfd);
212 Comm::QuickPollRequired();
213 }
214
215 /*
216 * SHM manipulation routines
217 */
218 void
219 SharedMemory::put(ssize_t offset)
220 {
221 int i;
222 assert(offset >= 0);
223 assert(offset < nbufs * SHMBUF_BLKSZ);
224 i = offset / SHMBUF_BLKSZ;
225 assert(i < nbufs);
226 assert(CBIT_TEST(inuse_map, i));
227 CBIT_CLR(inuse_map, i);
228 --diskd_stats.shmbuf_count;
229 }
230
231 void *
232
233 SharedMemory::get(ssize_t * shm_offset)
234 {
235 char *aBuf = NULL;
236 int i;
237
238 for (i = 0; i < nbufs; ++i) {
239 if (CBIT_TEST(inuse_map, i))
240 continue;
241
242 CBIT_SET(inuse_map, i);
243
244 *shm_offset = i * SHMBUF_BLKSZ;
245
246 aBuf = buf + (*shm_offset);
247
248 break;
249 }
250
251 assert(aBuf);
252 assert(aBuf >= buf);
253 assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
254 ++diskd_stats.shmbuf_count;
255
256 if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count)
257 diskd_stats.max_shmuse = diskd_stats.shmbuf_count;
258
259 return aBuf;
260 }
261
262 void
263 SharedMemory::init(int ikey, int magic2)
264 {
265 nbufs = (int)(magic2 * 1.3);
266 id = shmget((key_t) (ikey + 2),
267 nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
268
269 if (id < 0) {
270 debugs(50, DBG_CRITICAL, "storeDiskdInit: shmget: " << xstrerror());
271 fatal("shmget failed");
272 }
273
274 buf = (char *)shmat(id, NULL, 0);
275
276 if (buf == (void *) -1) {
277 debugs(50, DBG_CRITICAL, "storeDiskdInit: shmat: " << xstrerror());
278 fatal("shmat failed");
279 }
280
281 inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
282 diskd_stats.shmbuf_count += nbufs;
283
284 for (int i = 0; i < nbufs; ++i) {
285 CBIT_SET(inuse_map, i);
286 put (i * SHMBUF_BLKSZ);
287 }
288 }
289
290 void
291 DiskdIOStrategy::unlinkDone(diomsg * M)
292 {
293 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
294 ++statCounter.syscalls.disk.unlinks;
295
296 if (M->status < 0)
297 ++diskd_stats.unlink.fail;
298 else
299 ++diskd_stats.unlink.success;
300 }
301
302 void
303 DiskdIOStrategy::handle(diomsg * M)
304 {
305 if (!cbdataReferenceValid (M->callback_data)) {
306 /* I.e. already closed file
307 * - say when we have a error opening after
308 * a read was already queued
309 */
310 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
311 cbdataReferenceDone (M->callback_data);
312 return;
313 }
314
315 /* set errno passed from diskd. makes debugging more meaningful */
316 if (M->status < 0)
317 errno = -M->status;
318
319 if (M->newstyle) {
320 DiskdFile *theFile = (DiskdFile *)M->callback_data;
321 theFile->RefCountDereference();
322 theFile->completed (M);
323 } else
324 switch (M->mtype) {
325
326 case _MQD_OPEN:
327
328 case _MQD_CREATE:
329
330 case _MQD_CLOSE:
331
332 case _MQD_READ:
333
334 case _MQD_WRITE:
335 assert (0);
336 break;
337
338 case _MQD_UNLINK:
339 unlinkDone(M);
340 break;
341
342 default:
343 assert(0);
344 break;
345 }
346
347 cbdataReferenceDone (M->callback_data);
348 }
349
350 int
351 DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, RefCountable_ *requestor)
352 {
353 diomsg M;
354 M.callback_data = cbdataReference(theFile);
355 theFile->RefCountReference();
356 M.requestor = requestor;
357 M.newstyle = true;
358
359 if (requestor)
360 requestor->RefCountReference();
361
362 return SEND(&M, mtype, id, size, offset, shm_offset);
363 }
364
365 int
366 DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
367 {
368 diomsg M;
369 M.callback_data = cbdataReference(sio.getRaw());
370 M.newstyle = false;
371
372 return SEND(&M, mtype, id, size, offset, shm_offset);
373 }
374
375 int
376 DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
377 {
378 static int send_errors = 0;
379 static int last_seq_no = 0;
380 static int seq_no = 0;
381 int x;
382
383 M->mtype = mtype;
384 M->size = size;
385 M->offset = offset;
386 M->status = -1;
387 M->shm_offset = (int) shm_offset;
388 M->id = id;
389 M->seq_no = ++seq_no;
390
391 if (M->seq_no < last_seq_no)
392 debugs(79, DBG_IMPORTANT, "WARNING: sequencing out of order");
393
394 x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
395
396 last_seq_no = M->seq_no;
397
398 if (0 == x) {
399 ++diskd_stats.sent_count;
400 ++away;
401 } else {
402 debugs(79, DBG_IMPORTANT, "storeDiskdSend: msgsnd: " << xstrerror());
403 cbdataReferenceDone(M->callback_data);
404 assert(++send_errors < 100);
405 if (shm_offset > -1)
406 shm.put(shm_offset);
407 }
408
409 /*
410 * We have to drain the queue here if necessary. If we don't,
411 * then we can have a lot of messages in the queue (probably
412 * up to 2*magic1) and we can run out of shared memory buffers.
413 */
414 /*
415 * Note that we call Store::Root().callbackk (for all SDs), rather
416 * than callback for just this SD, so that while
417 * we're "blocking" on this SD we can also handle callbacks
418 * from other SDs that might be ready.
419 */
420
421 struct timeval delay = {0, 1};
422
423 while (away > magic2) {
424 select(0, NULL, NULL, NULL, &delay);
425 Store::Root().callback();
426
427 if (delay.tv_usec < 1000000)
428 delay.tv_usec <<= 1;
429 }
430
431 return x;
432 }
433
434 ConfigOption *
435 DiskdIOStrategy::getOptionTree() const
436 {
437 ConfigOptionVector *result = new ConfigOptionVector;
438 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump));
439 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump));
440 return result;
441 }
442
443 bool
444 DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
445 {
446 if (strcmp(name, "Q1") != 0)
447 return false;
448
449 int old_magic1 = magic1;
450
451 magic1 = atoi(value);
452
453 if (!isaReconfig)
454 return true;
455
456 if (old_magic1 < magic1) {
457 /*
458 * This is because shm.nbufs is computed at startup, when
459 * we call shmget(). We can't increase the Q1/Q2 parameters
460 * beyond their initial values because then we might have
461 * more "Q2 messages" than shared memory chunks, and this
462 * will cause an assertion in storeDiskdShmGet().
463 */
464 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
465 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
466 magic1 = old_magic1;
467 return true;
468 }
469
470 if (old_magic1 != magic1)
471 debugs(3, DBG_IMPORTANT, "cache_dir new Q1 value '" << magic1 << "'");
472
473 return true;
474 }
475
476 void
477 DiskdIOStrategy::optionQ1Dump(StoreEntry * e) const
478 {
479 storeAppendPrintf(e, " Q1=%d", magic1);
480 }
481
482 bool
483 DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
484 {
485 if (strcmp(name, "Q2") != 0)
486 return false;
487
488 int old_magic2 = magic2;
489
490 magic2 = atoi(value);
491
492 if (!isaReconfig)
493 return true;
494
495 if (old_magic2 < magic2) {
496 /* See comments in Q1 function above */
497 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
498 magic2 = old_magic2;
499 return true;
500 }
501
502 if (old_magic2 != magic2)
503 debugs(3, DBG_IMPORTANT, "cache_dir new Q2 value '" << magic2 << "'");
504
505 return true;
506 }
507
508 void
509 DiskdIOStrategy::optionQ2Dump(StoreEntry * e) const
510 {
511 storeAppendPrintf(e, " Q2=%d", magic2);
512 }
513
514 /*
515 * Sync any pending data. We just sit around and read the queue
516 * until the data has finished writing.
517 */
518 void
519 DiskdIOStrategy::sync()
520 {
521 static time_t lastmsg = 0;
522
523 while (away > 0) {
524 if (squid_curtime > lastmsg) {
525 debugs(47, DBG_IMPORTANT, "storeDiskdDirSync: " << away << " messages away");
526 lastmsg = squid_curtime;
527 }
528
529 callback();
530 }
531 }
532
533 /*
534 * Handle callbacks. If we have more than magic2 requests away, we block
535 * until the queue is below magic2. Otherwise, we simply return when we
536 * don't get a message.
537 */
538
539 int
540 DiskdIOStrategy::callback()
541 {
542 diomsg M;
543 int x;
544 int retval = 0;
545
546 if (away >= magic2) {
547 ++diskd_stats.block_queue_len;
548 retval = 1;
549 /* We might not have anything to do, but our queue
550 * is full.. */
551 }
552
553 if (diskd_stats.sent_count - diskd_stats.recv_count >
554 diskd_stats.max_away) {
555 diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count;
556 }
557
558 while (1) {
559 #ifdef ALWAYS_ZERO_BUFFERS
560 memset(&M, '\0', sizeof(M));
561 #endif
562
563 x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
564
565 if (x < 0)
566 break;
567 else if (x != diomsg::msg_snd_rcv_sz) {
568 debugs(47, DBG_IMPORTANT, "storeDiskdDirCallback: msgget returns " << x);
569 break;
570 }
571
572 ++diskd_stats.recv_count;
573 --away;
574 handle(&M);
575 retval = 1; /* Return that we've actually done some work */
576
577 if (M.shm_offset > -1)
578 shm.put ((off_t) M.shm_offset);
579 }
580
581 return retval;
582 }
583
584 void
585 DiskdIOStrategy::statfs(StoreEntry & sentry)const
586 {
587 storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
588 }