]> git.ipfire.org Git - thirdparty/squid.git/blob - src/DiskIO/DiskDaemon/DiskdIOStrategy.cc
Author: Leonid Evdokimov <leon@darkk.net.ru>
[thirdparty/squid.git] / src / DiskIO / DiskDaemon / DiskdIOStrategy.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 79 Squid-side DISKD I/O functions.
5 * AUTHOR: Duane Wessels
6 *
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
9 *
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
23 *
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
34 */
35
36 #include "squid.h"
37 #include "comm/Loops.h"
38
39 #include <sys/ipc.h>
40 #include <sys/msg.h>
41 #include <sys/shm.h>
42 #include "DiskdIOStrategy.h"
43 #include "ConfigOption.h"
44 #include "DiskIO/DiskFile.h"
45 #include "DiskdFile.h"
46 #include "diomsg.h"
47 /* for statfs */
48 #include "Store.h"
49 #include "SquidTime.h"
50
51 diskd_stats_t diskd_stats;
52
53 size_t DiskdIOStrategy::nextInstanceID (0);
54 const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
55
56 size_t
57 DiskdIOStrategy::newInstance()
58 {
59 return ++nextInstanceID;
60 }
61
62 bool
63 DiskdIOStrategy::shedLoad()
64 {
65 /*
66 * Fail on open() if there are too many requests queued.
67 */
68
69 if (away > magic1) {
70 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
71
72 return true;
73 }
74
75 return false;
76 }
77
78 int
79 DiskdIOStrategy::load()
80 {
81 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
82 /* the parse function guarantees magic2 is positivie */
83 return away * 1000 / magic2;
84 }
85
86 void
87 DiskdIOStrategy::openFailed()
88 {
89 diskd_stats.open_fail_queue_len++;
90 }
91
92 DiskFile::Pointer
93 DiskdIOStrategy::newFile(char const *path)
94 {
95 if (shedLoad()) {
96 openFailed();
97 return NULL;
98 }
99
100 return new DiskdFile (path, this);
101 }
102
103 DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0) , smsgid(-1), rmsgid(-1), wfd(-1) , instanceID(newInstance())
104 {}
105
106 void
107 DiskdIOStrategy::unlinkFile(char const *path)
108 {
109 if (shedLoad()) {
110 /* Damn, we need to issue a sync unlink here :( */
111 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
112 #if USE_UNLINKD
113
114 unlinkdUnlink(path);
115 #else
116
117 unlink(path);
118 #endif
119
120 return;
121 }
122
123 /* We can attempt a diskd unlink */
124 int x;
125
126 ssize_t shm_offset;
127
128 char *buf;
129
130 buf = (char *)shm.get(&shm_offset);
131
132 xstrncpy(buf, path, SHMBUF_BLKSZ);
133
134 x = send(_MQD_UNLINK,
135 0,
136 (StoreIOState::Pointer )NULL,
137 0,
138 0,
139 shm_offset);
140
141 if (x < 0) {
142 debugs(79, 1, "storeDiskdSend UNLINK: " << xstrerror());
143 ::unlink(buf); /* XXX EWW! */
144 // shm.put (shm_offset);
145 }
146
147 diskd_stats.unlink.ops++;
148 }
149
150 void
151 DiskdIOStrategy::init()
152 {
153 int pid;
154 void * hIpc;
155 int rfd;
156 int ikey;
157 const char *args[5];
158 char skey1[32];
159 char skey2[32];
160 char skey3[32];
161 Ip::Address localhost;
162
163 ikey = (getpid() << 10) + (instanceID << 2);
164 ikey &= 0x7fffffff;
165 smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
166
167 if (smsgid < 0) {
168 debugs(50, 0, "storeDiskdInit: msgget: " << xstrerror());
169 fatal("msgget failed");
170 }
171
172 rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
173
174 if (rmsgid < 0) {
175 debugs(50, 0, "storeDiskdInit: msgget: " << xstrerror());
176 fatal("msgget failed");
177 }
178
179 shm.init(ikey, magic2);
180 snprintf(skey1, 32, "%d", ikey);
181 snprintf(skey2, 32, "%d", ikey + 1);
182 snprintf(skey3, 32, "%d", ikey + 2);
183 args[0] = "diskd";
184 args[1] = skey1;
185 args[2] = skey2;
186 args[3] = skey3;
187 args[4] = NULL;
188 localhost.SetLocalhost();
189 pid = ipcCreate(IPC_STREAM,
190 Config.Program.diskd,
191 args,
192 "diskd",
193 localhost,
194 &rfd,
195 &wfd,
196 &hIpc);
197
198 if (pid < 0)
199 fatalf("execl: %s", Config.Program.diskd);
200
201 if (rfd != wfd)
202 comm_close(rfd);
203
204 fd_note(wfd, "squid -> diskd");
205
206 commSetTimeout(wfd, -1, NULL, NULL);
207 commSetNonBlocking(wfd);
208 Comm::QuickPollRequired();
209 }
210
211 /*
212 * SHM manipulation routines
213 */
214 void
215 SharedMemory::put(ssize_t offset)
216 {
217 int i;
218 assert(offset >= 0);
219 assert(offset < nbufs * SHMBUF_BLKSZ);
220 i = offset / SHMBUF_BLKSZ;
221 assert(i < nbufs);
222 assert(CBIT_TEST(inuse_map, i));
223 CBIT_CLR(inuse_map, i);
224 --diskd_stats.shmbuf_count;
225 }
226
227 void *
228
229 SharedMemory::get(ssize_t * shm_offset)
230 {
231 char *aBuf = NULL;
232 int i;
233
234 for (i = 0; i < nbufs; i++) {
235 if (CBIT_TEST(inuse_map, i))
236 continue;
237
238 CBIT_SET(inuse_map, i);
239
240 *shm_offset = i * SHMBUF_BLKSZ;
241
242 aBuf = buf + (*shm_offset);
243
244 break;
245 }
246
247 assert(aBuf);
248 assert(aBuf >= buf);
249 assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
250 diskd_stats.shmbuf_count++;
251
252 if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count)
253 diskd_stats.max_shmuse = diskd_stats.shmbuf_count;
254
255 return aBuf;
256 }
257
258 void
259 SharedMemory::init(int ikey, int magic2)
260 {
261 nbufs = (int)(magic2 * 1.3);
262 id = shmget((key_t) (ikey + 2),
263 nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
264
265 if (id < 0) {
266 debugs(50, 0, "storeDiskdInit: shmget: " << xstrerror());
267 fatal("shmget failed");
268 }
269
270 buf = (char *)shmat(id, NULL, 0);
271
272 if (buf == (void *) -1) {
273 debugs(50, 0, "storeDiskdInit: shmat: " << xstrerror());
274 fatal("shmat failed");
275 }
276
277 inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
278 diskd_stats.shmbuf_count += nbufs;
279
280 for (int i = 0; i < nbufs; i++) {
281 CBIT_SET(inuse_map, i);
282 put (i * SHMBUF_BLKSZ);
283 }
284 }
285
286 void
287 DiskdIOStrategy::unlinkDone(diomsg * M)
288 {
289 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
290 statCounter.syscalls.disk.unlinks++;
291
292 if (M->status < 0)
293 diskd_stats.unlink.fail++;
294 else
295 diskd_stats.unlink.success++;
296 }
297
298 void
299 DiskdIOStrategy::handle(diomsg * M)
300 {
301 if (!cbdataReferenceValid (M->callback_data)) {
302 /* I.e. already closed file
303 * - say when we have a error opening after
304 * a read was already queued
305 */
306 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
307 cbdataReferenceDone (M->callback_data);
308 return;
309 }
310
311
312 /* set errno passed from diskd. makes debugging more meaningful */
313 if (M->status < 0)
314 errno = -M->status;
315
316 if (M->newstyle) {
317 DiskdFile *theFile = (DiskdFile *)M->callback_data;
318 theFile->RefCountDereference();
319 theFile->completed (M);
320 } else
321 switch (M->mtype) {
322
323 case _MQD_OPEN:
324
325 case _MQD_CREATE:
326
327 case _MQD_CLOSE:
328
329 case _MQD_READ:
330
331 case _MQD_WRITE:
332 assert (0);
333 break;
334
335 case _MQD_UNLINK:
336 unlinkDone(M);
337 break;
338
339 default:
340 assert(0);
341 break;
342 }
343
344 cbdataReferenceDone (M->callback_data);
345 }
346
347 int
348 DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, RefCountable_ *requestor)
349 {
350 diomsg M;
351 M.callback_data = cbdataReference(theFile);
352 theFile->RefCountReference();
353 M.requestor = requestor;
354 M.newstyle = true;
355
356 if (requestor)
357 requestor->RefCountReference();
358
359 return SEND(&M, mtype, id, size, offset, shm_offset);
360 }
361
362 int
363 DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
364 {
365 diomsg M;
366 M.callback_data = cbdataReference(sio.getRaw());
367 M.newstyle = false;
368
369 return SEND(&M, mtype, id, size, offset, shm_offset);
370 }
371
372 int
373 DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
374 {
375 static int send_errors = 0;
376 static int last_seq_no = 0;
377 static int seq_no = 0;
378 int x;
379
380 M->mtype = mtype;
381 M->size = size;
382 M->offset = offset;
383 M->status = -1;
384 M->shm_offset = (int) shm_offset;
385 M->id = id;
386 M->seq_no = ++seq_no;
387
388 if (M->seq_no < last_seq_no)
389 debugs(79, 1, "WARNING: sequencing out of order");
390
391 x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
392
393 last_seq_no = M->seq_no;
394
395 if (0 == x) {
396 diskd_stats.sent_count++;
397 away++;
398 } else {
399 debugs(79, 1, "storeDiskdSend: msgsnd: " << xstrerror());
400 cbdataReferenceDone(M->callback_data);
401 assert(++send_errors < 100);
402 if (shm_offset > -1)
403 shm.put(shm_offset);
404 }
405
406 /*
407 * We have to drain the queue here if necessary. If we don't,
408 * then we can have a lot of messages in the queue (probably
409 * up to 2*magic1) and we can run out of shared memory buffers.
410 */
411 /*
412 * Note that we call Store::Root().callbackk (for all SDs), rather
413 * than callback for just this SD, so that while
414 * we're "blocking" on this SD we can also handle callbacks
415 * from other SDs that might be ready.
416 */
417
418 struct timeval delay = {0, 1};
419
420 while (away > magic2) {
421 select(0, NULL, NULL, NULL, &delay);
422 Store::Root().callback();
423
424 if (delay.tv_usec < 1000000)
425 delay.tv_usec <<= 1;
426 }
427
428 return x;
429 }
430
431 ConfigOption *
432 DiskdIOStrategy::getOptionTree() const
433 {
434 ConfigOptionVector *result = new ConfigOptionVector;
435 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump));
436 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump));
437 return result;
438 }
439
440 bool
441 DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
442 {
443 if (strcmp(name, "Q1") != 0)
444 return false;
445
446 int old_magic1 = magic1;
447
448 magic1 = atoi(value);
449
450 if (!isaReconfig)
451 return true;
452
453 if (old_magic1 < magic1) {
454 /*
455 * This is because shm.nbufs is computed at startup, when
456 * we call shmget(). We can't increase the Q1/Q2 parameters
457 * beyond their initial values because then we might have
458 * more "Q2 messages" than shared memory chunks, and this
459 * will cause an assertion in storeDiskdShmGet().
460 */
461 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
462 debugs(3, 1, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
463 magic1 = old_magic1;
464 return true;
465 }
466
467 if (old_magic1 != magic1)
468 debugs(3, 1, "cache_dir new Q1 value '" << magic1 << "'");
469
470 return true;
471 }
472
473 void
474 DiskdIOStrategy::optionQ1Dump(StoreEntry * e) const
475 {
476 storeAppendPrintf(e, " Q1=%d", magic1);
477 }
478
479 bool
480 DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
481 {
482 if (strcmp(name, "Q2") != 0)
483 return false;
484
485 int old_magic2 = magic2;
486
487 magic2 = atoi(value);
488
489 if (!isaReconfig)
490 return true;
491
492 if (old_magic2 < magic2) {
493 /* See comments in Q1 function above */
494 debugs(3, 1, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
495 magic2 = old_magic2;
496 return true;
497 }
498
499 if (old_magic2 != magic2)
500 debugs(3, 1, "cache_dir new Q2 value '" << magic2 << "'");
501
502 return true;
503 }
504
505 void
506 DiskdIOStrategy::optionQ2Dump(StoreEntry * e) const
507 {
508 storeAppendPrintf(e, " Q2=%d", magic2);
509 }
510
511 /*
512 * Sync any pending data. We just sit around and read the queue
513 * until the data has finished writing.
514 */
515 void
516 DiskdIOStrategy::sync()
517 {
518 static time_t lastmsg = 0;
519
520 while (away > 0) {
521 if (squid_curtime > lastmsg) {
522 debugs(47, 1, "storeDiskdDirSync: " << away << " messages away");
523 lastmsg = squid_curtime;
524 }
525
526 callback();
527 }
528 }
529
530
531 /*
532 * Handle callbacks. If we have more than magic2 requests away, we block
533 * until the queue is below magic2. Otherwise, we simply return when we
534 * don't get a message.
535 */
536
537 int
538 DiskdIOStrategy::callback()
539 {
540 diomsg M;
541 int x;
542 int retval = 0;
543
544 if (away >= magic2) {
545 diskd_stats.block_queue_len++;
546 retval = 1;
547 /* We might not have anything to do, but our queue
548 * is full.. */
549 }
550
551 if (diskd_stats.sent_count - diskd_stats.recv_count >
552 diskd_stats.max_away) {
553 diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count;
554 }
555
556 while (1) {
557 #ifdef ALWAYS_ZERO_BUFFERS
558 memset(&M, '\0', sizeof(M));
559 #endif
560
561 x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
562
563 if (x < 0)
564 break;
565 else if (x != diomsg::msg_snd_rcv_sz) {
566 debugs(47, 1, "storeDiskdDirCallback: msgget returns " << x);
567 break;
568 }
569
570 diskd_stats.recv_count++;
571 --away;
572 handle(&M);
573 retval = 1; /* Return that we've actually done some work */
574
575 if (M.shm_offset > -1)
576 shm.put ((off_t) M.shm_offset);
577 }
578
579 return retval;
580 }
581
582 void
583 DiskdIOStrategy::statfs(StoreEntry & sentry)const
584 {
585 storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
586 }