2 * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 79 Squid-side DISKD I/O functions. */
12 #include "comm/Loops.h"
13 #include "ConfigOption.h"
15 #include "DiskdFile.h"
16 #include "DiskdIOStrategy.h"
17 #include "DiskIO/DiskFile.h"
19 #include "SquidConfig.h"
21 #include "SquidTime.h"
22 #include "StatCounters.h"
// File-scope definitions: the diskd_stats counters object, the static
// per-class instance counter (initialised to 0), and diomsg::msg_snd_rcv_sz,
// computed as sizeof(diomsg) minus sizeof(mtyp_t). The latter is the size
// argument passed to msgsnd()/msgrcv() elsewhere in this file.
37 diskd_stats_t diskd_stats
;
39 size_t DiskdIOStrategy::nextInstanceID (0);
40 const int diomsg::msg_snd_rcv_sz
= sizeof(diomsg
) - sizeof(mtyp_t
);
// Pre-increments the static nextInstanceID counter and returns the new value.
// The constructor in this file uses the result to initialise instanceID.
43 DiskdIOStrategy::newInstance()
45 return ++nextInstanceID
;
// Load-shedding check. Per the retained comment, open() is failed when too
// many requests are queued; the only visible statement logs the shedding at
// debug section 79, level 3.
// NOTE(review): the queue-length test and the return statements are not
// visible in this extract - confirm against the full file.
49 DiskdIOStrategy::shedLoad()
52 * Fail on open() if there are too many requests queued.
56 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
// Reports the current load as away * 1000 / magic2.
// The division relies on magic2 being non-zero; the comment below says the
// option parser guarantees that.
65 DiskdIOStrategy::load()
67 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
68 /* the parse function guarantees magic2 is positive */
69 return away
* 1000 / magic2
;
// Records a failed open by incrementing diskd_stats.open_fail_queue_len.
73 DiskdIOStrategy::openFailed()
75 ++diskd_stats
.open_fail_queue_len
;
// Creates a DiskdFile for `path`, passing this strategy object as the second
// constructor argument, and returns the newly allocated object.
// NOTE(review): the lines between the signature and the return are missing
// from this extract; any early-exit path there is not documented here.
79 DiskdIOStrategy::newFile(char const *path
)
86 return new DiskdFile (path
, this);
// Constructor: sets magic1 to 64, magic2 to 72 and away to 0, marks both
// message-queue ids (smsgid, rmsgid) and wfd as -1, and takes a fresh
// instanceID from newInstance().
89 DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0) , smsgid(-1), rmsgid(-1), wfd(-1) , instanceID(newInstance())
// Const query method.
// NOTE(review): only the signature survives in this extract; the return
// value cannot be documented from here.
93 DiskdIOStrategy::unlinkdUseful() const
// Removes the file at `path`.
// Visible behaviour:
//  - one path logs "Out of queue space, sync unlink" (a synchronous unlink,
//    per the retained comment);
//  - otherwise a buffer is taken from shm.get(), `path` is copied into it with
//    xstrncpy() bounded by SHMBUF_BLKSZ, and an _MQD_UNLINK message is sent
//    with a NULL StoreIOState pointer;
//  - a failure path logs xstrerr(xerrno) and calls ::unlink(buf) directly;
//  - diskd_stats.unlink.ops is incremented.
// NOTE(review): the conditions selecting each path, the remaining send()
// arguments and the synchronous unlink call itself are missing from this
// extract. The shm.put() call is commented out in the original text.
99 DiskdIOStrategy::unlinkFile(char const *path
)
102 /* Damn, we need to issue a sync unlink here :( */
103 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
108 /* We can attempt a diskd unlink */
115 buf
= (char *)shm
.get(&shm_offset
);
117 xstrncpy(buf
, path
, SHMBUF_BLKSZ
);
119 x
= send(_MQD_UNLINK
,
121 (StoreIOState::Pointer
)NULL
,
128 debugs(79, DBG_IMPORTANT
, "storeDiskdSend UNLINK: " << xstrerr(xerrno
));
129 ::unlink(buf
); /* XXX EWW! */
130 // shm.put (shm_offset);
133 ++diskd_stats
.unlink
.ops
;
// Sets up the IPC resources for this strategy instance and starts the helper.
// Visible steps, in order:
//  1. ikey = (getpid() << 10) + (instanceID << 2).
//  2. smsgid = msgget(ikey, 0700 | IPC_CREAT) and
//     rmsgid = msgget(ikey + 1, 0700 | IPC_CREAT); each is followed by a
//     critical log line and fatal("msgget failed").
//  3. shm.init(ikey, magic2).
//  4. ikey, ikey + 1 and ikey + 2 are formatted as decimal strings into
//     skey1..skey3 (snprintf bound of 32).
//  5. localhost is set to the loopback address and ipcCreate(IPC_STREAM, ...)
//     is called with Config.Program.diskd; fatalf("execl: %s", ...) is the
//     visible failure report.
//  6. wfd is labelled "squid -> diskd", its timeout is cleared, it is made
//     non-blocking, and Comm::QuickPollRequired() is called.
// NOTE(review): the tests guarding each fatal()/fatalf() call and the full
// ipcCreate() argument list are missing from this extract.
137 DiskdIOStrategy::init()
147 Ip::Address localhost
;
149 ikey
= (getpid() << 10) + (instanceID
<< 2);
151 smsgid
= msgget((key_t
) ikey
, 0700 | IPC_CREAT
);
155 debugs(50, DBG_CRITICAL
, MYNAME
<< "msgget: " << xstrerr(xerrno
));
156 fatal("msgget failed");
159 rmsgid
= msgget((key_t
) (ikey
+ 1), 0700 | IPC_CREAT
);
163 debugs(50, DBG_CRITICAL
, MYNAME
<< "msgget: " << xstrerr(xerrno
));
164 fatal("msgget failed");
167 shm
.init(ikey
, magic2
);
168 snprintf(skey1
, 32, "%d", ikey
);
169 snprintf(skey2
, 32, "%d", ikey
+ 1);
170 snprintf(skey3
, 32, "%d", ikey
+ 2);
176 localhost
.setLocalhost();
177 pid
= ipcCreate(IPC_STREAM
,
178 Config
.Program
.diskd
,
187 fatalf("execl: %s", Config
.Program
.diskd
);
192 fd_note(wfd
, "squid -> diskd");
194 commUnsetFdTimeout(wfd
);
195 commSetNonBlocking(wfd
);
196 Comm::QuickPollRequired();
200 * SHM manipulation routines
// Returns the buffer at byte `offset` to the pool.
// Asserts that offset lies below nbufs * SHMBUF_BLKSZ, derives the slot index
// as offset / SHMBUF_BLKSZ, asserts that the slot's bit in inuse_map is set,
// clears that bit and decrements diskd_stats.shmbuf_count.
203 SharedMemory::put(ssize_t offset
)
207 assert(offset
< nbufs
* SHMBUF_BLKSZ
);
208 i
= offset
/ SHMBUF_BLKSZ
;
210 assert(CBIT_TEST(inuse_map
, i
));
211 CBIT_CLR(inuse_map
, i
);
212 --diskd_stats
.shmbuf_count
;
// Hands out one shared-memory buffer and writes its byte offset to
// *shm_offset.
// The loop walks slot indices 0..nbufs-1 testing each bit of inuse_map; for
// the chosen slot it sets the bit, stores i * SHMBUF_BLKSZ in *shm_offset and
// computes aBuf = buf + *shm_offset. It then asserts that aBuf lies inside
// the segment, increments diskd_stats.shmbuf_count and raises
// diskd_stats.max_shmuse when the new count exceeds it.
// NOTE(review): the statement executed when CBIT_TEST() is true (presumably
// skipping an in-use slot), the loop exit and the return are missing from
// this extract.
217 SharedMemory::get(ssize_t
* shm_offset
)
222 for (i
= 0; i
< nbufs
; ++i
) {
223 if (CBIT_TEST(inuse_map
, i
))
226 CBIT_SET(inuse_map
, i
);
228 *shm_offset
= i
* SHMBUF_BLKSZ
;
230 aBuf
= buf
+ (*shm_offset
);
237 assert(aBuf
< buf
+ (nbufs
* SHMBUF_BLKSZ
));
238 ++diskd_stats
.shmbuf_count
;
240 if (diskd_stats
.max_shmuse
< diskd_stats
.shmbuf_count
)
241 diskd_stats
.max_shmuse
= diskd_stats
.shmbuf_count
;
// Creates and attaches the shared-memory segment and prepares the slot bitmap.
//  - nbufs is (int)(magic2 * 1.3), i.e. the fractional part is truncated.
//  - The segment is requested with shmget() using key ikey + 2, size
//    nbufs * SHMBUF_BLKSZ and mode 0600 | IPC_CREAT, then attached with
//    shmat(); a (void *)-1 result is logged and is fatal.
//  - inuse_map is a zero-filled bitmap of (nbufs + 7) / 8 bytes.
//  - diskd_stats.shmbuf_count is raised by nbufs, then every slot is marked
//    in use and immediately released through put(). put() clears the bit and
//    decrements the counter, so the loop cancels the increase and leaves all
//    slots free.
// NOTE(review): the test guarding fatal("shmget failed") is missing from this
// extract.
247 SharedMemory::init(int ikey
, int magic2
)
249 nbufs
= (int)(magic2
* 1.3);
250 id
= shmget((key_t
) (ikey
+ 2),
251 nbufs
* SHMBUF_BLKSZ
, 0600 | IPC_CREAT
);
255 debugs(50, DBG_CRITICAL
, MYNAME
<< "shmget: " << xstrerr(xerrno
));
256 fatal("shmget failed");
259 buf
= (char *)shmat(id
, NULL
, 0);
261 if (buf
== (void *) -1) {
263 debugs(50, DBG_CRITICAL
, MYNAME
<< "shmat: " << xstrerr(xerrno
));
264 fatal("shmat failed");
267 inuse_map
= (char *)xcalloc((nbufs
+ 7) / 8, 1);
268 diskd_stats
.shmbuf_count
+= nbufs
;
270 for (int i
= 0; i
< nbufs
; ++i
) {
271 CBIT_SET(inuse_map
, i
);
272 put (i
* SHMBUF_BLKSZ
);
// Handles the reply to an unlink request.
// Logs the file name found at shm.buf + M->shm_offset together with
// M->status, increments statCounter.syscalls.disk.unlinks, and increments
// either diskd_stats.unlink.fail or diskd_stats.unlink.success.
// NOTE(review): the test that chooses between fail and success is missing
// from this extract.
277 DiskdIOStrategy::unlinkDone(diomsg
* M
)
279 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm
.buf
+ M
->shm_offset
<< " status " << M
->status
);
280 ++statCounter
.syscalls
.disk
.unlinks
;
283 ++diskd_stats
.unlink
.fail
;
285 ++diskd_stats
.unlink
.success
;
// Dispatches one reply message to the file object that issued the request.
//  - If M->callback_data is no longer valid according to
//    cbdataReferenceValid(), the message is logged and the reference is
//    released with cbdataReferenceDone().
//  - Otherwise callback_data is cast to DiskdFile* and completed(M) is called
//    on it; the reference is released afterwards.
// NOTE(review): the early return after the invalid-reference branch, the
// errno assignment announced by the retained comment, and original lines
// 309-332 are missing from this extract.
289 DiskdIOStrategy::handle(diomsg
* M
)
291 if (!cbdataReferenceValid (M
->callback_data
)) {
292 /* I.e. already closed file
293 * - say when we have a error opening after
294 * a read was already queued
296 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M
->callback_data
);
297 cbdataReferenceDone (M
->callback_data
);
301 /* set errno passed from diskd. makes debugging more meaningful */
306 DiskdFile
*theFile
= (DiskdFile
*)M
->callback_data
;
308 theFile
->completed (M
);
333 cbdataReferenceDone (M
->callback_data
);
// send() overload for requests issued by a DiskdFile.
// Takes a cbdata reference on theFile as the message's callback_data, stores
// `requestor` in the message, and forwards everything to SEND(), returning
// its result.
// NOTE(review): the declaration of the local message M and any other field
// initialisation are missing from this extract.
337 DiskdIOStrategy::send(int mtype
, int id
, DiskdFile
*theFile
, size_t size
, off_t offset
, ssize_t shm_offset
, Lock
*requestor
)
340 M
.callback_data
= cbdataReference(theFile
);
342 M
.requestor
= requestor
;
348 return SEND(&M
, mtype
, id
, size
, offset
, shm_offset
);
// send() overload for requests tied to a StoreIOState.
// Takes a cbdata reference on the raw pointer held by `sio` as the message's
// callback_data and forwards everything to SEND(), returning its result.
// NOTE(review): the declaration of the local message M and any other field
// initialisation are missing from this extract.
352 DiskdIOStrategy::send(int mtype
, int id
, RefCount
<StoreIOState
> sio
, size_t size
, off_t offset
, ssize_t shm_offset
)
355 M
.callback_data
= cbdataReference(sio
.getRaw());
358 return SEND(&M
, mtype
, id
, size
, offset
, shm_offset
);
// Common send path: completes the message and queues it to the helper.
//  - send_errors, last_seq_no and seq_no are function-local statics, so they
//    are shared by every DiskdIOStrategy instance.
//  - shm_offset is narrowed to int when stored in the message.
//  - Each message gets the next seq_no; a warning is logged if it is lower
//    than the previously sent one.
//  - The message is queued with msgsnd() on smsgid using IPC_NOWAIT, so the
//    call does not block when the queue is full.
//  - The visible success accounting increments diskd_stats.sent_count; the
//    visible failure handling logs xstrerr(xerrno), releases the
//    callback_data reference and asserts send_errors < 100.
//  - Finally, while away > magic2, the function sleeps via select() with no
//    descriptors and runs Store::Root().callback() to drain replies (see the
//    retained comment text for the rationale).
// NOTE(review): the assignments of mtype/id/size/offset into the message, the
// success/failure test on x, the updates to `away` and send_errors, the
// statement that grows `delay` and the return are missing from this extract.
// NOTE(review): "callbackk" in the retained comment text is a typo for
// "callback".
362 DiskdIOStrategy::SEND(diomsg
*M
, int mtype
, int id
, size_t size
, off_t offset
, ssize_t shm_offset
)
364 static int send_errors
= 0;
365 static int last_seq_no
= 0;
366 static int seq_no
= 0;
373 M
->shm_offset
= (int) shm_offset
;
375 M
->seq_no
= ++seq_no
;
377 if (M
->seq_no
< last_seq_no
)
378 debugs(79, DBG_IMPORTANT
, "WARNING: sequencing out of order");
380 x
= msgsnd(smsgid
, M
, diomsg::msg_snd_rcv_sz
, IPC_NOWAIT
);
382 last_seq_no
= M
->seq_no
;
385 ++diskd_stats
.sent_count
;
389 debugs(79, DBG_IMPORTANT
, MYNAME
<< "msgsnd: " << xstrerr(xerrno
));
390 cbdataReferenceDone(M
->callback_data
);
392 assert(send_errors
< 100);
398 * We have to drain the queue here if necessary. If we don't,
399 * then we can have a lot of messages in the queue (probably
400 * up to 2*magic1) and we can run out of shared memory buffers.
403 * Note that we call Store::Root().callbackk (for all SDs), rather
404 * than callback for just this SD, so that while
405 * we're "blocking" on this SD we can also handle callbacks
406 * from other SDs that might be ready.
409 struct timeval delay
= {0, 1};
411 while (away
> magic2
) {
412 select(0, NULL
, NULL
, NULL
, &delay
);
413 Store::Root().callback();
415 if (delay
.tv_usec
< 1000000)
// Builds the configuration option tree for this strategy: a newly allocated
// ConfigOptionVector holding two ConfigOptionAdapter entries, one wired to
// optionQ1Parse/optionQ1Dump and one to optionQ2Parse/optionQ2Dump.
// The method is const, so `this` is const_cast to a mutable reference for the
// adapters (the parse callbacks modify magic1/magic2).
// NOTE(review): the return statement is missing from this extract.
423 DiskdIOStrategy::getOptionTree() const
425 ConfigOptionVector
*result
= new ConfigOptionVector
;
426 result
->options
.push_back(new ConfigOptionAdapter
<DiskdIOStrategy
>(*const_cast<DiskdIOStrategy
*>(this), &DiskdIOStrategy::optionQ1Parse
, &DiskdIOStrategy::optionQ1Dump
));
427 result
->options
.push_back(new ConfigOptionAdapter
<DiskdIOStrategy
>(*const_cast<DiskdIOStrategy
*>(this), &DiskdIOStrategy::optionQ2Parse
, &DiskdIOStrategy::optionQ2Dump
));
// Parses the "Q1" cache_dir option into magic1.
//  - Only handles the option when `name` equals "Q1".
//  - Remembers the previous magic1, then overwrites it with atoi(value);
//    atoi() reports no parse errors.
//  - When the new value is larger than the old one, a warning is logged that
//    Q1 cannot be increased while Squid is running (the retained comment
//    explains that shm.nbufs is fixed when shmget() is called at startup).
//  - When the value changed, the new value is logged.
// NOTE(review): the return statements, any use of `isaReconfig`, and whether
// magic1 is restored after the warning are missing from this extract.
432 DiskdIOStrategy::optionQ1Parse(const char *name
, const char *value
, int isaReconfig
)
434 if (strcmp(name
, "Q1") != 0)
437 int old_magic1
= magic1
;
439 magic1
= atoi(value
);
444 if (old_magic1
< magic1
) {
446 * This is because shm.nbufs is computed at startup, when
447 * we call shmget(). We can't increase the Q1/Q2 parameters
448 * beyond their initial values because then we might have
449 * more "Q2 messages" than shared memory chunks, and this
450 * will cause an assertion in storeDiskdShmGet().
452 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
453 debugs(3, DBG_IMPORTANT
, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
458 if (old_magic1
!= magic1
)
459 debugs(3, DBG_IMPORTANT
, "cache_dir new Q1 value '" << magic1
<< "'");
// Appends the current Q1 setting to entry `e` as " Q1=<magic1>".
465 DiskdIOStrategy::optionQ1Dump(StoreEntry
* e
) const
467 storeAppendPrintf(e
, " Q1=%d", magic1
);
// Parses the "Q2" cache_dir option into magic2; mirrors optionQ1Parse.
//  - Only handles the option when `name` equals "Q2".
//  - Remembers the previous magic2, then overwrites it with atoi(value);
//    atoi() reports no parse errors.
//  - Logs a warning when the value was increased, and logs the new value when
//    it changed.
// NOTE(review): the return statements, any use of `isaReconfig`, and whether
// magic2 is restored after the warning are missing from this extract.
471 DiskdIOStrategy::optionQ2Parse(const char *name
, const char *value
, int isaReconfig
)
473 if (strcmp(name
, "Q2") != 0)
476 int old_magic2
= magic2
;
478 magic2
= atoi(value
);
483 if (old_magic2
< magic2
) {
484 /* See comments in Q1 function above */
485 debugs(3, DBG_IMPORTANT
, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
490 if (old_magic2
!= magic2
)
491 debugs(3, DBG_IMPORTANT
, "cache_dir new Q2 value '" << magic2
<< "'");
// Appends the current Q2 setting to entry `e` as " Q2=<magic2>".
497 DiskdIOStrategy::optionQ2Dump(StoreEntry
* e
) const
499 storeAppendPrintf(e
, " Q2=%d", magic2
);
503 * Sync any pending data. We just sit around and read the queue
504 * until the data has finished writing.
// Waits for outstanding requests to complete (see the comment text preceding
// this function).
// The visible part is the progress report: the number of messages still away
// is logged only when squid_curtime has advanced past the function-local
// static `lastmsg`, which is then updated, so the message is not repeated for
// the same squid_curtime value.
// NOTE(review): the enclosing wait loop and the call that processes replies
// are missing from this extract.
507 DiskdIOStrategy::sync()
509 static time_t lastmsg
= 0;
512 if (squid_curtime
> lastmsg
) {
513 debugs(47, DBG_IMPORTANT
, "storeDiskdDirSync: " << away
<< " messages away");
514 lastmsg
= squid_curtime
;
522 * Handle callbacks. If we have more than magic2 requests away, we block
523 * until the queue is below magic2. Otherwise, we simply return when we
524 * don't get a message.
// Receives reply messages from the helper and processes them.
// Visible behaviour:
//  - When away >= magic2, diskd_stats.block_queue_len is incremented.
//  - diskd_stats.max_away tracks the largest observed value of
//    sent_count - recv_count.
//  - A message is read with msgrcv() on rmsgid using IPC_NOWAIT, so the call
//    returns immediately when the queue is empty. With ALWAYS_ZERO_BUFFERS
//    defined the message buffer is zeroed first.
//  - A result that differs from diomsg::msg_snd_rcv_sz is logged.
//  - For a received message, diskd_stats.recv_count is incremented, retval is
//    set to 1, and the shared-memory buffer is released with shm.put() when
//    M.shm_offset > -1.
// NOTE(review): the log text says "msgget returns" although the value logged
// is the msgrcv() result - likely a wording slip in the message.
// NOTE(review): the receive loop, the handling of x < 0, the call that
// dispatches the message, the update of `away` and the return are missing
// from this extract.
528 DiskdIOStrategy::callback()
534 if (away
>= magic2
) {
535 ++diskd_stats
.block_queue_len
;
537 /* We might not have anything to do, but our queue
541 if (diskd_stats
.sent_count
- diskd_stats
.recv_count
>
542 diskd_stats
.max_away
) {
543 diskd_stats
.max_away
= diskd_stats
.sent_count
- diskd_stats
.recv_count
;
547 #ifdef ALWAYS_ZERO_BUFFERS
548 memset(&M
, '\0', sizeof(M
));
551 x
= msgrcv(rmsgid
, &M
, diomsg::msg_snd_rcv_sz
, 0, IPC_NOWAIT
);
555 else if (x
!= diomsg::msg_snd_rcv_sz
) {
556 debugs(47, DBG_IMPORTANT
, "storeDiskdDirCallback: msgget returns " << x
);
560 ++diskd_stats
.recv_count
;
563 retval
= 1; /* Return that we've actually done some work */
565 if (M
.shm_offset
> -1)
566 shm
.put ((off_t
) M
.shm_offset
);
// Appends a "Pending operations: <away>" line to the given store entry.
573 DiskdIOStrategy::statfs(StoreEntry
& sentry
)const
575 storeAppendPrintf(&sentry
, "Pending operations: %d\n", away
);