4 * DEBUG: section 47 Store Directory Routines
8 #include "base/RunnersRegistry.h"
9 #include "base/TextException.h"
10 #include "DiskIO/IORequestor.h"
11 #include "DiskIO/IpcIo/IpcIoFile.h"
12 #include "DiskIO/ReadRequest.h"
13 #include "DiskIO/WriteRequest.h"
15 #include "ipc/mem/Pages.h"
16 #include "ipc/Messages.h"
18 #include "ipc/Queue.h"
19 #include "ipc/StrandSearch.h"
20 #include "ipc/UdsOp.h"
22 #include "SquidTime.h"
23 #include "StatCounters.h"
28 CBDATA_CLASS_INIT(IpcIoFile
);
30 /// shared memory segment path to use for IpcIoFile maps
31 static const char *const ShmLabel
= "io_file";
32 /// a single worker-to-disker or disker-to-worker queue capacity; up
33 /// to 2*QueueCapacity I/O requests queued between a single worker and a single disker
35 // TODO: make configurable or compute from squid.conf settings if possible
36 static const int QueueCapacity
= 1024;
38 const double IpcIoFile::Timeout
= 7; // seconds; XXX: ALL,9 may require more
39 IpcIoFile::IpcIoFileList
IpcIoFile::WaitingForOpen
;
40 IpcIoFile::IpcIoFilesMap
IpcIoFile::IpcIoFiles
;
41 std::auto_ptr
<IpcIoFile::Queue
> IpcIoFile::queue
;
43 bool IpcIoFile::DiskerHandleMoreRequestsScheduled
= false;
45 static bool DiskerOpen(const String
&path
, int flags
, mode_t mode
);
46 static void DiskerClose(const String
&path
);
48 /// IpcIo wrapper for debugs() streams; XXX: find a better class name
50 SipcIo(int aWorker
, const IpcIoMsg
&aMsg
, int aDisker
):
51 worker(aWorker
), msg(aMsg
), disker(aDisker
) {}
59 operator <<(std::ostream
&os
, const SipcIo
&sio
)
61 return os
<< "ipcIo" << sio
.worker
<< '.' << sio
.msg
.requestId
<<
62 (sio
.msg
.command
== IpcIo::cmdRead
? 'r' : 'w') << sio
.disker
;
65 IpcIoFile::IpcIoFile(char const *aDb
):
66 dbName(aDb
), diskId(-1), error_(false), lastRequestId(0),
67 olderRequests(&requestMap1
), newerRequests(&requestMap2
),
68 timeoutCheckScheduled(false)
72 IpcIoFile::~IpcIoFile()
75 const IpcIoFilesMap::iterator i
= IpcIoFiles
.find(diskId
);
76 // XXX: warn and continue?
77 Must(i
!= IpcIoFiles
.end());
78 Must(i
->second
== this);
84 IpcIoFile::configure(const Config
&cfg
)
86 DiskFile::configure(cfg
);
91 IpcIoFile::open(int flags
, mode_t mode
, RefCount
<IORequestor
> callback
)
93 ioRequestor
= callback
;
94 Must(diskId
< 0); // we do not know our disker yet
97 queue
.reset(new Queue(ShmLabel
, IamWorkerProcess() ? Queue::groupA
: Queue::groupB
, KidIdentifier
));
99 if (IamDiskProcess()) {
100 error_
= !DiskerOpen(dbName
, flags
, mode
);
104 diskId
= KidIdentifier
;
105 const bool inserted
=
106 IpcIoFiles
.insert(std::make_pair(diskId
, this)).second
;
109 queue
->localRateLimit() =
110 static_cast<Ipc::QueueReader::Rate::Value
>(config
.ioRate
);
112 Ipc::HereIamMessage
ann(Ipc::StrandCoord(KidIdentifier
, getpid()));
113 ann
.strand
.tag
= dbName
;
114 Ipc::TypedMsgHdr message
;
116 SendMessage(Ipc::coordinatorAddr
, message
);
118 ioRequestor
->ioCompletedNotification();
122 Ipc::StrandSearchRequest request
;
123 request
.requestorId
= KidIdentifier
;
124 request
.tag
= dbName
;
126 Ipc::TypedMsgHdr msg
;
128 Ipc::SendMessage(Ipc::coordinatorAddr
, msg
);
130 WaitingForOpen
.push_back(this);
132 eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout
,
133 this, Timeout
, 0, false); // "this" pointer is used as id
137 IpcIoFile::openCompleted(const Ipc::StrandSearchResponse
*const response
)
139 Must(diskId
< 0); // we do not know our disker yet
142 debugs(79, DBG_IMPORTANT
, HERE
<< "error: timeout");
145 diskId
= response
->strand
.kidId
;
147 const bool inserted
=
148 IpcIoFiles
.insert(std::make_pair(diskId
, this)).second
;
152 debugs(79, DBG_IMPORTANT
, HERE
<< "error: no disker claimed " << dbName
);
156 ioRequestor
->ioCompletedNotification();
160 * Alias for IpcIoFile::open(...)
161 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
164 IpcIoFile::create(int flags
, mode_t mode
, RefCount
<IORequestor
> callback
)
166 assert(false); // check
167 /* We use the same logic path for open */
168 open(flags
, mode
, callback
);
174 assert(ioRequestor
!= NULL
);
176 if (IamDiskProcess())
178 // XXX: else nothing to do?
180 ioRequestor
->closeCompleted();
184 IpcIoFile::canRead() const
186 return diskId
>= 0 && canWait();
190 IpcIoFile::canWrite() const
192 return diskId
>= 0 && canWait();
196 IpcIoFile::error() const
202 IpcIoFile::read(ReadRequest
*readRequest
)
204 debugs(79,3, HERE
<< "(disker" << diskId
<< ", " << readRequest
->len
<< ", " <<
205 readRequest
->offset
<< ")");
207 assert(ioRequestor
!= NULL
);
208 assert(readRequest
->offset
>= 0);
211 //assert(minOffset < 0 || minOffset <= readRequest->offset);
212 //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
214 IpcIoPendingRequest
*const pending
= new IpcIoPendingRequest(this);
215 pending
->readRequest
= readRequest
;
220 IpcIoFile::readCompleted(ReadRequest
*readRequest
,
221 IpcIoMsg
*const response
)
223 bool ioError
= false;
225 debugs(79, 3, HERE
<< "error: timeout");
226 ioError
= true; // I/O timeout does not warrant setting error_?
228 if (response
->xerrno
) {
229 debugs(79, DBG_IMPORTANT
, HERE
<< "error: " << xstrerr(response
->xerrno
));
230 ioError
= error_
= true;
231 } else if (!response
->page
) {
232 debugs(79, DBG_IMPORTANT
, HERE
<< "error: run out of shared memory pages");
235 const char *const buf
= Ipc::Mem::PagePointer(response
->page
);
236 memcpy(readRequest
->buf
, buf
, response
->len
);
239 Ipc::Mem::PutPage(response
->page
);
242 const ssize_t rlen
= ioError
? -1 : (ssize_t
)readRequest
->len
;
243 const int errflag
= ioError
? DISK_ERROR
:DISK_OK
;
244 ioRequestor
->readCompleted(readRequest
->buf
, rlen
, errflag
, readRequest
);
248 IpcIoFile::write(WriteRequest
*writeRequest
)
250 debugs(79,3, HERE
<< "(disker" << diskId
<< ", " << writeRequest
->len
<< ", " <<
251 writeRequest
->offset
<< ")");
253 assert(ioRequestor
!= NULL
);
254 assert(writeRequest
->len
> 0); // TODO: work around mmap failures on zero-len?
255 assert(writeRequest
->offset
>= 0);
258 //assert(minOffset < 0 || minOffset <= writeRequest->offset);
259 //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
261 IpcIoPendingRequest
*const pending
= new IpcIoPendingRequest(this);
262 pending
->writeRequest
= writeRequest
;
267 IpcIoFile::writeCompleted(WriteRequest
*writeRequest
,
268 const IpcIoMsg
*const response
)
270 bool ioError
= false;
272 debugs(79, 3, HERE
<< "error: timeout");
273 ioError
= true; // I/O timeout does not warrant setting error_?
274 } else if (response
->xerrno
) {
275 debugs(79, DBG_IMPORTANT
, HERE
<< "error: " << xstrerr(response
->xerrno
));
276 ioError
= error_
= true;
277 } else if (response
->len
!= writeRequest
->len
) {
278 debugs(79, DBG_IMPORTANT
, HERE
<< "problem: " << response
->len
<< " < " << writeRequest
->len
);
282 if (writeRequest
->free_func
)
283 (writeRequest
->free_func
)(const_cast<char*>(writeRequest
->buf
)); // broken API?
286 debugs(79,5, HERE
<< "wrote " << writeRequest
->len
<< " to disker" <<
287 diskId
<< " at " << writeRequest
->offset
);
290 const ssize_t rlen
= ioError
? 0 : (ssize_t
)writeRequest
->len
;
291 const int errflag
= ioError
? DISK_ERROR
:DISK_OK
;
292 ioRequestor
->writeCompleted(errflag
, rlen
, writeRequest
);
296 IpcIoFile::ioInProgress() const
298 return !olderRequests
->empty() || !newerRequests
->empty();
301 /// track a new pending request
303 IpcIoFile::trackPendingRequest(IpcIoPendingRequest
*const pending
)
305 newerRequests
->insert(std::make_pair(lastRequestId
, pending
));
306 if (!timeoutCheckScheduled
)
307 scheduleTimeoutCheck();
310 /// push an I/O request to disker
312 IpcIoFile::push(IpcIoPendingRequest
*const pending
)
314 // prevent queue overflows: check for responses to earlier requests
315 HandleResponses("before push");
320 Must(pending
->readRequest
|| pending
->writeRequest
);
324 ipcIo
.requestId
= lastRequestId
;
325 ipcIo
.start
= current_time
;
326 if (pending
->readRequest
) {
327 ipcIo
.command
= IpcIo::cmdRead
;
328 ipcIo
.offset
= pending
->readRequest
->offset
;
329 ipcIo
.len
= pending
->readRequest
->len
;
330 } else { // pending->writeRequest
331 Must(pending
->writeRequest
->len
<= Ipc::Mem::PageSize());
332 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage
, ipcIo
.page
)) {
334 throw TexcHere("run out of shared memory pages for IPC I/O");
336 ipcIo
.command
= IpcIo::cmdWrite
;
337 ipcIo
.offset
= pending
->writeRequest
->offset
;
338 ipcIo
.len
= pending
->writeRequest
->len
;
339 char *const buf
= Ipc::Mem::PagePointer(ipcIo
.page
);
340 memcpy(buf
, pending
->writeRequest
->buf
, ipcIo
.len
); // optimize away
343 debugs(47, 7, HERE
<< "pushing " << SipcIo(KidIdentifier
, ipcIo
, diskId
));
345 if (queue
->push(diskId
, ipcIo
))
346 Notify(diskId
); // must notify disker
347 trackPendingRequest(pending
);
348 } catch (const Queue::Full
&) {
349 debugs(47, DBG_IMPORTANT
, "Worker I/O push queue overflow: " <<
350 SipcIo(KidIdentifier
, ipcIo
, diskId
)); // TODO: report queue len
351 // TODO: grow queue size
353 pending
->completeIo(NULL
);
355 } catch (const TextException
&e
) {
356 debugs(47, DBG_IMPORTANT
, HERE
<< e
.what());
357 pending
->completeIo(NULL
);
362 /// whether we think there is enough time to complete the I/O
364 IpcIoFile::canWait() const
366 if (!config
.ioTimeout
)
367 return true; // no timeout specified
370 if (!queue
->findOldest(diskId
, oldestIo
) || oldestIo
.start
.tv_sec
<= 0)
371 return true; // we cannot estimate expected wait time; assume it is OK
373 const int oldestWait
= tvSubMsec(oldestIo
.start
, current_time
);
375 int rateWait
= -1; // time in milliseconds
376 const Ipc::QueueReader::Rate::Value ioRate
= queue
->rateLimit(diskId
);
378 // if there are N requests pending, the new one will wait at
379 // least N/max-swap-rate seconds
380 rateWait
= static_cast<int>(1e3
* queue
->outSize(diskId
) / ioRate
);
381 // adjust N/max-swap-rate value based on the queue "balance"
382 // member, in case we have been borrowing time against future
384 rateWait
+= queue
->balance(diskId
);
387 const int expectedWait
= max(oldestWait
, rateWait
);
388 if (expectedWait
< 0 ||
389 static_cast<time_msec_t
>(expectedWait
) < config
.ioTimeout
)
390 return true; // expected wait time is acceptable
392 debugs(47,2, HERE
<< "cannot wait: " << expectedWait
<<
393 " oldest: " << SipcIo(KidIdentifier
, oldestIo
, diskId
));
394 return false; // do not want to wait that long
397 /// called when coordinator responds to worker open request
399 IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse
&response
)
401 debugs(47, 7, HERE
<< "coordinator response to open request");
402 for (IpcIoFileList::iterator i
= WaitingForOpen
.begin();
403 i
!= WaitingForOpen
.end(); ++i
) {
404 if (response
.strand
.tag
== (*i
)->dbName
) {
405 (*i
)->openCompleted(&response
);
406 WaitingForOpen
.erase(i
);
411 debugs(47, 4, HERE
<< "LATE disker response to open for " <<
412 response
.strand
.tag
);
413 // nothing we can do about it; completeIo() has been called already
417 IpcIoFile::HandleResponses(const char *const when
)
419 debugs(47, 4, HERE
<< "popping all " << when
);
421 // get all responses we can: since we are not pushing, this will stop
423 while (queue
->pop(diskId
, ipcIo
)) {
424 const IpcIoFilesMap::const_iterator i
= IpcIoFiles
.find(diskId
);
425 Must(i
!= IpcIoFiles
.end()); // TODO: warn but continue
426 i
->second
->handleResponse(ipcIo
);
431 IpcIoFile::handleResponse(IpcIoMsg
&ipcIo
)
433 const int requestId
= ipcIo
.requestId
;
434 debugs(47, 7, HERE
<< "popped disker response: " <<
435 SipcIo(KidIdentifier
, ipcIo
, diskId
));
438 if (IpcIoPendingRequest
*const pending
= dequeueRequest(requestId
)) {
439 pending
->completeIo(&ipcIo
);
440 delete pending
; // XXX: leaking if throwing
442 debugs(47, 4, HERE
<< "LATE disker response to " << ipcIo
.command
<<
443 "; ipcIo" << KidIdentifier
<< '.' << requestId
);
444 // nothing we can do about it; completeIo() has been called already
449 IpcIoFile::Notify(const int peerId
)
451 // TODO: Count and report the total number of notifications, pops, pushes.
452 debugs(47, 7, HERE
<< "kid" << peerId
);
453 Ipc::TypedMsgHdr msg
;
454 msg
.setType(Ipc::mtIpcIoNotification
); // TODO: add proper message type?
455 msg
.putInt(KidIdentifier
);
456 const String addr
= Ipc::Port::MakeAddr(Ipc::strandAddrPfx
, peerId
);
457 Ipc::SendMessage(addr
, msg
);
461 IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr
&msg
)
463 const int from
= msg
.getInt();
464 debugs(47, 7, HERE
<< "from " << from
);
465 queue
->clearReaderSignal(from
);
466 if (IamDiskProcess())
467 DiskerHandleRequests();
469 HandleResponses("after notification");
472 /// handles open request timeout
474 IpcIoFile::OpenTimeout(void *const param
)
477 // the pointer is used for comparison only and not dereferenced
478 const IpcIoFile
*const ipcIoFile
=
479 reinterpret_cast<const IpcIoFile
*>(param
);
480 for (IpcIoFileList::iterator i
= WaitingForOpen
.begin();
481 i
!= WaitingForOpen
.end(); ++i
) {
482 if (*i
== ipcIoFile
) {
483 (*i
)->openCompleted(NULL
);
484 WaitingForOpen
.erase(i
);
490 /// IpcIoFile::checkTimeouts wrapper
492 IpcIoFile::CheckTimeouts(void *const param
)
495 const int diskId
= reinterpret_cast<uintptr_t>(param
);
496 debugs(47, 7, HERE
<< "diskId=" << diskId
);
497 const IpcIoFilesMap::const_iterator i
= IpcIoFiles
.find(diskId
);
498 if (i
!= IpcIoFiles
.end())
499 i
->second
->checkTimeouts();
503 IpcIoFile::checkTimeouts()
505 timeoutCheckScheduled
= false;
507 // last chance to recover in case a notification message was lost, etc.
508 const RequestMap::size_type timeoutsBefore
= olderRequests
->size();
509 HandleResponses("before timeout");
510 const RequestMap::size_type timeoutsNow
= olderRequests
->size();
512 if (timeoutsBefore
> timeoutsNow
) { // some requests were rescued
513 // notification message lost or significantly delayed?
514 debugs(47, DBG_IMPORTANT
, "WARNING: communication with disker " <<
515 "may be too slow or disrupted for about " <<
516 Timeout
<< "s; rescued " << (timeoutsBefore
- timeoutsNow
) <<
517 " out of " << timeoutsBefore
<< " I/Os");
521 debugs(47, DBG_IMPORTANT
, "WARNING: abandoning " <<
522 timeoutsNow
<< " I/Os after at least " <<
523 Timeout
<< "s timeout");
526 // any old request would have timed out by now
527 typedef RequestMap::const_iterator RMCI
;
528 for (RMCI i
= olderRequests
->begin(); i
!= olderRequests
->end(); ++i
) {
529 IpcIoPendingRequest
*const pending
= i
->second
;
531 const unsigned int requestId
= i
->first
;
532 debugs(47, 7, HERE
<< "disker timeout; ipcIo" <<
533 KidIdentifier
<< '.' << requestId
);
535 pending
->completeIo(NULL
); // no response
536 delete pending
; // XXX: leaking if throwing
538 olderRequests
->clear();
540 swap(olderRequests
, newerRequests
); // switches pointers around
541 if (!olderRequests
->empty() && !timeoutCheckScheduled
)
542 scheduleTimeoutCheck();
545 /// prepare to check for timeouts in a little while
547 IpcIoFile::scheduleTimeoutCheck()
549 // we check all older requests at once so some may wait for 2*Timeout
550 eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts
,
551 reinterpret_cast<void *>(diskId
), Timeout
, 0, false);
552 timeoutCheckScheduled
= true;
555 /// returns and forgets the right IpcIoFile pending request
556 IpcIoPendingRequest
*
557 IpcIoFile::dequeueRequest(const unsigned int requestId
)
559 Must(requestId
!= 0);
561 RequestMap
*map
= NULL
;
562 RequestMap::iterator i
= requestMap1
.find(requestId
);
564 if (i
!= requestMap1
.end())
567 i
= requestMap2
.find(requestId
);
568 if (i
!= requestMap2
.end())
572 if (!map
) // not found in both maps
575 IpcIoPendingRequest
*pending
= i
->second
;
581 IpcIoFile::getFD() const
583 assert(false); // not supported; TODO: remove this method from API
589 IpcIoMsg::IpcIoMsg():
590 requestId(0), offset(0), len(0), command(IpcIo::cmdNone
), xerrno(0)
595 /* IpcIoPendingRequest */
597 IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer
&aFile
):
598 file(aFile
), readRequest(NULL
), writeRequest(NULL
)
601 if (++file
->lastRequestId
== 0) // don't use zero value as requestId
602 ++file
->lastRequestId
;
606 IpcIoPendingRequest::completeIo(IpcIoMsg
*const response
)
609 file
->readCompleted(readRequest
, response
);
610 else if (writeRequest
)
611 file
->writeCompleted(writeRequest
, response
);
613 Must(!response
); // only timeouts are handled here
614 file
->openCompleted(NULL
);
618 /* XXX: disker code that should probably be moved elsewhere */
620 static int TheFile
= -1; ///< db file descriptor
623 diskerRead(IpcIoMsg
&ipcIo
)
625 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage
, ipcIo
.page
)) {
627 debugs(47,2, HERE
<< "run out of shared memory pages for IPC I/O");
631 char *const buf
= Ipc::Mem::PagePointer(ipcIo
.page
);
632 const ssize_t read
= pread(TheFile
, buf
, min(ipcIo
.len
, Ipc::Mem::PageSize()), ipcIo
.offset
);
633 ++statCounter
.syscalls
.disk
.reads
;
634 fd_bytes(TheFile
, read
, FD_READ
);
638 const size_t len
= static_cast<size_t>(read
); // safe because read > 0
639 debugs(47,8, HERE
<< "disker" << KidIdentifier
<< " read " <<
640 (len
== ipcIo
.len
? "all " : "just ") << read
);
643 ipcIo
.xerrno
= errno
;
645 debugs(47,5, HERE
<< "disker" << KidIdentifier
<< " read error: " <<
651 diskerWrite(IpcIoMsg
&ipcIo
)
653 const char *const buf
= Ipc::Mem::PagePointer(ipcIo
.page
);
654 const ssize_t wrote
= pwrite(TheFile
, buf
, min(ipcIo
.len
, Ipc::Mem::PageSize()), ipcIo
.offset
);
655 ++statCounter
.syscalls
.disk
.writes
;
656 fd_bytes(TheFile
, wrote
, FD_WRITE
);
660 const size_t len
= static_cast<size_t>(wrote
); // safe because wrote > 0
661 debugs(47,8, HERE
<< "disker" << KidIdentifier
<< " wrote " <<
662 (len
== ipcIo
.len
? "all " : "just ") << wrote
);
665 ipcIo
.xerrno
= errno
;
667 debugs(47,5, HERE
<< "disker" << KidIdentifier
<< " write error: " <<
671 Ipc::Mem::PutPage(ipcIo
.page
);
675 IpcIoFile::DiskerHandleMoreRequests(void *source
)
677 debugs(47, 7, HERE
<< "resuming handling requests after " <<
678 static_cast<const char *>(source
));
679 DiskerHandleMoreRequestsScheduled
= false;
680 IpcIoFile::DiskerHandleRequests();
684 IpcIoFile::WaitBeforePop()
686 const Ipc::QueueReader::Rate::Value ioRate
= queue
->localRateLimit();
687 const double maxRate
= ioRate
/1e3
; // req/ms
689 // do we need to enforce configured I/O rate?
693 // is there an I/O request we could potentially delay?
696 if (!queue
->peek(processId
, ipcIo
)) {
697 // unlike pop(), peek() is not reliable and does not block reader
698 // so we must proceed with pop() even if it is likely to fail
702 static timeval LastIo
= current_time
;
704 const double ioDuration
= 1.0 / maxRate
; // ideal distance between two I/Os
705 // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
706 const int64_t maxImbalance
= min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration
));
708 const double credit
= ioDuration
; // what the last I/O should have cost us
709 const double debit
= tvSubMsec(LastIo
, current_time
); // actual distance from the last I/O
710 LastIo
= current_time
;
712 Ipc::QueueReader::Balance
&balance
= queue
->localBalance();
713 balance
+= static_cast<int64_t>(credit
- debit
);
715 debugs(47, 7, HERE
<< "rate limiting balance: " << balance
<< " after +" << credit
<< " -" << debit
);
717 if (ipcIo
.command
== IpcIo::cmdWrite
&& balance
> maxImbalance
) {
718 // if the next request is (likely) write and we accumulated
719 // too much time for future slow I/Os, then shed accumulated
720 // time to keep just half of the excess
721 const int64_t toSpend
= balance
- maxImbalance
/2;
723 if (toSpend
/1e3
> Timeout
)
724 debugs(47, DBG_IMPORTANT
, "WARNING: Rock disker delays I/O " <<
725 "requests for " << (toSpend
/1e3
) << " seconds to obey " <<
726 ioRate
<< "/sec rate limit");
728 debugs(47, 3, HERE
<< "rate limiting by " << toSpend
<< " ms to get" <<
729 (1e3
*maxRate
) << "/sec rate");
730 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
731 &IpcIoFile::DiskerHandleMoreRequests
,
732 const_cast<char*>("rate limiting"),
733 toSpend
/1e3
, 0, false);
734 DiskerHandleMoreRequestsScheduled
= true;
736 } else if (balance
< -maxImbalance
) {
737 // do not owe "too much" to avoid "too large" bursts of I/O
738 balance
= -maxImbalance
;
745 IpcIoFile::DiskerHandleRequests()
747 // Balance our desire to maximize the number of concurrent I/O requests
748 // (reordered by OS to minimize seek time) with a requirement to
749 // send 1st-I/O notification messages, process Coordinator events, etc.
750 const int maxSpentMsec
= 10; // keep small: most RAM I/Os are under 1ms
751 const timeval loopStart
= current_time
;
756 while (!WaitBeforePop() && queue
->pop(workerId
, ipcIo
)) {
759 // at least one I/O per call is guaranteed if the queue is not empty
760 DiskerHandleRequest(workerId
, ipcIo
);
763 const double elapsedMsec
= tvSubMsec(loopStart
, current_time
);
764 if (elapsedMsec
> maxSpentMsec
|| elapsedMsec
< 0) {
765 if (!DiskerHandleMoreRequestsScheduled
) {
766 // the gap must be positive for select(2) to be given a chance
767 const double minBreakSecs
= 0.001;
768 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
769 &IpcIoFile::DiskerHandleMoreRequests
,
770 const_cast<char*>("long I/O loop"),
771 minBreakSecs
, 0, false);
772 DiskerHandleMoreRequestsScheduled
= true;
774 debugs(47, 3, HERE
<< "pausing after " << popped
<< " I/Os in " <<
775 elapsedMsec
<< "ms; " << (elapsedMsec
/popped
) << "ms per I/O");
780 // TODO: consider using O_DIRECT with "elevator" optimization where we pop
781 // requests first, then reorder the popped requests to optimize seek time,
782 // then do I/O, then take a break, and come back for the next set of I/O
786 /// called when disker receives an I/O request
788 IpcIoFile::DiskerHandleRequest(const int workerId
, IpcIoMsg
&ipcIo
)
790 if (ipcIo
.command
!= IpcIo::cmdRead
&& ipcIo
.command
!= IpcIo::cmdWrite
) {
791 debugs(0, DBG_CRITICAL
, HERE
<< "disker" << KidIdentifier
<<
792 " should not receive " << ipcIo
.command
<<
793 " ipcIo" << workerId
<< '.' << ipcIo
.requestId
);
797 debugs(47,5, HERE
<< "disker" << KidIdentifier
<<
798 (ipcIo
.command
== IpcIo::cmdRead
? " reads " : " writes ") <<
799 ipcIo
.len
<< " at " << ipcIo
.offset
<<
800 " ipcIo" << workerId
<< '.' << ipcIo
.requestId
);
802 if (ipcIo
.command
== IpcIo::cmdRead
)
804 else // ipcIo.command == IpcIo::cmdWrite
807 debugs(47, 7, HERE
<< "pushing " << SipcIo(workerId
, ipcIo
, KidIdentifier
));
810 if (queue
->push(workerId
, ipcIo
))
811 Notify(workerId
); // must notify worker
812 } catch (const Queue::Full
&) {
813 // The worker queue should not overflow because the worker should pop()
814 // before push()ing and because if disker pops N requests at a time,
815 // we should make sure the worker pop() queue length is the worker
816 // push queue length plus N+1. XXX: implement the N+1 difference.
817 debugs(47, DBG_IMPORTANT
, "BUG: Worker I/O pop queue overflow: " <<
818 SipcIo(workerId
, ipcIo
, KidIdentifier
)); // TODO: report queue len
820 // the I/O request we could not push will timeout
825 DiskerOpen(const String
&path
, int flags
, mode_t mode
)
829 TheFile
= file_open(path
.termedBuf(), flags
);
832 const int xerrno
= errno
;
833 debugs(47, DBG_CRITICAL
, HERE
<< "rock db error opening " << path
<< ": " <<
838 ++store_open_disk_fd
;
839 debugs(79,3, HERE
<< "rock db opened " << path
<< ": FD " << TheFile
);
844 DiskerClose(const String
&path
)
848 debugs(79,3, HERE
<< "rock db closed " << path
<< ": FD " << TheFile
);
850 --store_open_disk_fd
;
854 /// reports our needs for shared memory pages to Ipc::Mem::Pages
855 class IpcIoClaimMemoryNeedsRr
: public RegisteredRunner
858 /* RegisteredRunner API */
859 virtual void run(const RunnerRegistry
&r
);
862 RunnerRegistrationEntry(rrClaimMemoryNeeds
, IpcIoClaimMemoryNeedsRr
);
865 IpcIoClaimMemoryNeedsRr::run(const RunnerRegistry
&)
867 const int itemsCount
= Ipc::FewToFewBiQueue::MaxItemsCount(
868 ::Config
.workers
, ::Config
.cacheSwap
.n_strands
, QueueCapacity
);
869 // the maximum number of shared I/O pages is approximately the
870 // number of queue slots; we add a fudge factor to that to account
871 // for corner cases where I/O pages are created before queue
872 // limits are checked or destroyed long after the I/O is dequeued
873 Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage
,
874 static_cast<int>(itemsCount
* 1.1));
877 /// initializes shared memory segments used by IpcIoFile
878 class IpcIoRr
: public Ipc::Mem::RegisteredRunner
881 /* RegisteredRunner API */
882 IpcIoRr(): owner(NULL
) {}
886 virtual void create(const RunnerRegistry
&);
889 Ipc::FewToFewBiQueue::Owner
*owner
;
892 RunnerRegistrationEntry(rrAfterConfig
, IpcIoRr
);
894 void IpcIoRr::create(const RunnerRegistry
&)
896 if (Config
.cacheSwap
.n_strands
<= 0)
900 owner
= Ipc::FewToFewBiQueue::Init(ShmLabel
, Config
.workers
, 1,
901 Config
.cacheSwap
.n_strands
,
902 1 + Config
.workers
, sizeof(IpcIoMsg
),