]>
Commit | Line | Data |
---|---|---|
b9ae18aa | 1 | |
2 | /* | |
c8f4eac4 | 3 | * $Id: DiskdIOStrategy.cc,v 1.2 2005/01/03 16:08:26 robertc Exp $ |
b9ae18aa | 4 | * |
5 | * DEBUG: section 79 Squid-side DISKD I/O functions. | |
6 | * AUTHOR: Duane Wessels | |
7 | * | |
8 | * SQUID Web Proxy Cache http://www.squid-cache.org/ | |
9 | * ---------------------------------------------------------- | |
10 | * | |
11 | * Squid is the result of efforts by numerous individuals from | |
12 | * the Internet community; see the CONTRIBUTORS file for full | |
13 | * details. Many organizations have provided support for Squid's | |
14 | * development; see the SPONSORS file for full details. Squid is | |
15 | * Copyrighted (C) 2001 by the Regents of the University of | |
16 | * California; see the COPYRIGHT file for full details. Squid | |
17 | * incorporates software developed and/or copyrighted by other | |
18 | * sources; see the CREDITS file for full details. | |
19 | * | |
20 | * This program is free software; you can redistribute it and/or modify | |
21 | * it under the terms of the GNU General Public License as published by | |
22 | * the Free Software Foundation; either version 2 of the License, or | |
23 | * (at your option) any later version. | |
24 | * | |
25 | * This program is distributed in the hope that it will be useful, | |
26 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
27 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
28 | * GNU General Public License for more details. | |
29 | * | |
30 | * You should have received a copy of the GNU General Public License | |
31 | * along with this program; if not, write to the Free Software | |
32 | * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. | |
33 | * | |
34 | * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org> | |
35 | */ | |
36 | ||
37 | #include "squid.h" | |
38 | ||
39 | #include <sys/ipc.h> | |
40 | #include <sys/msg.h> | |
41 | #include <sys/shm.h> | |
42 | #include "DiskdIOStrategy.h" | |
43 | #include "ConfigOption.h" | |
44 | #include "DiskIO/DiskFile.h" | |
45 | #include "DiskdFile.h" | |
46 | #include "diomsg.h" | |
47 | /* for statfs */ | |
48 | #include "Store.h" | |
49 | ||
50 | diskd_stats_t diskd_stats; | |
51 | ||
52 | size_t DiskdIOStrategy::nextInstanceID (0); | |
53 | const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t); | |
54 | ||
55 | size_t | |
56 | DiskdIOStrategy::newInstance() | |
57 | { | |
58 | return ++nextInstanceID; | |
59 | } | |
60 | ||
61 | bool | |
62 | DiskdIOStrategy::shedLoad() | |
63 | { | |
64 | /* | |
65 | * Fail on open() if there are too many requests queued. | |
66 | */ | |
67 | ||
68 | if (away > magic1) { | |
69 | debug(79, 3) ("storeDiskdIO::shedLoad: Shedding, too many requests away\n"); | |
70 | ||
71 | return true; | |
72 | } | |
73 | ||
74 | return false; | |
75 | } | |
76 | ||
77 | int | |
78 | DiskdIOStrategy::load() | |
79 | { | |
80 | /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */ | |
81 | /* the parse function guarantees magic2 is positivie */ | |
82 | return away * 1000 / magic2; | |
83 | } | |
84 | ||
85 | void | |
86 | DiskdIOStrategy::openFailed() | |
87 | { | |
88 | diskd_stats.open_fail_queue_len++; | |
89 | } | |
90 | ||
91 | DiskFile::Pointer | |
92 | DiskdIOStrategy::newFile(char const *path) | |
93 | { | |
94 | if (shedLoad()) { | |
95 | openFailed(); | |
96 | return NULL; | |
97 | } | |
98 | ||
99 | return new DiskdFile (path, this); | |
100 | } | |
101 | ||
102 | DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0) , smsgid(-1), rmsgid(-1), wfd(-1) , instanceID(newInstance()) | |
103 | {} | |
104 | ||
105 | void | |
106 | DiskdIOStrategy::unlinkFile(char const *path) | |
107 | { | |
108 | if (shedLoad()) { | |
109 | /* Damn, we need to issue a sync unlink here :( */ | |
110 | debug(79, 2) ("storeDiskUnlink: Out of queue space, sync unlink\n"); | |
111 | #if USE_UNLINKD | |
112 | ||
113 | unlinkdUnlink(path); | |
114 | #elif USE_TRUNCATE | |
115 | ||
116 | truncate(path, 0); | |
117 | #else | |
118 | ||
119 | unlink(path); | |
120 | #endif | |
121 | ||
122 | return; | |
123 | } | |
124 | ||
125 | /* We can attempt a diskd unlink */ | |
126 | int x; | |
127 | ||
128 | off_t shm_offset; | |
129 | ||
130 | char *buf; | |
131 | ||
132 | buf = (char *)shm.get(&shm_offset); | |
133 | ||
134 | xstrncpy(buf, path, SHMBUF_BLKSZ); | |
135 | ||
136 | x = send(_MQD_UNLINK, | |
137 | 0, | |
138 | (StoreIOState::Pointer )NULL, | |
139 | 0, | |
140 | 0, | |
141 | shm_offset); | |
142 | ||
143 | if (x < 0) { | |
144 | debug(79, 1) ("storeDiskdSend UNLINK: %s\n", xstrerror()); | |
145 | ::unlink(buf); /* XXX EWW! */ | |
146 | // shm.put (shm_offset); | |
147 | } | |
148 | ||
149 | diskd_stats.unlink.ops++; | |
150 | } | |
151 | ||
152 | void | |
153 | DiskdIOStrategy::init() | |
154 | { | |
155 | int x; | |
156 | int rfd; | |
157 | int ikey; | |
158 | const char *args[5]; | |
159 | char skey1[32]; | |
160 | char skey2[32]; | |
161 | char skey3[32]; | |
162 | ||
163 | ikey = (getpid() << 10) + (instanceID << 2); | |
164 | ikey &= 0x7fffffff; | |
165 | smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT); | |
166 | ||
167 | if (smsgid < 0) { | |
168 | debug(50, 0) ("storeDiskdInit: msgget: %s\n", xstrerror()); | |
169 | fatal("msgget failed"); | |
170 | } | |
171 | ||
172 | rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT); | |
173 | ||
174 | if (rmsgid < 0) { | |
175 | debug(50, 0) ("storeDiskdInit: msgget: %s\n", xstrerror()); | |
176 | fatal("msgget failed"); | |
177 | } | |
178 | ||
179 | shm.init(ikey, magic2); | |
180 | snprintf(skey1, 32, "%d", ikey); | |
181 | snprintf(skey2, 32, "%d", ikey + 1); | |
182 | snprintf(skey3, 32, "%d", ikey + 2); | |
183 | args[0] = "diskd"; | |
184 | args[1] = skey1; | |
185 | args[2] = skey2; | |
186 | args[3] = skey3; | |
187 | args[4] = NULL; | |
188 | x = ipcCreate(IPC_STREAM, | |
189 | Config.Program.diskd, | |
190 | args, | |
191 | "diskd", | |
192 | &rfd, | |
193 | &wfd); | |
194 | ||
195 | if (x < 0) | |
196 | fatalf("execl: %s", Config.Program.diskd); | |
197 | ||
198 | if (rfd != wfd) | |
199 | comm_close(rfd); | |
200 | ||
201 | fd_note(wfd, "squid -> diskd"); | |
202 | ||
203 | commSetTimeout(wfd, -1, NULL, NULL); | |
204 | ||
205 | commSetNonBlocking(wfd); | |
206 | ||
207 | comm_quick_poll_required(); | |
208 | } | |
209 | ||
210 | /* | |
211 | * SHM manipulation routines | |
212 | */ | |
213 | void | |
214 | SharedMemory::put (off_t offset) | |
215 | { | |
216 | int i; | |
217 | assert(offset >= 0); | |
218 | assert(offset < nbufs * SHMBUF_BLKSZ); | |
219 | i = offset / SHMBUF_BLKSZ; | |
220 | assert(i < nbufs); | |
221 | assert(CBIT_TEST(inuse_map, i)); | |
222 | CBIT_CLR(inuse_map, i); | |
223 | --diskd_stats.shmbuf_count; | |
224 | } | |
225 | ||
226 | void * | |
227 | ||
228 | SharedMemory::get | |
229 | (off_t * shm_offset) | |
230 | { | |
231 | char *aBuf = NULL; | |
232 | int i; | |
233 | ||
234 | for (i = 0; i < nbufs; i++) { | |
235 | if (CBIT_TEST(inuse_map, i)) | |
236 | continue; | |
237 | ||
238 | CBIT_SET(inuse_map, i); | |
239 | ||
240 | *shm_offset = i * SHMBUF_BLKSZ; | |
241 | ||
242 | aBuf = buf + (*shm_offset); | |
243 | ||
244 | break; | |
245 | } | |
246 | ||
247 | assert(aBuf); | |
248 | assert(aBuf >= buf); | |
249 | assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ)); | |
250 | diskd_stats.shmbuf_count++; | |
251 | ||
252 | if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count) | |
253 | diskd_stats.max_shmuse = diskd_stats.shmbuf_count; | |
254 | ||
255 | return aBuf; | |
256 | } | |
257 | ||
258 | void | |
259 | SharedMemory::init(int ikey, int magic2) | |
260 | { | |
261 | nbufs = (int)(magic2 * 1.3); | |
262 | id = shmget((key_t) (ikey + 2), | |
263 | nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT); | |
264 | ||
265 | if (id < 0) { | |
266 | debug(50, 0) ("storeDiskdInit: shmget: %s\n", xstrerror()); | |
267 | fatal("shmget failed"); | |
268 | } | |
269 | ||
270 | buf = (char *)shmat(id, NULL, 0); | |
271 | ||
272 | if (buf == (void *) -1) { | |
273 | debug(50, 0) ("storeDiskdInit: shmat: %s\n", xstrerror()); | |
274 | fatal("shmat failed"); | |
275 | } | |
276 | ||
277 | inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1); | |
278 | diskd_stats.shmbuf_count += nbufs; | |
279 | ||
280 | for (int i = 0; i < nbufs; i++) { | |
281 | CBIT_SET(inuse_map, i); | |
282 | put (i * SHMBUF_BLKSZ); | |
283 | } | |
284 | } | |
285 | ||
286 | void | |
287 | DiskdIOStrategy::unlinkDone(diomsg * M) | |
288 | { | |
289 | debug(79, 3) ("storeDiskdUnlinkDone: file %s status %d\n",shm.buf + M->shm_offset, | |
290 | M->status); | |
291 | statCounter.syscalls.disk.unlinks++; | |
292 | ||
293 | if (M->status < 0) | |
294 | diskd_stats.unlink.fail++; | |
295 | else | |
296 | diskd_stats.unlink.success++; | |
297 | } | |
298 | ||
299 | void | |
300 | DiskdIOStrategy::handle(diomsg * M) | |
301 | { | |
302 | if (!cbdataReferenceValid (M->callback_data)) { | |
303 | /* I.e. already closed file | |
304 | * - say when we have a error opening after | |
305 | * a read was already queued | |
306 | */ | |
307 | debug(79, 3) ("storeDiskdHandle: Invalid callback_data %p\n", | |
308 | M->callback_data); | |
309 | cbdataReferenceDone (M->callback_data); | |
310 | return; | |
311 | } | |
312 | ||
313 | ||
314 | if (M->newstyle) { | |
315 | DiskdFile *theFile = (DiskdFile *)M->callback_data; | |
316 | theFile->RefCountDereference(); | |
317 | theFile->completed (M); | |
318 | } else | |
319 | switch (M->mtype) { | |
320 | ||
321 | case _MQD_OPEN: | |
322 | ||
323 | case _MQD_CREATE: | |
324 | ||
325 | case _MQD_CLOSE: | |
326 | ||
327 | case _MQD_READ: | |
328 | ||
329 | case _MQD_WRITE: | |
330 | assert (0); | |
331 | break; | |
332 | ||
333 | case _MQD_UNLINK: | |
334 | unlinkDone(M); | |
335 | break; | |
336 | ||
337 | default: | |
338 | assert(0); | |
339 | break; | |
340 | } | |
341 | ||
342 | cbdataReferenceDone (M->callback_data); | |
343 | } | |
344 | ||
345 | int | |
346 | DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, int size, int offset, off_t shm_offset, RefCountable_ *requestor) | |
347 | { | |
348 | int x; | |
349 | diomsg M; | |
350 | static int send_errors = 0; | |
351 | static int last_seq_no = 0; | |
352 | static int seq_no = 0; | |
353 | M.mtype = mtype; | |
354 | M.callback_data = cbdataReference(theFile); | |
355 | theFile->RefCountReference(); | |
356 | M.requestor = requestor; | |
357 | ||
358 | if (requestor) | |
359 | requestor->RefCountReference(); | |
360 | ||
361 | M.size = size; | |
362 | ||
363 | M.offset = offset; | |
364 | ||
365 | M.status = -1; | |
366 | ||
367 | M.shm_offset = (int) shm_offset; | |
368 | ||
369 | M.id = id; | |
370 | ||
371 | M.seq_no = ++seq_no; | |
372 | ||
373 | M.newstyle = true; | |
374 | ||
375 | if (M.seq_no < last_seq_no) | |
376 | debug(79, 1) ("WARNING: sequencing out of order\n"); | |
377 | ||
378 | debugs (79,9, "sending with" << smsgid <<" " << &M << " " <<diomsg::msg_snd_rcv_sz << " " << IPC_NOWAIT); | |
379 | ||
380 | x = msgsnd(smsgid, &M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT); | |
381 | ||
382 | last_seq_no = M.seq_no; | |
383 | ||
384 | if (0 == x) { | |
385 | diskd_stats.sent_count++; | |
386 | away++; | |
387 | } else { | |
388 | debug(79, 1) ("storeDiskdSend: msgsnd: %s\n", xstrerror()); | |
389 | cbdataReferenceDone(M.callback_data); | |
390 | assert(++send_errors < 100); | |
391 | shm.put (shm_offset); | |
392 | } | |
393 | ||
394 | /* | |
395 | * We have to drain the queue here if necessary. If we don't, | |
396 | * then we can have a lot of messages in the queue (probably | |
397 | * up to 2*magic1) and we can run out of shared memory buffers. | |
398 | */ | |
399 | /* | |
c8f4eac4 | 400 | * Note that we call Store::Root().callbackk (for all SDs), rather |
401 | * than callback for just this SD, so that while | |
b9ae18aa | 402 | * we're "blocking" on this SD we can also handle callbacks |
403 | * from other SDs that might be ready. | |
404 | */ | |
405 | while (away > magic2) { | |
406 | ||
407 | struct timeval delay = {0, 1}; | |
408 | ||
409 | select(0, NULL, NULL, NULL, &delay); | |
c8f4eac4 | 410 | Store::Root().callback(); |
b9ae18aa | 411 | |
412 | if (delay.tv_usec < 1000000) | |
413 | delay.tv_usec <<= 1; | |
414 | } | |
415 | ||
416 | return x; | |
417 | } | |
418 | ||
419 | int | |
420 | DiskdIOStrategy::send(int mtype, int id, StoreIOState::Pointer sio, int size, int offset, off_t shm_offset) | |
421 | { | |
422 | int x; | |
423 | diomsg M; | |
424 | static int send_errors = 0; | |
425 | static int last_seq_no = 0; | |
426 | static int seq_no = 0; | |
427 | M.mtype = mtype; | |
428 | M.callback_data = cbdataReference(sio.getRaw()); | |
429 | M.size = size; | |
430 | M.offset = offset; | |
431 | M.status = -1; | |
432 | M.shm_offset = (int) shm_offset; | |
433 | M.id = id; | |
434 | M.seq_no = ++seq_no; | |
435 | M.newstyle = false; | |
436 | ||
437 | if (M.seq_no < last_seq_no) | |
438 | debug(79, 1) ("WARNING: sequencing out of order\n"); | |
439 | ||
440 | x = msgsnd(smsgid, &M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT); | |
441 | ||
442 | last_seq_no = M.seq_no; | |
443 | ||
444 | if (0 == x) { | |
445 | diskd_stats.sent_count++; | |
446 | away++; | |
447 | } else { | |
448 | debug(79, 1) ("storeDiskdSend: msgsnd: %s\n", xstrerror()); | |
449 | cbdataReferenceDone(M.callback_data); | |
450 | assert(++send_errors < 100); | |
451 | } | |
452 | ||
453 | /* | |
454 | * We have to drain the queue here if necessary. If we don't, | |
455 | * then we can have a lot of messages in the queue (probably | |
456 | * up to 2*magic1) and we can run out of shared memory buffers. | |
457 | */ | |
458 | /* | |
c8f4eac4 | 459 | * Note that we call Store::Root().callbackk (for all SDs), rather |
460 | * than callback for just this SD, so that while | |
b9ae18aa | 461 | * we're "blocking" on this SD we can also handle callbacks |
462 | * from other SDs that might be ready. | |
463 | */ | |
464 | while (away > magic2) { | |
465 | ||
466 | struct timeval delay = {0, 1}; | |
467 | ||
468 | select(0, NULL, NULL, NULL, &delay); | |
c8f4eac4 | 469 | Store::Root().callback(); |
b9ae18aa | 470 | |
471 | if (delay.tv_usec < 1000000) | |
472 | delay.tv_usec <<= 1; | |
473 | } | |
474 | ||
475 | return x; | |
476 | } | |
477 | ||
478 | ConfigOption * | |
479 | DiskdIOStrategy::getOptionTree() const | |
480 | { | |
481 | ConfigOptionVector *result = new ConfigOptionVector; | |
482 | result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump)); | |
483 | result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump)); | |
484 | return result; | |
485 | } | |
486 | ||
487 | bool | |
488 | DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int reconfiguring) | |
489 | { | |
490 | if (strcmp(name, "Q1") != 0) | |
491 | return false; | |
492 | ||
493 | int old_magic1 = magic1; | |
494 | ||
495 | magic1 = atoi(value); | |
496 | ||
497 | if (!reconfiguring) | |
498 | return true; | |
499 | ||
500 | if (old_magic1 < magic1) { | |
501 | /* | |
502 | * This is because shm.nbufs is computed at startup, when | |
503 | * we call shmget(). We can't increase the Q1/Q2 parameters | |
504 | * beyond their initial values because then we might have | |
505 | * more "Q2 messages" than shared memory chunks, and this | |
506 | * will cause an assertion in storeDiskdShmGet(). | |
507 | */ | |
508 | /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */ | |
509 | debug(3, 1) ("WARNING: cannot increase cache_dir Q1 value while Squid is running.\n"); | |
510 | magic1 = old_magic1; | |
511 | return true; | |
512 | } | |
513 | ||
514 | if (old_magic1 != magic1) | |
515 | debug(3, 1) ("cache_dir new Q1 value '%d'\n", | |
516 | magic1); | |
517 | ||
518 | return true; | |
519 | } | |
520 | ||
521 | void | |
522 | DiskdIOStrategy::optionQ1Dump(StoreEntry * e) const | |
523 | { | |
524 | storeAppendPrintf(e, " Q1=%d", magic1); | |
525 | } | |
526 | ||
527 | bool | |
528 | DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int reconfiguring) | |
529 | { | |
530 | if (strcmp(name, "Q2") != 0) | |
531 | return false; | |
532 | ||
533 | int old_magic2 = magic2; | |
534 | ||
535 | magic2 = atoi(value); | |
536 | ||
537 | if (!reconfiguring) | |
538 | return true; | |
539 | ||
540 | if (old_magic2 < magic2) { | |
541 | /* See comments in Q1 function above */ | |
542 | debug(3, 1) ("WARNING: cannot increase cache_dir Q2 value while Squid is running.\n"); | |
543 | magic2 = old_magic2; | |
544 | return true; | |
545 | } | |
546 | ||
547 | if (old_magic2 != magic2) | |
548 | debug(3, 1) ("cache_dir new Q2 value '%d'\n", | |
549 | magic2); | |
550 | ||
551 | return true; | |
552 | } | |
553 | ||
554 | void | |
555 | DiskdIOStrategy::optionQ2Dump(StoreEntry * e) const | |
556 | { | |
557 | storeAppendPrintf(e, " Q2=%d", magic2); | |
558 | } | |
559 | ||
560 | /* | |
561 | * Sync any pending data. We just sit around and read the queue | |
562 | * until the data has finished writing. | |
563 | */ | |
564 | void | |
565 | DiskdIOStrategy::sync() | |
566 | { | |
567 | static time_t lastmsg = 0; | |
568 | ||
569 | while (away > 0) { | |
570 | if (squid_curtime > lastmsg) { | |
571 | debug(47, 1) ("storeDiskdDirSync: %d messages away\n", | |
572 | away); | |
573 | lastmsg = squid_curtime; | |
574 | } | |
575 | ||
576 | callback(); | |
577 | } | |
578 | } | |
579 | ||
580 | ||
581 | /* | |
582 | * Handle callbacks. If we have more than magic2 requests away, we block | |
583 | * until the queue is below magic2. Otherwise, we simply return when we | |
584 | * don't get a message. | |
585 | */ | |
586 | ||
587 | int | |
588 | DiskdIOStrategy::callback() | |
589 | { | |
590 | diomsg M; | |
591 | int x; | |
592 | int retval = 0; | |
593 | ||
594 | if (away >= magic2) { | |
595 | diskd_stats.block_queue_len++; | |
596 | retval = 1; | |
597 | /* We might not have anything to do, but our queue | |
598 | * is full.. */ | |
599 | } | |
600 | ||
601 | if (diskd_stats.sent_count - diskd_stats.recv_count > | |
602 | diskd_stats.max_away) { | |
603 | diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count; | |
604 | } | |
605 | ||
606 | while (1) { | |
607 | #ifdef ALWAYS_ZERO_BUFFERS | |
608 | memset(&M, '\0', sizeof(M)); | |
609 | #endif | |
610 | ||
611 | x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT); | |
612 | ||
613 | if (x < 0) | |
614 | break; | |
615 | else if (x != diomsg::msg_snd_rcv_sz) { | |
616 | debug(47, 1) ("storeDiskdDirCallback: msgget returns %d\n", | |
617 | x); | |
618 | break; | |
619 | } | |
620 | ||
621 | diskd_stats.recv_count++; | |
622 | --away; | |
623 | handle(&M); | |
624 | retval = 1; /* Return that we've actually done some work */ | |
625 | ||
626 | if (M.shm_offset > -1) | |
627 | shm.put ((off_t) M.shm_offset); | |
628 | } | |
629 | ||
630 | return retval; | |
631 | } | |
632 | ||
633 | void | |
634 | DiskdIOStrategy::statfs(StoreEntry & sentry)const | |
635 | { | |
636 | storeAppendPrintf(&sentry, "Pending operations: %d\n", away); | |
637 | } |