git.ipfire.org Git - thirdparty/squid.git/blob - src/DiskIO/DiskDaemon/DiskdIOStrategy.cc
SourceLayout: Move time related tools to time/libtime.la (#1001)
[thirdparty/squid.git] / src / DiskIO / DiskDaemon / DiskdIOStrategy.cc
1 /*
2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 79 Squid-side DISKD I/O functions. */
10
11 #include "squid.h"
12 #include "comm/Loops.h"
13 #include "ConfigOption.h"
14 #include "diomsg.h"
15 #include "DiskdFile.h"
16 #include "DiskdIOStrategy.h"
17 #include "DiskIO/DiskFile.h"
18 #include "fd.h"
19 #include "SquidConfig.h"
20 #include "SquidIpc.h"
21 #include "StatCounters.h"
22 #include "Store.h"
23 #include "unlinkd.h"
24
25 #include <cerrno>
26 #if HAVE_SYS_IPC_H
27 #include <sys/ipc.h>
28 #endif
29 #if HAVE_SYS_MSG_H
30 #include <sys/msg.h>
31 #endif
32 #if HAVE_SYS_SHM_H
33 #include <sys/shm.h>
34 #endif
35
36 diskd_stats_t diskd_stats;
37
38 size_t DiskdIOStrategy::nextInstanceID (0);
39 const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
40
41 size_t
42 DiskdIOStrategy::newInstance()
43 {
44 return ++nextInstanceID;
45 }
46
47 bool
48 DiskdIOStrategy::shedLoad()
49 {
50 /*
51 * Fail on open() if there are too many requests queued.
52 */
53
54 if (away > magic1) {
55 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
56
57 return true;
58 }
59
60 return false;
61 }
62
63 int
64 DiskdIOStrategy::load()
65 {
66 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
67 /* the parse function guarantees magic2 is positivie */
68 return away * 1000 / magic2;
69 }
70
71 void
72 DiskdIOStrategy::openFailed()
73 {
74 ++diskd_stats.open_fail_queue_len;
75 }
76
77 DiskFile::Pointer
78 DiskdIOStrategy::newFile(char const *path)
79 {
80 if (shedLoad()) {
81 openFailed();
82 return NULL;
83 }
84
85 return new DiskdFile (path, this);
86 }
87
88 DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0), smsgid(-1), rmsgid(-1), wfd(-1), instanceID(newInstance())
89 {}
90
91 bool
92 DiskdIOStrategy::unlinkdUseful() const
93 {
94 return true;
95 }
96
97 void
98 DiskdIOStrategy::unlinkFile(char const *path)
99 {
100 if (shedLoad()) {
101 /* Damn, we need to issue a sync unlink here :( */
102 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
103 unlinkdUnlink(path);
104 return;
105 }
106
107 /* We can attempt a diskd unlink */
108 int x;
109
110 ssize_t shm_offset;
111
112 char *buf;
113
114 buf = (char *)shm.get(&shm_offset);
115
116 xstrncpy(buf, path, SHMBUF_BLKSZ);
117
118 x = send(_MQD_UNLINK,
119 0,
120 (StoreIOState::Pointer )NULL,
121 0,
122 0,
123 shm_offset);
124
125 if (x < 0) {
126 int xerrno = errno;
127 debugs(79, DBG_IMPORTANT, "storeDiskdSend UNLINK: " << xstrerr(xerrno));
128 ::unlink(buf); /* XXX EWW! */
129 // shm.put (shm_offset);
130 }
131
132 ++diskd_stats.unlink.ops;
133 }
134
135 void
136 DiskdIOStrategy::init()
137 {
138 int pid;
139 void * hIpc;
140 int rfd;
141 int ikey;
142 const char *args[5];
143 char skey1[32];
144 char skey2[32];
145 char skey3[32];
146 Ip::Address localhost;
147
148 ikey = (getpid() << 10) + (instanceID << 2);
149 ikey &= 0x7fffffff;
150 smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
151
152 if (smsgid < 0) {
153 int xerrno = errno;
154 debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
155 fatal("msgget failed");
156 }
157
158 rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
159
160 if (rmsgid < 0) {
161 int xerrno = errno;
162 debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
163 fatal("msgget failed");
164 }
165
166 shm.init(ikey, magic2);
167 snprintf(skey1, 32, "%d", ikey);
168 snprintf(skey2, 32, "%d", ikey + 1);
169 snprintf(skey3, 32, "%d", ikey + 2);
170 args[0] = "diskd";
171 args[1] = skey1;
172 args[2] = skey2;
173 args[3] = skey3;
174 args[4] = NULL;
175 localhost.setLocalhost();
176 pid = ipcCreate(IPC_STREAM,
177 Config.Program.diskd,
178 args,
179 "diskd",
180 localhost,
181 &rfd,
182 &wfd,
183 &hIpc);
184
185 if (pid < 0)
186 fatalf("execl: %s", Config.Program.diskd);
187
188 if (rfd != wfd)
189 comm_close(rfd);
190
191 fd_note(wfd, "squid -> diskd");
192
193 commUnsetFdTimeout(wfd);
194 commSetNonBlocking(wfd);
195 Comm::QuickPollRequired();
196 }
197
198 /*
199 * SHM manipulation routines
200 */
201 void
202 SharedMemory::put(ssize_t offset)
203 {
204 int i;
205 assert(offset >= 0);
206 assert(offset < nbufs * SHMBUF_BLKSZ);
207 i = offset / SHMBUF_BLKSZ;
208 assert(i < nbufs);
209 assert(CBIT_TEST(inuse_map, i));
210 CBIT_CLR(inuse_map, i);
211 --diskd_stats.shmbuf_count;
212 }
213
214 void *
215
216 SharedMemory::get(ssize_t * shm_offset)
217 {
218 char *aBuf = NULL;
219 int i;
220
221 for (i = 0; i < nbufs; ++i) {
222 if (CBIT_TEST(inuse_map, i))
223 continue;
224
225 CBIT_SET(inuse_map, i);
226
227 *shm_offset = i * SHMBUF_BLKSZ;
228
229 aBuf = buf + (*shm_offset);
230
231 break;
232 }
233
234 assert(aBuf);
235 assert(aBuf >= buf);
236 assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
237 ++diskd_stats.shmbuf_count;
238
239 if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count)
240 diskd_stats.max_shmuse = diskd_stats.shmbuf_count;
241
242 return aBuf;
243 }
244
245 void
246 SharedMemory::init(int ikey, int magic2)
247 {
248 nbufs = (int)(magic2 * 1.3);
249 id = shmget((key_t) (ikey + 2),
250 nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
251
252 if (id < 0) {
253 int xerrno = errno;
254 debugs(50, DBG_CRITICAL, MYNAME << "shmget: " << xstrerr(xerrno));
255 fatal("shmget failed");
256 }
257
258 buf = (char *)shmat(id, NULL, 0);
259
260 if (buf == (void *) -1) {
261 int xerrno = errno;
262 debugs(50, DBG_CRITICAL, MYNAME << "shmat: " << xstrerr(xerrno));
263 fatal("shmat failed");
264 }
265
266 inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
267 diskd_stats.shmbuf_count += nbufs;
268
269 for (int i = 0; i < nbufs; ++i) {
270 CBIT_SET(inuse_map, i);
271 put (i * SHMBUF_BLKSZ);
272 }
273 }
274
275 void
276 DiskdIOStrategy::unlinkDone(diomsg * M)
277 {
278 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
279 ++statCounter.syscalls.disk.unlinks;
280
281 if (M->status < 0)
282 ++diskd_stats.unlink.fail;
283 else
284 ++diskd_stats.unlink.success;
285 }
286
287 void
288 DiskdIOStrategy::handle(diomsg * M)
289 {
290 if (!cbdataReferenceValid (M->callback_data)) {
291 /* I.e. already closed file
292 * - say when we have a error opening after
293 * a read was already queued
294 */
295 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
296 cbdataReferenceDone (M->callback_data);
297 return;
298 }
299
300 /* set errno passed from diskd. makes debugging more meaningful */
301 if (M->status < 0)
302 errno = -M->status;
303
304 if (M->newstyle) {
305 DiskdFile *theFile = (DiskdFile *)M->callback_data;
306 theFile->unlock();
307 theFile->completed (M);
308 } else
309 switch (M->mtype) {
310
311 case _MQD_OPEN:
312
313 case _MQD_CREATE:
314
315 case _MQD_CLOSE:
316
317 case _MQD_READ:
318
319 case _MQD_WRITE:
320 assert (0);
321 break;
322
323 case _MQD_UNLINK:
324 unlinkDone(M);
325 break;
326
327 default:
328 assert(0);
329 break;
330 }
331
332 cbdataReferenceDone (M->callback_data);
333 }
334
335 int
336 DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
337 {
338 diomsg M;
339 M.callback_data = cbdataReference(theFile);
340 theFile->lock();
341 M.requestor = requestor;
342 M.newstyle = true;
343
344 if (requestor)
345 requestor->lock();
346
347 return SEND(&M, mtype, id, size, offset, shm_offset);
348 }
349
350 int
351 DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
352 {
353 diomsg M;
354 M.callback_data = cbdataReference(sio.getRaw());
355 M.newstyle = false;
356
357 return SEND(&M, mtype, id, size, offset, shm_offset);
358 }
359
360 int
361 DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
362 {
363 static int send_errors = 0;
364 static int last_seq_no = 0;
365 static int seq_no = 0;
366 int x;
367
368 M->mtype = mtype;
369 M->size = size;
370 M->offset = offset;
371 M->status = -1;
372 M->shm_offset = (int) shm_offset;
373 M->id = id;
374 M->seq_no = ++seq_no;
375
376 if (M->seq_no < last_seq_no)
377 debugs(79, DBG_IMPORTANT, "WARNING: sequencing out of order");
378
379 x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
380
381 last_seq_no = M->seq_no;
382
383 if (0 == x) {
384 ++diskd_stats.sent_count;
385 ++away;
386 } else {
387 int xerrno = errno;
388 debugs(79, DBG_IMPORTANT, MYNAME << "msgsnd: " << xstrerr(xerrno));
389 cbdataReferenceDone(M->callback_data);
390 ++send_errors;
391 assert(send_errors < 100);
392 if (shm_offset > -1)
393 shm.put(shm_offset);
394 }
395
396 /*
397 * We have to drain the queue here if necessary. If we don't,
398 * then we can have a lot of messages in the queue (probably
399 * up to 2*magic1) and we can run out of shared memory buffers.
400 */
401 /*
402 * Note that we call Store::Root().callbackk (for all SDs), rather
403 * than callback for just this SD, so that while
404 * we're "blocking" on this SD we can also handle callbacks
405 * from other SDs that might be ready.
406 */
407
408 struct timeval delay = {0, 1};
409
410 while (away > magic2) {
411 select(0, NULL, NULL, NULL, &delay);
412 Store::Root().callback();
413
414 if (delay.tv_usec < 1000000)
415 delay.tv_usec <<= 1;
416 }
417
418 return x;
419 }
420
421 ConfigOption *
422 DiskdIOStrategy::getOptionTree() const
423 {
424 ConfigOptionVector *result = new ConfigOptionVector;
425 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump));
426 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump));
427 return result;
428 }
429
430 bool
431 DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
432 {
433 if (strcmp(name, "Q1") != 0)
434 return false;
435
436 int old_magic1 = magic1;
437
438 magic1 = atoi(value);
439
440 if (!isaReconfig)
441 return true;
442
443 if (old_magic1 < magic1) {
444 /*
445 * This is because shm.nbufs is computed at startup, when
446 * we call shmget(). We can't increase the Q1/Q2 parameters
447 * beyond their initial values because then we might have
448 * more "Q2 messages" than shared memory chunks, and this
449 * will cause an assertion in storeDiskdShmGet().
450 */
451 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
452 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
453 magic1 = old_magic1;
454 return true;
455 }
456
457 if (old_magic1 != magic1)
458 debugs(3, DBG_IMPORTANT, "cache_dir new Q1 value '" << magic1 << "'");
459
460 return true;
461 }
462
463 void
464 DiskdIOStrategy::optionQ1Dump(StoreEntry * e) const
465 {
466 storeAppendPrintf(e, " Q1=%d", magic1);
467 }
468
469 bool
470 DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
471 {
472 if (strcmp(name, "Q2") != 0)
473 return false;
474
475 int old_magic2 = magic2;
476
477 magic2 = atoi(value);
478
479 if (!isaReconfig)
480 return true;
481
482 if (old_magic2 < magic2) {
483 /* See comments in Q1 function above */
484 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
485 magic2 = old_magic2;
486 return true;
487 }
488
489 if (old_magic2 != magic2)
490 debugs(3, DBG_IMPORTANT, "cache_dir new Q2 value '" << magic2 << "'");
491
492 return true;
493 }
494
495 void
496 DiskdIOStrategy::optionQ2Dump(StoreEntry * e) const
497 {
498 storeAppendPrintf(e, " Q2=%d", magic2);
499 }
500
501 /*
502 * Sync any pending data. We just sit around and read the queue
503 * until the data has finished writing.
504 */
505 void
506 DiskdIOStrategy::sync()
507 {
508 static time_t lastmsg = 0;
509
510 while (away > 0) {
511 if (squid_curtime > lastmsg) {
512 debugs(47, DBG_IMPORTANT, "storeDiskdDirSync: " << away << " messages away");
513 lastmsg = squid_curtime;
514 }
515
516 callback();
517 }
518 }
519
520 /*
521 * Handle callbacks. If we have more than magic2 requests away, we block
522 * until the queue is below magic2. Otherwise, we simply return when we
523 * don't get a message.
524 */
525
526 int
527 DiskdIOStrategy::callback()
528 {
529 diomsg M;
530 int x;
531 int retval = 0;
532
533 if (away >= magic2) {
534 ++diskd_stats.block_queue_len;
535 retval = 1;
536 /* We might not have anything to do, but our queue
537 * is full.. */
538 }
539
540 if (diskd_stats.sent_count - diskd_stats.recv_count >
541 diskd_stats.max_away) {
542 diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count;
543 }
544
545 while (1) {
546 x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
547
548 if (x < 0)
549 break;
550 else if (x != diomsg::msg_snd_rcv_sz) {
551 debugs(47, DBG_IMPORTANT, "storeDiskdDirCallback: msgget returns " << x);
552 break;
553 }
554
555 ++diskd_stats.recv_count;
556 --away;
557 handle(&M);
558 retval = 1; /* Return that we've actually done some work */
559
560 if (M.shm_offset > -1)
561 shm.put ((off_t) M.shm_offset);
562 }
563
564 return retval;
565 }
566
567 void
568 DiskdIOStrategy::statfs(StoreEntry & sentry)const
569 {
570 storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
571 }
572