]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/DiskDaemon/DiskdIOStrategy.cc
Source Format Enforcement (#532)
[thirdparty/squid.git] / src / DiskIO / DiskDaemon / DiskdIOStrategy.cc
CommitLineData
b9ae18aa 1/*
77b1029d 2 * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
b9ae18aa 3 *
bbc27441
AJ
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
b9ae18aa 7 */
8
bbc27441
AJ
9/* DEBUG: section 79 Squid-side DISKD I/O functions. */
10
582c2af2 11#include "squid.h"
d841c88d 12#include "comm/Loops.h"
b9ae18aa 13#include "ConfigOption.h"
602d9612
A
14#include "diomsg.h"
15#include "DiskdFile.h"
582c2af2 16#include "DiskdIOStrategy.h"
b9ae18aa 17#include "DiskIO/DiskFile.h"
c4ad1349 18#include "fd.h"
4d5904f7 19#include "SquidConfig.h"
96097880 20#include "SquidIpc.h"
985c86bc 21#include "SquidTime.h"
602d9612
A
22#include "StatCounters.h"
23#include "Store.h"
b6b42084 24#include "unlinkd.h"
b9ae18aa 25
1a30fdf5 26#include <cerrno>
d440d5a4 27#if HAVE_SYS_IPC_H
582c2af2 28#include <sys/ipc.h>
d440d5a4
AJ
29#endif
30#if HAVE_SYS_MSG_H
582c2af2 31#include <sys/msg.h>
d440d5a4
AJ
32#endif
33#if HAVE_SYS_SHM_H
582c2af2 34#include <sys/shm.h>
d440d5a4 35#endif
582c2af2 36
b9ae18aa 37diskd_stats_t diskd_stats;
38
39size_t DiskdIOStrategy::nextInstanceID (0);
40const int diomsg::msg_snd_rcv_sz = sizeof(diomsg) - sizeof(mtyp_t);
41
42size_t
43DiskdIOStrategy::newInstance()
44{
45 return ++nextInstanceID;
46}
47
48bool
49DiskdIOStrategy::shedLoad()
50{
51 /*
52 * Fail on open() if there are too many requests queued.
53 */
54
55 if (away > magic1) {
bf8fe701 56 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
b9ae18aa 57
58 return true;
59 }
60
61 return false;
62}
63
64int
65DiskdIOStrategy::load()
66{
67 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
68 /* the parse function guarantees magic2 is positivie */
69 return away * 1000 / magic2;
70}
71
72void
73DiskdIOStrategy::openFailed()
74{
cb4185f1 75 ++diskd_stats.open_fail_queue_len;
b9ae18aa 76}
77
78DiskFile::Pointer
79DiskdIOStrategy::newFile(char const *path)
80{
81 if (shedLoad()) {
82 openFailed();
83 return NULL;
84 }
85
86 return new DiskdFile (path, this);
87}
88
9e167fa2 89DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0), smsgid(-1), rmsgid(-1), wfd(-1), instanceID(newInstance())
b9ae18aa 90{}
91
c521ad17
DK
92bool
93DiskdIOStrategy::unlinkdUseful() const
94{
95 return true;
96}
97
b9ae18aa 98void
99DiskdIOStrategy::unlinkFile(char const *path)
100{
101 if (shedLoad()) {
102 /* Damn, we need to issue a sync unlink here :( */
bf8fe701 103 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
b9ae18aa 104 unlinkdUnlink(path);
b9ae18aa 105 return;
106 }
107
108 /* We can attempt a diskd unlink */
109 int x;
110
ee139403 111 ssize_t shm_offset;
b9ae18aa 112
113 char *buf;
114
115 buf = (char *)shm.get(&shm_offset);
116
117 xstrncpy(buf, path, SHMBUF_BLKSZ);
118
119 x = send(_MQD_UNLINK,
120 0,
121 (StoreIOState::Pointer )NULL,
122 0,
123 0,
124 shm_offset);
125
126 if (x < 0) {
b69e9ffa
AJ
127 int xerrno = errno;
128 debugs(79, DBG_IMPORTANT, "storeDiskdSend UNLINK: " << xstrerr(xerrno));
f53969cc 129 ::unlink(buf); /* XXX EWW! */
b9ae18aa 130 // shm.put (shm_offset);
131 }
132
cb4185f1 133 ++diskd_stats.unlink.ops;
b9ae18aa 134}
135
136void
137DiskdIOStrategy::init()
138{
b5d712b5 139 int pid;
140 void * hIpc;
b9ae18aa 141 int rfd;
142 int ikey;
143 const char *args[5];
144 char skey1[32];
145 char skey2[32];
146 char skey3[32];
b7ac5457 147 Ip::Address localhost;
b9ae18aa 148
149 ikey = (getpid() << 10) + (instanceID << 2);
150 ikey &= 0x7fffffff;
151 smsgid = msgget((key_t) ikey, 0700 | IPC_CREAT);
152
153 if (smsgid < 0) {
b69e9ffa
AJ
154 int xerrno = errno;
155 debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
b9ae18aa 156 fatal("msgget failed");
157 }
158
159 rmsgid = msgget((key_t) (ikey + 1), 0700 | IPC_CREAT);
160
161 if (rmsgid < 0) {
b69e9ffa
AJ
162 int xerrno = errno;
163 debugs(50, DBG_CRITICAL, MYNAME << "msgget: " << xstrerr(xerrno));
b9ae18aa 164 fatal("msgget failed");
165 }
166
167 shm.init(ikey, magic2);
168 snprintf(skey1, 32, "%d", ikey);
169 snprintf(skey2, 32, "%d", ikey + 1);
170 snprintf(skey3, 32, "%d", ikey + 2);
171 args[0] = "diskd";
172 args[1] = skey1;
173 args[2] = skey2;
174 args[3] = skey3;
175 args[4] = NULL;
4dd643d5 176 localhost.setLocalhost();
b5d712b5 177 pid = ipcCreate(IPC_STREAM,
178 Config.Program.diskd,
179 args,
180 "diskd",
cc192b50 181 localhost,
b5d712b5 182 &rfd,
183 &wfd,
184 &hIpc);
185
186 if (pid < 0)
b9ae18aa 187 fatalf("execl: %s", Config.Program.diskd);
188
189 if (rfd != wfd)
190 comm_close(rfd);
191
192 fd_note(wfd, "squid -> diskd");
193
933dd095 194 commUnsetFdTimeout(wfd);
b9ae18aa 195 commSetNonBlocking(wfd);
d841c88d 196 Comm::QuickPollRequired();
b9ae18aa 197}
198
199/*
200 * SHM manipulation routines
201 */
202void
ee139403 203SharedMemory::put(ssize_t offset)
b9ae18aa 204{
205 int i;
206 assert(offset >= 0);
207 assert(offset < nbufs * SHMBUF_BLKSZ);
208 i = offset / SHMBUF_BLKSZ;
209 assert(i < nbufs);
210 assert(CBIT_TEST(inuse_map, i));
211 CBIT_CLR(inuse_map, i);
212 --diskd_stats.shmbuf_count;
213}
214
215void *
216
ee139403 217SharedMemory::get(ssize_t * shm_offset)
b9ae18aa 218{
219 char *aBuf = NULL;
220 int i;
221
cb4185f1 222 for (i = 0; i < nbufs; ++i) {
b9ae18aa 223 if (CBIT_TEST(inuse_map, i))
224 continue;
225
226 CBIT_SET(inuse_map, i);
227
228 *shm_offset = i * SHMBUF_BLKSZ;
229
230 aBuf = buf + (*shm_offset);
231
232 break;
233 }
234
235 assert(aBuf);
236 assert(aBuf >= buf);
237 assert(aBuf < buf + (nbufs * SHMBUF_BLKSZ));
cb4185f1 238 ++diskd_stats.shmbuf_count;
b9ae18aa 239
240 if (diskd_stats.max_shmuse < diskd_stats.shmbuf_count)
241 diskd_stats.max_shmuse = diskd_stats.shmbuf_count;
242
243 return aBuf;
244}
245
246void
247SharedMemory::init(int ikey, int magic2)
248{
249 nbufs = (int)(magic2 * 1.3);
250 id = shmget((key_t) (ikey + 2),
251 nbufs * SHMBUF_BLKSZ, 0600 | IPC_CREAT);
252
253 if (id < 0) {
b69e9ffa
AJ
254 int xerrno = errno;
255 debugs(50, DBG_CRITICAL, MYNAME << "shmget: " << xstrerr(xerrno));
b9ae18aa 256 fatal("shmget failed");
257 }
258
259 buf = (char *)shmat(id, NULL, 0);
260
261 if (buf == (void *) -1) {
b69e9ffa
AJ
262 int xerrno = errno;
263 debugs(50, DBG_CRITICAL, MYNAME << "shmat: " << xstrerr(xerrno));
b9ae18aa 264 fatal("shmat failed");
265 }
266
267 inuse_map = (char *)xcalloc((nbufs + 7) / 8, 1);
268 diskd_stats.shmbuf_count += nbufs;
269
cb4185f1 270 for (int i = 0; i < nbufs; ++i) {
b9ae18aa 271 CBIT_SET(inuse_map, i);
272 put (i * SHMBUF_BLKSZ);
273 }
274}
275
276void
277DiskdIOStrategy::unlinkDone(diomsg * M)
278{
bf8fe701 279 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm.buf + M->shm_offset << " status " << M->status);
e4f1fdae 280 ++statCounter.syscalls.disk.unlinks;
b9ae18aa 281
282 if (M->status < 0)
cb4185f1 283 ++diskd_stats.unlink.fail;
b9ae18aa 284 else
cb4185f1 285 ++diskd_stats.unlink.success;
b9ae18aa 286}
287
288void
289DiskdIOStrategy::handle(diomsg * M)
290{
291 if (!cbdataReferenceValid (M->callback_data)) {
292 /* I.e. already closed file
293 * - say when we have a error opening after
294 * a read was already queued
295 */
26ac0430 296 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M->callback_data);
b9ae18aa 297 cbdataReferenceDone (M->callback_data);
298 return;
299 }
300
a1ad81aa 301 /* set errno passed from diskd. makes debugging more meaningful */
302 if (M->status < 0)
303 errno = -M->status;
304
b9ae18aa 305 if (M->newstyle) {
306 DiskdFile *theFile = (DiskdFile *)M->callback_data;
8bf217bd 307 theFile->unlock();
b9ae18aa 308 theFile->completed (M);
309 } else
310 switch (M->mtype) {
311
312 case _MQD_OPEN:
313
314 case _MQD_CREATE:
315
316 case _MQD_CLOSE:
317
318 case _MQD_READ:
319
320 case _MQD_WRITE:
321 assert (0);
322 break;
323
324 case _MQD_UNLINK:
325 unlinkDone(M);
326 break;
327
328 default:
329 assert(0);
330 break;
331 }
332
333 cbdataReferenceDone (M->callback_data);
334}
335
336int
8bf217bd 337DiskdIOStrategy::send(int mtype, int id, DiskdFile *theFile, size_t size, off_t offset, ssize_t shm_offset, Lock *requestor)
b9ae18aa 338{
b9ae18aa 339 diomsg M;
b9ae18aa 340 M.callback_data = cbdataReference(theFile);
8bf217bd 341 theFile->lock();
b9ae18aa 342 M.requestor = requestor;
f30dcf2a 343 M.newstyle = true;
b9ae18aa 344
345 if (requestor)
8bf217bd 346 requestor->lock();
b9ae18aa 347
f30dcf2a 348 return SEND(&M, mtype, id, size, offset, shm_offset);
b9ae18aa 349}
350
351int
63be0a78 352DiskdIOStrategy::send(int mtype, int id, RefCount<StoreIOState> sio, size_t size, off_t offset, ssize_t shm_offset)
b9ae18aa 353{
b9ae18aa 354 diomsg M;
f30dcf2a 355 M.callback_data = cbdataReference(sio.getRaw());
356 M.newstyle = false;
357
358 return SEND(&M, mtype, id, size, offset, shm_offset);
359}
360
361int
ee139403 362DiskdIOStrategy::SEND(diomsg *M, int mtype, int id, size_t size, off_t offset, ssize_t shm_offset)
f30dcf2a 363{
b9ae18aa 364 static int send_errors = 0;
365 static int last_seq_no = 0;
366 static int seq_no = 0;
f30dcf2a 367 int x;
368
369 M->mtype = mtype;
370 M->size = size;
371 M->offset = offset;
372 M->status = -1;
373 M->shm_offset = (int) shm_offset;
374 M->id = id;
375 M->seq_no = ++seq_no;
b9ae18aa 376
f30dcf2a 377 if (M->seq_no < last_seq_no)
e0236918 378 debugs(79, DBG_IMPORTANT, "WARNING: sequencing out of order");
b9ae18aa 379
f30dcf2a 380 x = msgsnd(smsgid, M, diomsg::msg_snd_rcv_sz, IPC_NOWAIT);
b9ae18aa 381
f30dcf2a 382 last_seq_no = M->seq_no;
b9ae18aa 383
384 if (0 == x) {
cb4185f1
FC
385 ++diskd_stats.sent_count;
386 ++away;
b9ae18aa 387 } else {
b69e9ffa
AJ
388 int xerrno = errno;
389 debugs(79, DBG_IMPORTANT, MYNAME << "msgsnd: " << xstrerr(xerrno));
f30dcf2a 390 cbdataReferenceDone(M->callback_data);
7f56277d
AJ
391 ++send_errors;
392 assert(send_errors < 100);
6a786390 393 if (shm_offset > -1)
394 shm.put(shm_offset);
b9ae18aa 395 }
396
397 /*
398 * We have to drain the queue here if necessary. If we don't,
399 * then we can have a lot of messages in the queue (probably
400 * up to 2*magic1) and we can run out of shared memory buffers.
401 */
402 /*
c8f4eac4 403 * Note that we call Store::Root().callbackk (for all SDs), rather
404 * than callback for just this SD, so that while
b9ae18aa 405 * we're "blocking" on this SD we can also handle callbacks
406 * from other SDs that might be ready.
407 */
b9ae18aa 408
9a518127 409 struct timeval delay = {0, 1};
b9ae18aa 410
9a518127 411 while (away > magic2) {
b9ae18aa 412 select(0, NULL, NULL, NULL, &delay);
c8f4eac4 413 Store::Root().callback();
b9ae18aa 414
415 if (delay.tv_usec < 1000000)
416 delay.tv_usec <<= 1;
417 }
418
419 return x;
420}
421
422ConfigOption *
423DiskdIOStrategy::getOptionTree() const
424{
425 ConfigOptionVector *result = new ConfigOptionVector;
426 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ1Parse, &DiskdIOStrategy::optionQ1Dump));
427 result->options.push_back(new ConfigOptionAdapter<DiskdIOStrategy>(*const_cast<DiskdIOStrategy *>(this), &DiskdIOStrategy::optionQ2Parse, &DiskdIOStrategy::optionQ2Dump));
428 return result;
429}
430
431bool
350e2aec 432DiskdIOStrategy::optionQ1Parse(const char *name, const char *value, int isaReconfig)
b9ae18aa 433{
434 if (strcmp(name, "Q1") != 0)
435 return false;
436
437 int old_magic1 = magic1;
438
439 magic1 = atoi(value);
440
350e2aec 441 if (!isaReconfig)
b9ae18aa 442 return true;
443
444 if (old_magic1 < magic1) {
445 /*
446 * This is because shm.nbufs is computed at startup, when
447 * we call shmget(). We can't increase the Q1/Q2 parameters
448 * beyond their initial values because then we might have
449 * more "Q2 messages" than shared memory chunks, and this
450 * will cause an assertion in storeDiskdShmGet().
451 */
452 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
e0236918 453 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
b9ae18aa 454 magic1 = old_magic1;
455 return true;
456 }
457
458 if (old_magic1 != magic1)
e0236918 459 debugs(3, DBG_IMPORTANT, "cache_dir new Q1 value '" << magic1 << "'");
b9ae18aa 460
461 return true;
462}
463
464void
465DiskdIOStrategy::optionQ1Dump(StoreEntry * e) const
466{
467 storeAppendPrintf(e, " Q1=%d", magic1);
468}
469
470bool
350e2aec 471DiskdIOStrategy::optionQ2Parse(const char *name, const char *value, int isaReconfig)
b9ae18aa 472{
473 if (strcmp(name, "Q2") != 0)
474 return false;
475
476 int old_magic2 = magic2;
477
478 magic2 = atoi(value);
479
350e2aec 480 if (!isaReconfig)
b9ae18aa 481 return true;
482
483 if (old_magic2 < magic2) {
484 /* See comments in Q1 function above */
e0236918 485 debugs(3, DBG_IMPORTANT, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
b9ae18aa 486 magic2 = old_magic2;
487 return true;
488 }
489
490 if (old_magic2 != magic2)
e0236918 491 debugs(3, DBG_IMPORTANT, "cache_dir new Q2 value '" << magic2 << "'");
b9ae18aa 492
493 return true;
494}
495
496void
497DiskdIOStrategy::optionQ2Dump(StoreEntry * e) const
498{
499 storeAppendPrintf(e, " Q2=%d", magic2);
500}
501
502/*
503 * Sync any pending data. We just sit around and read the queue
504 * until the data has finished writing.
505 */
506void
507DiskdIOStrategy::sync()
508{
509 static time_t lastmsg = 0;
510
511 while (away > 0) {
512 if (squid_curtime > lastmsg) {
e0236918 513 debugs(47, DBG_IMPORTANT, "storeDiskdDirSync: " << away << " messages away");
b9ae18aa 514 lastmsg = squid_curtime;
515 }
516
517 callback();
518 }
519}
520
b9ae18aa 521/*
522 * Handle callbacks. If we have more than magic2 requests away, we block
523 * until the queue is below magic2. Otherwise, we simply return when we
524 * don't get a message.
525 */
526
527int
528DiskdIOStrategy::callback()
529{
530 diomsg M;
531 int x;
532 int retval = 0;
533
534 if (away >= magic2) {
cb4185f1 535 ++diskd_stats.block_queue_len;
b9ae18aa 536 retval = 1;
537 /* We might not have anything to do, but our queue
538 * is full.. */
539 }
540
541 if (diskd_stats.sent_count - diskd_stats.recv_count >
542 diskd_stats.max_away) {
543 diskd_stats.max_away = diskd_stats.sent_count - diskd_stats.recv_count;
544 }
545
546 while (1) {
f53969cc 547#ifdef ALWAYS_ZERO_BUFFERS
b9ae18aa 548 memset(&M, '\0', sizeof(M));
549#endif
550
551 x = msgrcv(rmsgid, &M, diomsg::msg_snd_rcv_sz, 0, IPC_NOWAIT);
552
553 if (x < 0)
554 break;
555 else if (x != diomsg::msg_snd_rcv_sz) {
e0236918 556 debugs(47, DBG_IMPORTANT, "storeDiskdDirCallback: msgget returns " << x);
b9ae18aa 557 break;
558 }
559
cb4185f1 560 ++diskd_stats.recv_count;
b9ae18aa 561 --away;
562 handle(&M);
f53969cc 563 retval = 1; /* Return that we've actually done some work */
b9ae18aa 564
565 if (M.shm_offset > -1)
566 shm.put ((off_t) M.shm_offset);
567 }
568
569 return retval;
570}
571
572void
573DiskdIOStrategy::statfs(StoreEntry & sentry)const
574{
575 storeAppendPrintf(&sentry, "Pending operations: %d\n", away);
576}
f53969cc 577