]> git.ipfire.org Git - thirdparty/squid.git/blob - src/DiskIO/DiskDaemon/DiskdIOStrategy.cc
Removed inclusion of protos.h from most clients.
[thirdparty/squid.git] / src / DiskIO / DiskDaemon / DiskdIOStrategy.cc
1 /*
2 * DEBUG: section 79 Squid-side DISKD I/O functions.
3 * AUTHOR: Duane Wessels
4 *
5 * SQUID Web Proxy Cache http://www.squid-cache.org/
6 * ----------------------------------------------------------
7 *
8 * Squid is the result of efforts by numerous individuals from
9 * the Internet community; see the CONTRIBUTORS file for full
10 * details. Many organizations have provided support for Squid's
11 * development; see the SPONSORS file for full details. Squid is
12 * Copyrighted (C) 2001 by the Regents of the University of
13 * California; see the COPYRIGHT file for full details. Squid
14 * incorporates software developed and/or copyrighted by other
15 * sources; see the CREDITS file for full details.
16 *
17 * This program is free software; you can redistribute it and/or modify
18 * it under the terms of the GNU General Public License as published by
19 * the Free Software Foundation; either version 2 of the License, or
20 * (at your option) any later version.
21 *
22 * This program is distributed in the hope that it will be useful,
23 * but WITHOUT ANY WARRANTY; without even the implied warranty of
24 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
25 * GNU General Public License for more details.
26 *
27 * You should have received a copy of the GNU General Public License
28 * along with this program; if not, write to the Free Software
29 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
30 *
31 * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
32 */
33
34 #include "squid.h"
35 #include "comm/Loops.h"
36 #include "ConfigOption.h"
37 #include "DiskdIOStrategy.h"
38 #include "DiskIO/DiskFile.h"
39 #include "DiskdFile.h"
40 #include "diomsg.h"
41 #include "fd.h"
42 #include "Store.h"
43 #include "StatCounters.h"
44 #include "SquidIpc.h"
45 #include "SquidTime.h"
46 #include "unlinkd.h"
47
48 #include <sys/ipc.h>
49 #include <sys/msg.h>
50 #include <sys/shm.h>
51 #if HAVE_ERRNO_H
52 #include <errno.h>
53 #endif
54
55 diskd_stats_t diskd_stats;
56
57 size_t DiskdIOStrategy::nextInstanceID (0);
58 const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
59
60 size_t
61 DiskdIOStrategy::newInstance()
62 {
63 return ++nextInstanceID;
64 }
65
66 bool
67 DiskdIOStrategy::shedLoad()
68 {
69 /*
70 * Fail on open() if there are too many requests queued.
71 */
72
73 if (away > magic1) {
74 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
75
76 return true;
77 }
78
79 return false;
80 }
81
82 int
83 DiskdIOStrategy::load()
84 {
85 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
86 /* the parse function guarantees magic2 is positivie */
87 return away * 1000 / magic2;
88 }
89
90 void
91 DiskdIOStrategy::openFailed()
92 {
93 ++diskd_stats.open_fail_queue_len;
94 }
95
96 DiskFile::Pointer
97 DiskdIOStrategy::newFile(char const *path)
98 {
99 if (shedLoad()) {
100 openFailed();
101 return NULL;
102 }
103
104 return new DiskdFile (path, this);
105 }
106
107 DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0) , smsgid(-1), rmsgid(-1), wfd(-1) , instanceID(newInstance())
108 {}
109
110 bool
111 DiskdIOStrategy::unlinkdUseful() const
112 {
113 return true;
114 }
115
116 void
117 DiskdIOStrategy::unlinkFile(char const *path)
118 {
119 if (shedLoad()) {
120 /* Damn, we need to issue a sync unlink here :( */
121 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
122 unlinkdUnlink(path);
123 return;
124 }
125
126 /* We can attempt a diskd unlink */
127 int x;
128
129 ssize_t shm_offset;
130
131 char *buf;
132
133 buf = (char *)shm.get(&shm_offset);
134
135 xstrncpy(buf, path, SHMBUF_BLKSZ);
136
137 x = send(_MQD_UNLINK,
138 0,
139 (StoreIOState::Pointer )NULL,
140 0,
141 0,
142 shm_offset);
143
144 if (x < 0) {
145 debugs(79, DBG_IMPORTANT, "storeDiskdSend UNLINK: " << xstrerror());
146 ::unlink(buf); /* XXX EWW! */
147 // shm.put (shm_offset);
148 }
149
150 ++diskd_stats.unlink.ops;
151 }
152
153 void
154 DiskdIOStrategy::init()
155 {
156 int pid;
157 void * hIpc;
158 int rfd;
159 int ikey;
160 const char *args[5];
161 char skey1[32];
162 char skey2[32];
163 char skey3[32];
164 Ip::Address localhost;
165
166 ikey = (getpid() << 10) + (instanceID << 2);
167 ikey &= 0x7fffffff;
168 smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
169
170 if (smsgid < 0) {
171 debugs(50, DBG_CRITICAL, "storeDiskdInit: msgget: " << xstrerror());
172 fatal("msgget failed");
173 }
174
175 rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
176
177 if (rmsgid < 0) {
178 debugs(50, DBG_CRITICAL, "storeDiskdInit: msgget: " << xstrerror());
179 fatal("msgget failed");
180 }
181
182 shm.init(ikey, magic2);
183 snprintf(skey1, 32, "%d", ikey);
184 snprintf(skey2, 32, "%d", ikey + 1);
185 snprintf(skey3, 32, "%d", ikey + 2);
186 args[0] = "diskd";
187 args[1] = skey1;
188 args[2] = skey2;
189 args[3] = skey3;
190 args[4] = NULL;
191 localhost.SetLocalhost();
192 pid = ipcCreate(IPC_STREAM,
193 Config.Program.diskd,
194 args,
195 "diskd",
196 localhost,
197 &rfd,
198 &wfd,
199 &hIpc);
200
201 if (pid < 0)
202 fatalf("execl: %s", Config.Program.diskd);
203
204 if (rfd != wfd)
205 comm_close(rfd);
206
207 fd_note(wfd, "squid -> diskd");
208
209 commUnsetFdTimeout(wfd);
210 commSetNonBlocking(wfd);
211 Comm::QuickPollRequired();
212 }
213
214 /*
215 * SHM manipulation routines
216 */
217 void
218 SharedMemory::put(ssize_t offset)
219 {
220 int i;
221 assert(offset >= 0);
222 assert(offset < nbufs * SHMBUF_BLKSZ);
223 i = offset / SHMBUF_BLKSZ;
224 assert(i < nbufs);
225 assert(CBIT_TEST(inuse_map, i));
226 CBIT_CLR(inuse_map, i);
227 --diskd_stats.shmbuf_count;
228 }
229
230 void *
231
232 SharedMemory::get(ssize_t * shm_offset)
233 {
234 char *aBuf = NULL;
235 int i;
236
237 for (i = 0; i < nbufs; ++i) {
238 if (CBIT_TEST(inuse_map, i))
239 continue;
240
241 CBIT_SET(inuse_map, i);
242
243 *shm_offset = i * SHMBUF_BLKSZ;
244
245 aBuf = buf + (*shm_offset);
246
247 break;
248 }
249
250 assert(aBuf);
251 assert(aBuf >= buf);
252 assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
253 ++diskd_stats.shmbuf_count;
254
255 if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count)
256 diskd_stats.max_shmuse = diskd_stats.shmbuf_count;
257
258 return aBuf;
259 }
260
261 void
262 SharedMemory::init(int ikey, int magic2)
263 {
264 nbufs = (int)(magic2 * 1.3);
265 id = shmget((key_t) (ikey + 2),
266 nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
267
268 if (id < 0) {
269 debugs(50, DBG_CRITICAL, "storeDiskdInit: shmget: " << xstrerror());
270 fatal("shmget failed");
271 }
272
273 buf = (char *)shmat(id, NULL, 0);
274
275 if (buf == (void *) -1) {
276 debugs(50, DBG_CRITICAL, "storeDiskdInit: shmat: " << xstrerror());
277 fatal("shmat failed");
278 }
279
280 inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
281 diskd_stats.shmbuf_count += nbufs;
282
283 for (int i = 0; i < nbufs; ++i) {
284 CBIT_SET(inuse_map, i);
285 put (i * SHMBUF_BLKSZ);
286 }
287 }
288
289 void
290 DiskdIOStrategy::unlinkDone(diomsg * M)
291 {
292 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
293 ++statCounter.syscalls.disk.unlinks;
294
295 if (M->status < 0)
296 ++diskd_stats.unlink.fail;
297 else
298 ++diskd_stats.unlink.success;
299 }
300
301 void
302 DiskdIOStrategy::handle(diomsg * M)
303 {
304 if (!cbdataReferenceValid (M->callback_data)) {
305 /* I.e. already closed file
306 * - say when we have a error opening after
307 * a read was already queued
308 */
309 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
310 cbdataReferenceDone (M->callback_data);
311 return;
312 }
313
314 /* set errno passed from diskd. makes debugging more meaningful */
315 if (M->status < 0)
316 errno = -M->status;
317
318 if (M->newstyle) {
319 DiskdFile *theFile = (DiskdFile *)M->callback_data;
320 theFile->RefCountDereference();
321 theFile->completed (M);
322 } else
323 switch (M->mtype) {
324
325 case _MQD_OPEN:
326
327 case _MQD_CREATE:
328
329 case _MQD_CLOSE:
330
331 case _MQD_READ:
332
333 case _MQD_WRITE:
334 assert (0);
335 break;
336
337 case _MQD_UNLINK:
338 unlinkDone(M);
339 break;
340
341 default:
342 assert(0);
343 break;
344 }
345
346 cbdataReferenceDone (M->callback_data);
347 }
348
349 int
350 DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, RefCountable_ *requestor)
351 {
352 diomsg M;
353 M.callback_data = cbdataReference(theFile);
354 theFile->RefCountReference();
355 M.requestor = requestor;
356 M.newstyle = true;
357
358 if (requestor)
359 requestor->RefCountReference();
360
361 return SEND(&M, mtype, id, size, offset, shm_offset);
362 }
363
364 int
365 DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
366 {
367 diomsg M;
368 M.callback_data = cbdataReference(sio.getRaw());
369 M.newstyle = false;
370
371 return SEND(&M, mtype, id, size, offset, shm_offset);
372 }
373
374 int
375 DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
376 {
377 static int send_errors = 0;
378 static int last_seq_no = 0;
379 static int seq_no = 0;
380 int x;
381
382 M->mtype = mtype;
383 M->size = size;
384 M->offset = offset;
385 M->status = -1;
386 M->shm_offset = (int) shm_offset;
387 M->id = id;
388 M->seq_no = ++seq_no;
389
390 if (M->seq_no < last_seq_no)
391 debugs(79, DBG_IMPORTANT, "WARNING: sequencing out of order");
392
393 x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
394
395 last_seq_no = M->seq_no;
396
397 if (0 == x) {
398 ++diskd_stats.sent_count;
399 ++away;
400 } else {
401 debugs(79, DBG_IMPORTANT, "storeDiskdSend: msgsnd: " << xstrerror());
402 cbdataReferenceDone(M->callback_data);
403 assert(++send_errors < 100);
404 if (shm_offset > -1)
405 shm.put(shm_offset);
406 }
407
408 /*
409 * We have to drain the queue here if necessary. If we don't,
410 * then we can have a lot of messages in the queue (probably
411 * up to 2*magic1) and we can run out of shared memory buffers.
412 */
413 /*
414 * Note that we call Store::Root().callbackk (for all SDs), rather
415 * than callback for just this SD, so that while
416 * we're "blocking" on this SD we can also handle callbacks
417 * from other SDs that might be ready.
418 */
419
420 struct timeval delay = {0, 1};
421
422 while (away > magic2) {
423 select(0, NULL, NULL, NULL, &delay);
424 Store::Root().callback();
425
426 if (delay.tv_usec < 1000000)
427 delay.tv_usec <<= 1;
428 }
429
430 return x;
431 }
432
433 ConfigOption *
434 DiskdIOStrategy::getOptionTree() const
435 {
436 ConfigOptionVector *result = new ConfigOptionVector;
437 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump));
438 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump));
439 return result;
440 }
441
442 bool
443 DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
444 {
445 if (strcmp(name, "Q1") != 0)
446 return false;
447
448 int old_magic1 = magic1;
449
450 magic1 = atoi(value);
451
452 if (!isaReconfig)
453 return true;
454
455 if (old_magic1 < magic1) {
456 /*
457 * This is because shm.nbufs is computed at startup, when
458 * we call shmget(). We can't increase the Q1/Q2 parameters
459 * beyond their initial values because then we might have
460 * more "Q2 messages" than shared memory chunks, and this
461 * will cause an assertion in storeDiskdShmGet().
462 */
463 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
464 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
465 magic1 = old_magic1;
466 return true;
467 }
468
469 if (old_magic1 != magic1)
470 debugs(3, DBG_IMPORTANT, "cache_dir new Q1 value '" << magic1 << "'");
471
472 return true;
473 }
474
475 void
476 DiskdIOStrategy::optionQ1Dump(StoreEntry * e) const
477 {
478 storeAppendPrintf(e, " Q1=%d", magic1);
479 }
480
481 bool
482 DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
483 {
484 if (strcmp(name, "Q2") != 0)
485 return false;
486
487 int old_magic2 = magic2;
488
489 magic2 = atoi(value);
490
491 if (!isaReconfig)
492 return true;
493
494 if (old_magic2 < magic2) {
495 /* See comments in Q1 function above */
496 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
497 magic2 = old_magic2;
498 return true;
499 }
500
501 if (old_magic2 != magic2)
502 debugs(3, DBG_IMPORTANT, "cache_dir new Q2 value '" << magic2 << "'");
503
504 return true;
505 }
506
507 void
508 DiskdIOStrategy::optionQ2Dump(StoreEntry * e) const
509 {
510 storeAppendPrintf(e, " Q2=%d", magic2);
511 }
512
513 /*
514 * Sync any pending data. We just sit around and read the queue
515 * until the data has finished writing.
516 */
517 void
518 DiskdIOStrategy::sync()
519 {
520 static time_t lastmsg = 0;
521
522 while (away > 0) {
523 if (squid_curtime > lastmsg) {
524 debugs(47, DBG_IMPORTANT, "storeDiskdDirSync: " << away << " messages away");
525 lastmsg = squid_curtime;
526 }
527
528 callback();
529 }
530 }
531
532 /*
533 * Handle callbacks. If we have more than magic2 requests away, we block
534 * until the queue is below magic2. Otherwise, we simply return when we
535 * don't get a message.
536 */
537
538 int
539 DiskdIOStrategy::callback()
540 {
541 diomsg M;
542 int x;
543 int retval = 0;
544
545 if (away >= magic2) {
546 ++diskd_stats.block_queue_len;
547 retval = 1;
548 /* We might not have anything to do, but our queue
549 * is full.. */
550 }
551
552 if (diskd_stats.sent_count - diskd_stats.recv_count >
553 diskd_stats.max_away) {
554 diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count;
555 }
556
557 while (1) {
558 #ifdef ALWAYS_ZERO_BUFFERS
559 memset(&M, '\0', sizeof(M));
560 #endif
561
562 x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
563
564 if (x < 0)
565 break;
566 else if (x != diomsg::msg_snd_rcv_sz) {
567 debugs(47, DBG_IMPORTANT, "storeDiskdDirCallback: msgget returns " << x);
568 break;
569 }
570
571 ++diskd_stats.recv_count;
572 --away;
573 handle(&M);
574 retval = 1; /* Return that we've actually done some work */
575
576 if (M.shm_offset > -1)
577 shm.put ((off_t) M.shm_offset);
578 }
579
580 return retval;
581 }
582
583 void
584 DiskdIOStrategy::statfs(StoreEntry & sentry)const
585 {
586 storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
587 }