]> git.ipfire.org Git - thirdparty/squid.git/blob - src/DiskIO/DiskDaemon/DiskdIOStrategy.cc
Renamed squid.h to squid-old.h and config.h to squid.h.
[thirdparty/squid.git] / src / DiskIO / DiskDaemon / DiskdIOStrategy.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 79 Squid-side DISKD I/O functions.
5 * AUTHOR: Duane Wessels
6 *
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
9 *
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
23 *
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
34 */
35
36 #include "squid-old.h"
37 #include "comm/Loops.h"
38
39 #include <sys/ipc.h>
40 #include <sys/msg.h>
41 #include <sys/shm.h>
42 #include "DiskdIOStrategy.h"
43 #include "ConfigOption.h"
44 #include "DiskIO/DiskFile.h"
45 #include "DiskdFile.h"
46 #include "diomsg.h"
47 /* for statfs */
48 #include "Store.h"
49 #include "StatCounters.h"
50 #include "SquidTime.h"
51
52 diskd_stats_t diskd_stats;
53
54 size_t DiskdIOStrategy::nextInstanceID (0);
55 const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
56
57 size_t
58 DiskdIOStrategy::newInstance()
59 {
60 return ++nextInstanceID;
61 }
62
63 bool
64 DiskdIOStrategy::shedLoad()
65 {
66 /*
67 * Fail on open() if there are too many requests queued.
68 */
69
70 if (away > magic1) {
71 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
72
73 return true;
74 }
75
76 return false;
77 }
78
79 int
80 DiskdIOStrategy::load()
81 {
82 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
83 /* the parse function guarantees magic2 is positivie */
84 return away * 1000 / magic2;
85 }
86
87 void
88 DiskdIOStrategy::openFailed()
89 {
90 diskd_stats.open_fail_queue_len++;
91 }
92
93 DiskFile::Pointer
94 DiskdIOStrategy::newFile(char const *path)
95 {
96 if (shedLoad()) {
97 openFailed();
98 return NULL;
99 }
100
101 return new DiskdFile (path, this);
102 }
103
104 DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0) , smsgid(-1), rmsgid(-1), wfd(-1) , instanceID(newInstance())
105 {}
106
107 bool
108 DiskdIOStrategy::unlinkdUseful() const
109 {
110 return true;
111 }
112
113 void
114 DiskdIOStrategy::unlinkFile(char const *path)
115 {
116 if (shedLoad()) {
117 /* Damn, we need to issue a sync unlink here :( */
118 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
119 #if USE_UNLINKD
120
121 unlinkdUnlink(path);
122 #else
123
124 unlink(path);
125 #endif
126
127 return;
128 }
129
130 /* We can attempt a diskd unlink */
131 int x;
132
133 ssize_t shm_offset;
134
135 char *buf;
136
137 buf = (char *)shm.get(&shm_offset);
138
139 xstrncpy(buf, path, SHMBUF_BLKSZ);
140
141 x = send(_MQD_UNLINK,
142 0,
143 (StoreIOState::Pointer )NULL,
144 0,
145 0,
146 shm_offset);
147
148 if (x < 0) {
149 debugs(79, 1, "storeDiskdSend UNLINK: " << xstrerror());
150 ::unlink(buf); /* XXX EWW! */
151 // shm.put (shm_offset);
152 }
153
154 diskd_stats.unlink.ops++;
155 }
156
157 void
158 DiskdIOStrategy::init()
159 {
160 int pid;
161 void * hIpc;
162 int rfd;
163 int ikey;
164 const char *args[5];
165 char skey1[32];
166 char skey2[32];
167 char skey3[32];
168 Ip::Address localhost;
169
170 ikey = (getpid() << 10) + (instanceID << 2);
171 ikey &= 0x7fffffff;
172 smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
173
174 if (smsgid < 0) {
175 debugs(50, 0, "storeDiskdInit: msgget: " << xstrerror());
176 fatal("msgget failed");
177 }
178
179 rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
180
181 if (rmsgid < 0) {
182 debugs(50, 0, "storeDiskdInit: msgget: " << xstrerror());
183 fatal("msgget failed");
184 }
185
186 shm.init(ikey, magic2);
187 snprintf(skey1, 32, "%d", ikey);
188 snprintf(skey2, 32, "%d", ikey + 1);
189 snprintf(skey3, 32, "%d", ikey + 2);
190 args[0] = "diskd";
191 args[1] = skey1;
192 args[2] = skey2;
193 args[3] = skey3;
194 args[4] = NULL;
195 localhost.SetLocalhost();
196 pid = ipcCreate(IPC_STREAM,
197 Config.Program.diskd,
198 args,
199 "diskd",
200 localhost,
201 &rfd,
202 &wfd,
203 &hIpc);
204
205 if (pid < 0)
206 fatalf("execl: %s", Config.Program.diskd);
207
208 if (rfd != wfd)
209 comm_close(rfd);
210
211 fd_note(wfd, "squid -> diskd");
212
213 commUnsetFdTimeout(wfd);
214 commSetNonBlocking(wfd);
215 Comm::QuickPollRequired();
216 }
217
218 /*
219 * SHM manipulation routines
220 */
221 void
222 SharedMemory::put(ssize_t offset)
223 {
224 int i;
225 assert(offset >= 0);
226 assert(offset < nbufs * SHMBUF_BLKSZ);
227 i = offset / SHMBUF_BLKSZ;
228 assert(i < nbufs);
229 assert(CBIT_TEST(inuse_map, i));
230 CBIT_CLR(inuse_map, i);
231 --diskd_stats.shmbuf_count;
232 }
233
234 void *
235
236 SharedMemory::get(ssize_t * shm_offset)
237 {
238 char *aBuf = NULL;
239 int i;
240
241 for (i = 0; i < nbufs; i++) {
242 if (CBIT_TEST(inuse_map, i))
243 continue;
244
245 CBIT_SET(inuse_map, i);
246
247 *shm_offset = i * SHMBUF_BLKSZ;
248
249 aBuf = buf + (*shm_offset);
250
251 break;
252 }
253
254 assert(aBuf);
255 assert(aBuf >= buf);
256 assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
257 diskd_stats.shmbuf_count++;
258
259 if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count)
260 diskd_stats.max_shmuse = diskd_stats.shmbuf_count;
261
262 return aBuf;
263 }
264
265 void
266 SharedMemory::init(int ikey, int magic2)
267 {
268 nbufs = (int)(magic2 * 1.3);
269 id = shmget((key_t) (ikey + 2),
270 nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
271
272 if (id < 0) {
273 debugs(50, 0, "storeDiskdInit: shmget: " << xstrerror());
274 fatal("shmget failed");
275 }
276
277 buf = (char *)shmat(id, NULL, 0);
278
279 if (buf == (void *) -1) {
280 debugs(50, 0, "storeDiskdInit: shmat: " << xstrerror());
281 fatal("shmat failed");
282 }
283
284 inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
285 diskd_stats.shmbuf_count += nbufs;
286
287 for (int i = 0; i < nbufs; i++) {
288 CBIT_SET(inuse_map, i);
289 put (i * SHMBUF_BLKSZ);
290 }
291 }
292
293 void
294 DiskdIOStrategy::unlinkDone(diomsg * M)
295 {
296 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
297 ++statCounter.syscalls.disk.unlinks;
298
299 if (M->status < 0)
300 diskd_stats.unlink.fail++;
301 else
302 diskd_stats.unlink.success++;
303 }
304
305 void
306 DiskdIOStrategy::handle(diomsg * M)
307 {
308 if (!cbdataReferenceValid (M->callback_data)) {
309 /* I.e. already closed file
310 * - say when we have a error opening after
311 * a read was already queued
312 */
313 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
314 cbdataReferenceDone (M->callback_data);
315 return;
316 }
317
318
319 /* set errno passed from diskd. makes debugging more meaningful */
320 if (M->status < 0)
321 errno = -M->status;
322
323 if (M->newstyle) {
324 DiskdFile *theFile = (DiskdFile *)M->callback_data;
325 theFile->RefCountDereference();
326 theFile->completed (M);
327 } else
328 switch (M->mtype) {
329
330 case _MQD_OPEN:
331
332 case _MQD_CREATE:
333
334 case _MQD_CLOSE:
335
336 case _MQD_READ:
337
338 case _MQD_WRITE:
339 assert (0);
340 break;
341
342 case _MQD_UNLINK:
343 unlinkDone(M);
344 break;
345
346 default:
347 assert(0);
348 break;
349 }
350
351 cbdataReferenceDone (M->callback_data);
352 }
353
354 int
355 DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, RefCountable_ *requestor)
356 {
357 diomsg M;
358 M.callback_data = cbdataReference(theFile);
359 theFile->RefCountReference();
360 M.requestor = requestor;
361 M.newstyle = true;
362
363 if (requestor)
364 requestor->RefCountReference();
365
366 return SEND(&M, mtype, id, size, offset, shm_offset);
367 }
368
369 int
370 DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
371 {
372 diomsg M;
373 M.callback_data = cbdataReference(sio.getRaw());
374 M.newstyle = false;
375
376 return SEND(&M, mtype, id, size, offset, shm_offset);
377 }
378
379 int
380 DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
381 {
382 static int send_errors = 0;
383 static int last_seq_no = 0;
384 static int seq_no = 0;
385 int x;
386
387 M->mtype = mtype;
388 M->size = size;
389 M->offset = offset;
390 M->status = -1;
391 M->shm_offset = (int) shm_offset;
392 M->id = id;
393 M->seq_no = ++seq_no;
394
395 if (M->seq_no < last_seq_no)
396 debugs(79, 1, "WARNING: sequencing out of order");
397
398 x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
399
400 last_seq_no = M->seq_no;
401
402 if (0 == x) {
403 diskd_stats.sent_count++;
404 away++;
405 } else {
406 debugs(79, 1, "storeDiskdSend: msgsnd: " << xstrerror());
407 cbdataReferenceDone(M->callback_data);
408 assert(++send_errors < 100);
409 if (shm_offset > -1)
410 shm.put(shm_offset);
411 }
412
413 /*
414 * We have to drain the queue here if necessary. If we don't,
415 * then we can have a lot of messages in the queue (probably
416 * up to 2*magic1) and we can run out of shared memory buffers.
417 */
418 /*
419 * Note that we call Store::Root().callbackk (for all SDs), rather
420 * than callback for just this SD, so that while
421 * we're "blocking" on this SD we can also handle callbacks
422 * from other SDs that might be ready.
423 */
424
425 struct timeval delay = {0, 1};
426
427 while (away > magic2) {
428 select(0, NULL, NULL, NULL, &delay);
429 Store::Root().callback();
430
431 if (delay.tv_usec < 1000000)
432 delay.tv_usec <<= 1;
433 }
434
435 return x;
436 }
437
438 ConfigOption *
439 DiskdIOStrategy::getOptionTree() const
440 {
441 ConfigOptionVector *result = new ConfigOptionVector;
442 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump));
443 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump));
444 return result;
445 }
446
447 bool
448 DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
449 {
450 if (strcmp(name, "Q1") != 0)
451 return false;
452
453 int old_magic1 = magic1;
454
455 magic1 = atoi(value);
456
457 if (!isaReconfig)
458 return true;
459
460 if (old_magic1 < magic1) {
461 /*
462 * This is because shm.nbufs is computed at startup, when
463 * we call shmget(). We can't increase the Q1/Q2 parameters
464 * beyond their initial values because then we might have
465 * more "Q2 messages" than shared memory chunks, and this
466 * will cause an assertion in storeDiskdShmGet().
467 */
468 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
469 debugs(3, 1, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
470 magic1 = old_magic1;
471 return true;
472 }
473
474 if (old_magic1 != magic1)
475 debugs(3, 1, "cache_dir new Q1 value '" << magic1 << "'");
476
477 return true;
478 }
479
480 void
481 DiskdIOStrategy::optionQ1Dump(StoreEntry * e) const
482 {
483 storeAppendPrintf(e, " Q1=%d", magic1);
484 }
485
486 bool
487 DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
488 {
489 if (strcmp(name, "Q2") != 0)
490 return false;
491
492 int old_magic2 = magic2;
493
494 magic2 = atoi(value);
495
496 if (!isaReconfig)
497 return true;
498
499 if (old_magic2 < magic2) {
500 /* See comments in Q1 function above */
501 debugs(3, 1, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
502 magic2 = old_magic2;
503 return true;
504 }
505
506 if (old_magic2 != magic2)
507 debugs(3, 1, "cache_dir new Q2 value '" << magic2 << "'");
508
509 return true;
510 }
511
512 void
513 DiskdIOStrategy::optionQ2Dump(StoreEntry * e) const
514 {
515 storeAppendPrintf(e, " Q2=%d", magic2);
516 }
517
518 /*
519 * Sync any pending data. We just sit around and read the queue
520 * until the data has finished writing.
521 */
522 void
523 DiskdIOStrategy::sync()
524 {
525 static time_t lastmsg = 0;
526
527 while (away > 0) {
528 if (squid_curtime > lastmsg) {
529 debugs(47, 1, "storeDiskdDirSync: " << away << " messages away");
530 lastmsg = squid_curtime;
531 }
532
533 callback();
534 }
535 }
536
537
538 /*
539 * Handle callbacks. If we have more than magic2 requests away, we block
540 * until the queue is below magic2. Otherwise, we simply return when we
541 * don't get a message.
542 */
543
544 int
545 DiskdIOStrategy::callback()
546 {
547 diomsg M;
548 int x;
549 int retval = 0;
550
551 if (away >= magic2) {
552 diskd_stats.block_queue_len++;
553 retval = 1;
554 /* We might not have anything to do, but our queue
555 * is full.. */
556 }
557
558 if (diskd_stats.sent_count - diskd_stats.recv_count >
559 diskd_stats.max_away) {
560 diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count;
561 }
562
563 while (1) {
564 #ifdef ALWAYS_ZERO_BUFFERS
565 memset(&M, '\0', sizeof(M));
566 #endif
567
568 x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
569
570 if (x < 0)
571 break;
572 else if (x != diomsg::msg_snd_rcv_sz) {
573 debugs(47, 1, "storeDiskdDirCallback: msgget returns " << x);
574 break;
575 }
576
577 diskd_stats.recv_count++;
578 --away;
579 handle(&M);
580 retval = 1; /* Return that we've actually done some work */
581
582 if (M.shm_offset > -1)
583 shm.put ((off_t) M.shm_offset);
584 }
585
586 return retval;
587 }
588
589 void
590 DiskdIOStrategy::statfs(StoreEntry & sentry)const
591 {
592 storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
593 }