4 * DEBUG: section 79 Squid-side DISKD I/O functions.
5 * AUTHOR: Duane Wessels
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
33 * Copyright (c) 2003, Robert Collins <robertc@squid-cache.org>
41 #include "DiskdIOStrategy.h"
42 #include "ConfigOption.h"
43 #include "DiskIO/DiskFile.h"
44 #include "DiskdFile.h"
48 #include "SquidTime.h"
// File-scope statistics object; the functions in this file update its counters.
50 diskd_stats_t diskd_stats
;
// Static counter, starts at 0; newInstance() pre-increments and returns it.
52 size_t DiskdIOStrategy::nextInstanceID (0);
// Byte count handed to msgsnd()/msgrcv() in this file: sizeof(diomsg) minus
// sizeof(mtyp_t).
53 const int diomsg::msg_snd_rcv_sz
= sizeof(diomsg
) - sizeof(mtyp_t
);
// Hands out a new instance ID: pre-increments the static nextInstanceID counter
// and returns the incremented value.
56 DiskdIOStrategy::newInstance()
58 return ++nextInstanceID
;
// Load-shedding hook; emits a level-3 debug line when requests are shed.
// NOTE(review): confirm the threshold test that guards the message and the
// value returned on each path.
62 DiskdIOStrategy::shedLoad()
65 * Fail on open() if there are too many requests queued.
69 debugs(79, 3, "storeDiskdIO::shedLoad: Shedding, too many requests away");
// Reports the current load of this strategy as away * 1000 / magic2.
78 DiskdIOStrategy::load()
80 /* Calculate the storedir load relative to magic2 on a scale of 0 .. 1000 */
81 /* the parse function guarantees magic2 is positive */
// NOTE(review): the expression itself is not clamped, so the result exceeds
// 1000 whenever away > magic2.
82 return away
* 1000 / magic2
;
// Records a failed open by incrementing diskd_stats.open_fail_queue_len.
86 DiskdIOStrategy::openFailed()
88 diskd_stats
.open_fail_queue_len
++;
// Creates a DiskdFile for `path`, passing this strategy to its constructor.
// NOTE(review): confirm any checks performed before the allocation.
92 DiskdIOStrategy::newFile(char const *path
)
99 return new DiskdFile (path
, this);
// Initial state: magic1 = 64, magic2 = 72, no requests away, both message-queue
// ids and wfd set to -1, and an instanceID obtained from newInstance().
102 DiskdIOStrategy::DiskdIOStrategy() : magic1(64), magic2(72), away(0) , smsgid(-1), rmsgid(-1), wfd(-1) , instanceID(newInstance())
// Removes the file at `path`. Two paths exist: a synchronous fallback (logged
// as "Out of queue space") and a queued _MQD_UNLINK request handed to send().
// NOTE(review): confirm the queue-depth test that selects between the two.
106 DiskdIOStrategy::unlinkFile(char const *path
)
109 /* Damn, we need to issue a sync unlink here :( */
110 debugs(79, 2, "storeDiskUnlink: Out of queue space, sync unlink");
122 /* We can attempt a diskd unlink */
// Take a shared-memory block (its offset is written to shm_offset) and copy
// the path into it; SHMBUF_BLKSZ is the size limit handed to xstrncpy.
129 buf
= (char *)shm
.get(&shm_offset
);
131 xstrncpy(buf
, path
, SHMBUF_BLKSZ
);
// Queue the unlink request; no StoreIOState is attached (NULL pointer).
133 x
= send(_MQD_UNLINK
,
135 (StoreIOState::Pointer
)NULL
,
// Failure handling: log the error and unlink the path directly instead.
// NOTE(review): confirm this runs only when send() reported an error.
141 debugs(79, 1, "storeDiskdSend UNLINK: " << xstrerror());
142 ::unlink(buf
); /* XXX EWW! */
143 // shm.put (shm_offset);
// Count the unlink operation.
146 diskd_stats
.unlink
.ops
++;
// Sets up the IPC resources for this instance: two message queues, the
// shared-memory pool, and the diskd helper process.
150 DiskdIOStrategy::init()
// Base IPC key built from our pid and instanceID. instanceID is shifted by 2,
// so each instance gets a stride of four keys; ikey, ikey + 1 and ikey + 2 are
// used for smsgid, rmsgid and the shared-memory segment respectively.
162 ikey
= (getpid() << 10) + (instanceID
<< 2);
// Queue whose id is stored in smsgid (used by msgsnd() in SEND()).
164 smsgid
= msgget((key_t
) ikey
, 0700 | IPC_CREAT
);
// NOTE(review): confirm these two lines run only when msgget() failed.
167 debugs(50, 0, "storeDiskdInit: msgget: " << xstrerror());
168 fatal("msgget failed");
// Queue whose id is stored in rmsgid (used by msgrcv() in callback()).
171 rmsgid
= msgget((key_t
) (ikey
+ 1), 0700 | IPC_CREAT
);
// NOTE(review): confirm these two lines run only when msgget() failed.
174 debugs(50, 0, "storeDiskdInit: msgget: " << xstrerror());
175 fatal("msgget failed");
// Create the shared-memory pool, sized from magic2.
178 shm
.init(ikey
, magic2
);
// Decimal string forms of the three keys.
// NOTE(review): presumably passed to the diskd helper as arguments -- confirm.
179 snprintf(skey1
, 32, "%d", ikey
);
180 snprintf(skey2
, 32, "%d", ikey
+ 1);
181 snprintf(skey3
, 32, "%d", ikey
+ 2);
187 localhost
.SetLocalhost();
// Start the helper named by Config.Program.diskd through ipcCreate(IPC_STREAM, ...).
188 pid
= ipcCreate(IPC_STREAM
,
189 Config
.Program
.diskd
,
// NOTE(review): confirm this fatal exit is guarded by an ipcCreate() failure check.
198 fatalf("execl: %s", Config
.Program
.diskd
);
// Label wfd, call commSetTimeout with -1 and no handler, and make it non-blocking.
203 fd_note(wfd
, "squid -> diskd");
205 commSetTimeout(wfd
, -1, NULL
, NULL
);
207 commSetNonBlocking(wfd
);
209 comm_quick_poll_required();
213 * SHM manipulation routines
// Releases the shared-memory block that contains byte `offset`: asserts the
// block is currently marked in use, clears its bit in inuse_map and decrements
// diskd_stats.shmbuf_count.
// NOTE(review): only the upper bound of `offset` is asserted here; confirm
// whether a lower-bound (offset >= 0) check exists.
216 SharedMemory::put(ssize_t offset
)
220 assert(offset
< nbufs
* SHMBUF_BLKSZ
);
// Block index = byte offset divided by the block size.
221 i
= offset
/ SHMBUF_BLKSZ
;
223 assert(CBIT_TEST(inuse_map
, i
));
224 CBIT_CLR(inuse_map
, i
);
225 --diskd_stats
.shmbuf_count
;
// Claims a shared-memory block: scans inuse_map over the nbufs slots, marks a
// slot as used, writes its byte offset to *shm_offset and computes the block
// address aBuf = buf + offset. Also updates shmbuf_count and the max_shmuse
// high-water mark in diskd_stats.
// NOTE(review): confirm the statement controlled by the CBIT_TEST check
// (presumably skips slots already in use), how the loop exits, and the return
// value.
230 SharedMemory::get(ssize_t
* shm_offset
)
235 for (i
= 0; i
< nbufs
; i
++) {
236 if (CBIT_TEST(inuse_map
, i
))
239 CBIT_SET(inuse_map
, i
);
241 *shm_offset
= i
* SHMBUF_BLKSZ
;
243 aBuf
= buf
+ (*shm_offset
);
// The chosen block must lie inside the nbufs * SHMBUF_BLKSZ byte region.
250 assert(aBuf
< buf
+ (nbufs
* SHMBUF_BLKSZ
));
251 diskd_stats
.shmbuf_count
++;
// Track the largest shmbuf_count seen so far.
253 if (diskd_stats
.max_shmuse
< diskd_stats
.shmbuf_count
)
254 diskd_stats
.max_shmuse
= diskd_stats
.shmbuf_count
;
// Creates and attaches the shared-memory segment and prepares the in-use
// bitmap. `ikey` is the base IPC key (the segment uses ikey + 2); `magic2`
// determines the number of blocks.
260 SharedMemory::init(int ikey
, int magic2
)
// Number of blocks: magic2 * 1.3, truncated to int.
262 nbufs
= (int)(magic2
* 1.3);
// Segment of nbufs * SHMBUF_BLKSZ bytes, mode 0600, created if absent.
263 id
= shmget((key_t
) (ikey
+ 2),
264 nbufs
* SHMBUF_BLKSZ
, 0600 | IPC_CREAT
);
// NOTE(review): confirm these two lines run only when shmget() failed.
267 debugs(50, 0, "storeDiskdInit: shmget: " << xstrerror());
268 fatal("shmget failed");
271 buf
= (char *)shmat(id
, NULL
, 0);
// shmat() reports failure as (void *) -1.
273 if (buf
== (void *) -1) {
274 debugs(50, 0, "storeDiskdInit: shmat: " << xstrerror());
275 fatal("shmat failed");
// Bitmap with one bit per block, rounded up to whole bytes.
278 inuse_map
= (char *)xcalloc((nbufs
+ 7) / 8, 1);
// Pre-charge the counter by nbufs, then mark every block used and release it
// through put(). put() clears the bit and decrements the counter, so after the
// loop every bit is clear and shmbuf_count is back at its prior value.
279 diskd_stats
.shmbuf_count
+= nbufs
;
281 for (int i
= 0; i
< nbufs
; i
++) {
282 CBIT_SET(inuse_map
, i
);
283 put (i
* SHMBUF_BLKSZ
);
// Handles the reply to an unlink request: logs the path stored at
// shm.buf + M->shm_offset together with M->status, counts the unlink syscall,
// and bumps either the fail or the success counter.
// NOTE(review): confirm the condition (presumably on M->status) that chooses
// between the fail and success counters.
288 DiskdIOStrategy::unlinkDone(diomsg
* M
)
290 debugs(79, 3, "storeDiskdUnlinkDone: file " << shm
.buf
+ M
->shm_offset
<< " status " << M
->status
);
291 statCounter
.syscalls
.disk
.unlinks
++;
294 diskd_stats
.unlink
.fail
++;
296 diskd_stats
.unlink
.success
++;
// Processes one reply message from diskd. When M->callback_data is no longer
// valid, the event is logged and the cbdata reference released. Otherwise
// callback_data is treated as a DiskdFile: RefCountDereference() is called on
// it (send() calls RefCountReference() when queueing) and the message is
// passed to completed(). The cbdata reference is released at the end.
// NOTE(review): confirm the early exit after the invalid-callback branch and
// any dispatch on message type before the DiskdFile cast.
300 DiskdIOStrategy::handle(diomsg
* M
)
302 if (!cbdataReferenceValid (M
->callback_data
)) {
303 /* I.e. already closed file
304 * - say when we have a error opening after
305 * a read was already queued
307 debugs(79, 3, "storeDiskdHandle: Invalid callback_data " << M
->callback_data
);
308 cbdataReferenceDone (M
->callback_data
);
313 /* set errno passed from diskd. makes debugging more meaningful */
318 DiskdFile
*theFile
= (DiskdFile
*)M
->callback_data
;
319 theFile
->RefCountDereference();
320 theFile
->completed (M
);
345 cbdataReferenceDone (M
->callback_data
);
// Queues a request on behalf of a DiskdFile. Stores a cbdata reference to
// theFile in the message, calls RefCountReference() on theFile and on the
// requestor, records the requestor in the message, then forwards everything
// to SEND() and returns its result.
// NOTE(review): confirm whether the requestor reference is guarded by a
// null check.
349 DiskdIOStrategy::send(int mtype
, int id
, DiskdFile
*theFile
, size_t size
, off_t offset
, ssize_t shm_offset
, RefCountable_
*requestor
)
352 M
.callback_data
= cbdataReference(theFile
);
353 theFile
->RefCountReference();
354 M
.requestor
= requestor
;
358 requestor
->RefCountReference();
360 return SEND(&M
, mtype
, id
, size
, offset
, shm_offset
);
// Queues a request on behalf of a StoreIOState. Stores a cbdata reference to
// the raw sio pointer in the message, then forwards to SEND() and returns its
// result.
364 DiskdIOStrategy::send(int mtype
, int id
, RefCount
<StoreIOState
> sio
, size_t size
, off_t offset
, ssize_t shm_offset
)
367 M
.callback_data
= cbdataReference(sio
.getRaw());
370 return SEND(&M
, mtype
, id
, size
, offset
, shm_offset
);
// Common send path: stamps the message with a sequence number and the
// shared-memory offset, then posts it to the smsgid queue without blocking.
374 DiskdIOStrategy::SEND(diomsg
*M
, int mtype
, int id
, size_t size
, off_t offset
, ssize_t shm_offset
)
// Function-local statics: a single set of counters is shared by every
// DiskdIOStrategy instance.
376 static int send_errors
= 0;
377 static int last_seq_no
= 0;
378 static int seq_no
= 0;
// The offset is narrowed to int for the message.
385 M
->shm_offset
= (int) shm_offset
;
387 M
->seq_no
= ++seq_no
;
389 if (M
->seq_no
< last_seq_no
)
390 debugs(79, 1, "WARNING: sequencing out of order");
// IPC_NOWAIT: msgsnd() fails instead of blocking when the queue is full.
392 x
= msgsnd(smsgid
, M
, diomsg::msg_snd_rcv_sz
, IPC_NOWAIT
);
394 last_seq_no
= M
->seq_no
;
// NOTE(review): confirm the test on x that separates the success accounting
// below from the error handling that follows it.
397 diskd_stats
.sent_count
++;
// Error handling: log, drop the cbdata reference taken by send().
// NOTE(review): the RefCountReference() calls made by the DiskdFile overload
// of send() have no matching release here -- confirm that is intended.
400 debugs(79, 1, "storeDiskdSend: msgsnd: " << xstrerror());
401 cbdataReferenceDone(M
->callback_data
);
// NOTE(review): the increment sits inside assert(); if assert is compiled out
// the error counter never advances.
402 assert(++send_errors
< 100);
408 * We have to drain the queue here if necessary. If we don't,
409 * then we can have a lot of messages in the queue (probably
410 * up to 2*magic1) and we can run out of shared memory buffers.
413 * Note that we call Store::Root().callback (for all SDs), rather
414 * than callback for just this SD, so that while
415 * we're "blocking" on this SD we can also handle callbacks
416 * from other SDs that might be ready.
// Initial timeout of {tv_sec, tv_usec} = {0, 1}, i.e. one microsecond.
419 struct timeval delay
= {0, 1};
// While more than magic2 requests are outstanding: wait for `delay` (select()
// with no descriptor sets only waits for the timeout), then run the store
// callbacks.
421 while (away
> magic2
) {
422 select(0, NULL
, NULL
, NULL
, &delay
);
423 Store::Root().callback();
// NOTE(review): confirm the statement controlled by this test (presumably it
// grows the delay while it is below one second).
425 if (delay
.tv_usec
< 1000000)
// Builds a heap-allocated ConfigOptionVector holding two adapters that bind
// the Q1 and Q2 parse/dump member functions to this object. The method is
// const while the parse functions are not, hence the const_cast on `this`.
// NOTE(review): confirm that `result` is what gets returned and who owns it.
433 DiskdIOStrategy::getOptionTree() const
435 ConfigOptionVector
*result
= new ConfigOptionVector
;
436 result
->options
.push_back(new ConfigOptionAdapter
<DiskdIOStrategy
>(*const_cast<DiskdIOStrategy
*>(this), &DiskdIOStrategy::optionQ1Parse
, &DiskdIOStrategy::optionQ1Dump
));
437 result
->options
.push_back(new ConfigOptionAdapter
<DiskdIOStrategy
>(*const_cast<DiskdIOStrategy
*>(this), &DiskdIOStrategy::optionQ2Parse
, &DiskdIOStrategy::optionQ2Dump
));
// Parses the "Q1" option: option names other than "Q1" are not handled here;
// otherwise magic1 is set from `value`, a warning is logged when the new value
// is larger than the previous one, and any change is logged.
// NOTE(review): confirm the return values, how `reconfiguring` is used, and
// whether magic1 is restored after the "cannot increase" warning.
442 DiskdIOStrategy::optionQ1Parse(const char *name
, const char *value
, int reconfiguring
)
444 if (strcmp(name
, "Q1") != 0)
447 int old_magic1
= magic1
;
// atoi() gives no error indication; text that is not a number yields 0.
449 magic1
= atoi(value
);
454 if (old_magic1
< magic1
) {
456 * This is because shm.nbufs is computed at startup, when
457 * we call shmget(). We can't increase the Q1/Q2 parameters
458 * beyond their initial values because then we might have
459 * more "Q2 messages" than shared memory chunks, and this
460 * will cause an assertion in storeDiskdShmGet().
462 /* TODO: have DiskdIO hold a link to the swapdir, to allow detailed reporting again */
463 debugs(3, 1, "WARNING: cannot increase cache_dir Q1 value while Squid is running.");
468 if (old_magic1
!= magic1
)
469 debugs(3, 1, "cache_dir new Q1 value '" << magic1
<< "'");
// Appends the current Q1 setting to `e` in the form " Q1=<magic1>".
475 DiskdIOStrategy::optionQ1Dump(StoreEntry
* e
) const
477 storeAppendPrintf(e
, " Q1=%d", magic1
);
// Parses the "Q2" option: option names other than "Q2" are not handled here;
// otherwise magic2 is set from `value`, a warning is logged when the new value
// is larger than the previous one, and any change is logged.
// NOTE(review): confirm the return values, how `reconfiguring` is used, and
// whether magic2 is restored after the "cannot increase" warning. load()
// divides by magic2, so also confirm that non-positive values are rejected.
481 DiskdIOStrategy::optionQ2Parse(const char *name
, const char *value
, int reconfiguring
)
483 if (strcmp(name
, "Q2") != 0)
486 int old_magic2
= magic2
;
// atoi() gives no error indication; text that is not a number yields 0.
488 magic2
= atoi(value
);
493 if (old_magic2
< magic2
) {
494 /* See comments in Q1 function above */
495 debugs(3, 1, "WARNING: cannot increase cache_dir Q2 value while Squid is running.");
500 if (old_magic2
!= magic2
)
501 debugs(3, 1, "cache_dir new Q2 value '" << magic2
<< "'");
// Appends the current Q2 setting to `e` in the form " Q2=<magic2>".
507 DiskdIOStrategy::optionQ2Dump(StoreEntry
* e
) const
509 storeAppendPrintf(e
, " Q2=%d", magic2
);
513 * Sync any pending data. We just sit around and read the queue
514 * until the data has finished writing.
// Progress reporting while syncing: the "messages away" line is logged only
// when squid_curtime has advanced past the time of the previous message.
// lastmsg is function-static, so the throttle is shared by all instances.
// NOTE(review): confirm the surrounding loop that waits for `away` to drain.
517 DiskdIOStrategy::sync()
519 static time_t lastmsg
= 0;
522 if (squid_curtime
> lastmsg
) {
523 debugs(47, 1, "storeDiskdDirSync: " << away
<< " messages away");
524 lastmsg
= squid_curtime
;
533 * Handle callbacks. If we have more than magic2 requests away, we block
534 * until the queue is below magic2. Otherwise, we simply return when we
535 * don't get a message.
// Polls the reply queue (rmsgid) and processes what arrives, updating the
// receive statistics and returning shared-memory blocks to the pool.
// NOTE(review): confirm the loop structure, the handling of a failed msgrcv(),
// and where the message is dispatched.
539 DiskdIOStrategy::callback()
// Count each call that finds the queue at or above magic2.
545 if (away
>= magic2
) {
546 diskd_stats
.block_queue_len
++;
548 /* We might not have anything to do, but our queue
// Track the largest observed difference between sent and received messages.
552 if (diskd_stats
.sent_count
- diskd_stats
.recv_count
>
553 diskd_stats
.max_away
) {
554 diskd_stats
.max_away
= diskd_stats
.sent_count
- diskd_stats
.recv_count
;
558 #ifdef ALWAYS_ZERO_BUFFERS
559 memset(&M
, '\0', sizeof(M
));
// Non-blocking receive (IPC_NOWAIT) of the first message of any type (msgtyp 0).
562 x
= msgrcv(rmsgid
, &M
, diomsg::msg_snd_rcv_sz
, 0, IPC_NOWAIT
);
// A short or oversized read is logged.
// NOTE(review): the log text says "msgget" but x is the msgrcv() result.
566 else if (x
!= diomsg::msg_snd_rcv_sz
) {
567 debugs(47, 1, "storeDiskdDirCallback: msgget returns " << x
);
571 diskd_stats
.recv_count
++;
574 retval
= 1; /* Return that we've actually done some work */
// A shm_offset above -1 means the message carried a shared-memory block;
// hand it back to the pool.
576 if (M
.shm_offset
> -1)
577 shm
.put ((off_t
) M
.shm_offset
);
// Appends a "Pending operations: <away>" line to `sentry`.
584 DiskdIOStrategy::statfs(StoreEntry
& sentry
)const
586 storeAppendPrintf(&sentry
, "Pending operations: %d\n", away
);