 * DEBUG: section 47    Store Directory Routines
6 #include "base/RunnersRegistry.h"
7 #include "base/TextException.h"
9 #include "DiskIO/IORequestor.h"
10 #include "DiskIO/IpcIo/IpcIoFile.h"
11 #include "DiskIO/ReadRequest.h"
12 #include "DiskIO/WriteRequest.h"
15 #include "ipc/mem/Pages.h"
16 #include "ipc/Messages.h"
18 #include "ipc/Queue.h"
19 #include "ipc/StrandSearch.h"
20 #include "ipc/UdsOp.h"
22 #include "SquidConfig.h"
23 #include "SquidTime.h"
24 #include "StatCounters.h"
// Registers IpcIoFile with Squid's cbdata accounting system.
// NOTE(review): this dump is line-mangled (stray original line numbers are
// embedded in the text and statements are split across lines); code is kept
// byte-identical throughout this file.
29 CBDATA_CLASS_INIT(IpcIoFile
);
31 /// shared memory segment path to use for IpcIoFile maps
32 static const char *const ShmLabel
= "io_file";
33 /// a single worker-to-disker or disker-to-worker queue capacity; up
34 /// to 2*QueueCapacity I/O requests queued between a single worker and
// NOTE(review): the continuation of the comment above (original line 35,
// presumably "... a single disker") is missing from this dump.
36 // TODO: make configurable or compute from squid.conf settings if possible
37 static const int QueueCapacity
= 1024;
// How long (in seconds) to wait for an open or I/O response before timing out.
39 const double IpcIoFile::Timeout
= 7; // seconds; XXX: ALL,9 may require more
// Files that asked Coordinator for a disker and still await a response.
40 IpcIoFile::IpcIoFileList
IpcIoFile::WaitingForOpen
;
// diskId -> IpcIoFile map of successfully opened files (see open()).
41 IpcIoFile::IpcIoFilesMap
IpcIoFile::IpcIoFiles
;
// Shared worker<->disker request/response queue for this process.
// NOTE(review): std::auto_ptr is deprecated; migrating to std::unique_ptr
// requires a matching change in the header declaration -- confirm first.
42 std::auto_ptr
<IpcIoFile::Queue
> IpcIoFile::queue
;
// Whether a DiskerHandleMoreRequests event is already scheduled (disker only).
44 bool IpcIoFile::DiskerHandleMoreRequestsScheduled
= false;
// Forward declarations for the disker-side database file open/close helpers
// defined near the end of this file.
46 static bool DiskerOpen(const SBuf
&path
, int flags
, mode_t mode
);
47 static void DiskerClose(const SBuf
&path
);
49 /// IpcIo wrapper for debugs() streams; XXX: find a better class name
// NOTE(review): the class header and the member declarations are missing
// from this dump; only the constructor initializer list is visible below.
51 SipcIo(int aWorker
, const IpcIoMsg
&aMsg
, int aDisker
):
52 worker(aWorker
), msg(aMsg
), disker(aDisker
) {}
// Prints "ipcIo<worker>.<requestId><r|w><disker>" for debugging, e.g.
// "ipcIo3.14r2". NOTE(review): the return-type line and the function braces
// are missing from this dump.
60 operator <<(std::ostream
&os
, const SipcIo
&sio
)
62 return os
<< "ipcIo" << sio
.worker
<< '.' << sio
.msg
.requestId
<<
// 'r' marks a read request, 'w' anything else (i.e., a write)
63 (sio
.msg
.command
== IpcIo::cmdRead
? 'r' : 'w') << sio
.disker
;
// Constructor: records the database name and initializes bookkeeping.
// diskId stays -1 until a disker is found; the two request maps are used as
// a double-buffered "older/newer" pair for timeout tracking.
// NOTE(review): the constructor body braces are missing from this dump.
66 IpcIoFile::IpcIoFile(char const *aDb
):
67 dbName(aDb
), diskId(-1), error_(false), lastRequestId(0),
68 olderRequests(&requestMap1
), newerRequests(&requestMap2
),
69 timeoutCheckScheduled(false)
// Destructor: unregisters this file from the global IpcIoFiles map.
// NOTE(review): body braces and the erase() call (if any) are missing from
// this dump; only the lookup and sanity checks are visible.
73 IpcIoFile::~IpcIoFile()
76 const IpcIoFilesMap::iterator i
= IpcIoFiles
.find(diskId
);
77 // XXX: warn and continue?
78 Must(i
!= IpcIoFiles
.end());
79 Must(i
->second
== this);
85 IpcIoFile::configure(const Config
&cfg
)
87 DiskFile::configure(cfg
);
// Opens the file: diskers open the db file directly and announce themselves
// to Coordinator; workers ask Coordinator to find a disker for dbName and
// wait (with a timeout) for HandleOpenResponse()/OpenTimeout().
// NOTE(review): several original lines (e.g. 93, 96-97, 99, 102-104,
// 108-109, 112, 116, 118, 120-122, 126, 128, 130, 132) are missing from
// this dump, so branch structure below is only partially visible.
92 IpcIoFile::open(int flags
, mode_t mode
, RefCount
<IORequestor
> callback
)
94 ioRequestor
= callback
;
95 Must(diskId
< 0); // we do not know our disker yet
// create our end of the shared queue: workers are groupA, diskers groupB
98 queue
.reset(new Queue(ShmLabel
, IamWorkerProcess() ? Queue::groupA
: Queue::groupB
, KidIdentifier
));
100 if (IamDiskProcess()) {
101 error_
= !DiskerOpen(SBuf(dbName
.termedBuf()), flags
, mode
);
// disker: we serve ourselves; register under our own kid id
105 diskId
= KidIdentifier
;
106 const bool inserted
=
107 IpcIoFiles
.insert(std::make_pair(diskId
, this)).second
;
// apply the configured I/O rate limit to our queue reader
110 queue
->localRateLimit() =
111 static_cast<Ipc::QueueReader::Rate::Value
>(config
.ioRate
);
// announce this disker strand (kid id + pid + db tag) to Coordinator
113 Ipc::HereIamMessage
ann(Ipc::StrandCoord(KidIdentifier
, getpid()));
114 ann
.strand
.tag
= dbName
;
115 Ipc::TypedMsgHdr message
;
117 SendMessage(Ipc::Port::CoordinatorAddr(), message
);
119 ioRequestor
->ioCompletedNotification();
// worker: ask Coordinator which disker strand serves dbName
123 Ipc::StrandSearchRequest request
;
124 request
.requestorId
= KidIdentifier
;
125 request
.tag
= dbName
;
127 Ipc::TypedMsgHdr msg
;
129 Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg
);
// wait for the response or the open timeout, whichever comes first
131 WaitingForOpen
.push_back(this);
133 eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout
,
134 this, Timeout
, 0, false); // "this" pointer is used as id
// Called when Coordinator responds to our open request (or with a NULL
// response on timeout). Records the disker id on success; sets error_ and
// reports otherwise, then notifies the requestor either way.
// NOTE(review): several original lines (139, 141-142, 145-146, 148,
// 151-153, 156-158) are missing from this dump; the error_/else structure
// is only partially visible.
138 IpcIoFile::openCompleted(const Ipc::StrandSearchResponse
*const response
)
140 Must(diskId
< 0); // we do not know our disker yet
// NULL response means the open request timed out
143 debugs(79, DBG_IMPORTANT
, "ERROR: " << dbName
<< " communication " <<
144 "channel establishment timeout");
147 diskId
= response
->strand
.kidId
;
149 const bool inserted
=
150 IpcIoFiles
.insert(std::make_pair(diskId
, this)).second
;
// a negative kidId in the response means no disker serves dbName
154 debugs(79, DBG_IMPORTANT
, "ERROR: no disker claimed " <<
155 "responsibility for " << dbName
);
159 ioRequestor
->ioCompletedNotification();
163 * Alias for IpcIoFile::open(...)
164 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
// NOTE(review): the assert(false) below appears in the upstream sources as
// well (marked "// check"); create() is not expected to be reached and
// simply reuses the open() path. Function braces are missing from this dump.
167 IpcIoFile::create(int flags
, mode_t mode
, RefCount
<IORequestor
> callback
)
169 assert(false); // check
170 /* We use the same logic path for open */
171 open(flags
, mode
, callback
);
// Body of IpcIoFile::close(): diskers close the db file; then the requestor
// is told the close completed. NOTE(review): the function signature line and
// braces are missing from this dump; only the body statements are visible.
177 assert(ioRequestor
!= NULL
);
179 if (IamDiskProcess())
180 DiskerClose(SBuf(dbName
.termedBuf()));
181 // XXX: else nothing to do?
183 ioRequestor
->closeCompleted();
187 IpcIoFile::canRead() const
189 return diskId
>= 0 && !error_
&& canWait();
193 IpcIoFile::canWrite() const
195 return diskId
>= 0 && !error_
&& canWait();
// Signature fragment of IpcIoFile::error(); the return type line and the
// body (presumably returning error_) are missing from this dump.
199 IpcIoFile::error() const
// Queues an asynchronous read: wraps the request in an IpcIoPendingRequest
// and (on lines missing from this dump, original 219-220) pushes it to the
// disker. Completion is reported via readCompleted().
// NOTE(review): original lines 206, 209, 212-213, 216, 219-221 are missing.
205 IpcIoFile::read(ReadRequest
*readRequest
)
207 debugs(79,3, HERE
<< "(disker" << diskId
<< ", " << readRequest
->len
<< ", " <<
208 readRequest
->offset
<< ")");
210 assert(ioRequestor
!= NULL
);
211 assert(readRequest
->offset
>= 0);
214 //assert(minOffset < 0 || minOffset <= readRequest->offset);
215 //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
// ownership of the pending wrapper passes to the push/completion machinery
217 IpcIoPendingRequest
*const pending
= new IpcIoPendingRequest(this);
218 pending
->readRequest
= readRequest
;
// Handles a disker response to a read (or a NULL response on timeout):
// copies the data out of the shared memory page into the caller's buffer,
// releases the page, and notifies the requestor with DISK_OK/DISK_ERROR.
// NOTE(review): original lines 225, 227, 230, 238-239, 242-243, 245-246,
// 250 are missing from this dump; if/else structure is partially visible.
223 IpcIoFile::readCompleted(ReadRequest
*readRequest
,
224 IpcIoMsg
*const response
)
226 bool ioError
= false;
// NULL response branch: the request timed out
228 debugs(79, 3, HERE
<< "error: timeout");
229 ioError
= true; // I/O timeout does not warrant setting error_?
231 if (response
->xerrno
) {
232 debugs(79, DBG_IMPORTANT
, "ERROR: " << dbName
<< " read: " <<
233 xstrerr(response
->xerrno
));
// a real disk error disables this file for future I/O (error_)
234 ioError
= error_
= true;
235 } else if (!response
->page
) {
236 debugs(79, DBG_IMPORTANT
, "ERROR: " << dbName
<< " read ran " <<
237 "out of shared memory pages");
// success: copy the data from the shared page to the caller's buffer
240 const char *const buf
= Ipc::Mem::PagePointer(response
->page
);
241 memcpy(readRequest
->buf
, buf
, response
->len
);
// return the shared memory page to the pool
244 Ipc::Mem::PutPage(response
->page
);
247 const ssize_t rlen
= ioError
? -1 : (ssize_t
)readRequest
->len
;
248 const int errflag
= ioError
? DISK_ERROR
:DISK_OK
;
249 ioRequestor
->readCompleted(readRequest
->buf
, rlen
, errflag
, readRequest
);
// Queues an asynchronous write: wraps the request in an IpcIoPendingRequest
// and (on lines missing from this dump, original 268-269) pushes it to the
// disker. Completion is reported via writeCompleted().
// NOTE(review): original lines 254, 257, 261-262, 265, 268-270 are missing.
253 IpcIoFile::write(WriteRequest
*writeRequest
)
255 debugs(79,3, HERE
<< "(disker" << diskId
<< ", " << writeRequest
->len
<< ", " <<
256 writeRequest
->offset
<< ")");
258 assert(ioRequestor
!= NULL
);
259 assert(writeRequest
->len
> 0); // TODO: work around mmap failures on zero-len?
260 assert(writeRequest
->offset
>= 0);
263 //assert(minOffset < 0 || minOffset <= writeRequest->offset);
264 //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
266 IpcIoPendingRequest
*const pending
= new IpcIoPendingRequest(this);
267 pending
->writeRequest
= writeRequest
;
// Handles a disker response to a write (or NULL on timeout): logs errors,
// marks the file broken (error_) on real failures or short writes, frees
// the caller's buffer via free_func, and notifies the requestor.
// NOTE(review): original lines 274, 276, 290-292, 295-296, 299-300 are
// missing from this dump; the if/else chain is partially visible.
272 IpcIoFile::writeCompleted(WriteRequest
*writeRequest
,
273 const IpcIoMsg
*const response
)
275 bool ioError
= false;
// NULL response branch: the request timed out
277 debugs(79, 3, "disker " << diskId
<< " timeout");
278 ioError
= true; // I/O timeout does not warrant setting error_?
279 } else if (response
->xerrno
) {
280 debugs(79, DBG_IMPORTANT
, "ERROR: disker " << diskId
<<
281 " error writing " << writeRequest
->len
<< " bytes at " <<
282 writeRequest
->offset
<< ": " << xstrerr(response
->xerrno
) <<
283 "; this worker will stop using " << dbName
);
284 ioError
= error_
= true;
// short write: disker wrote fewer bytes than requested
285 } else if (response
->len
!= writeRequest
->len
) {
286 debugs(79, DBG_IMPORTANT
, "ERROR: disker " << diskId
<< " wrote " <<
287 response
->len
<< " instead of " << writeRequest
->len
<<
288 " bytes (offset " << writeRequest
->offset
<< "); " <<
289 "this worker will stop using " << dbName
);
// release the caller-supplied buffer, as the legacy DiskFile API requires
293 if (writeRequest
->free_func
)
294 (writeRequest
->free_func
)(const_cast<char*>(writeRequest
->buf
)); // broken API?
297 debugs(79,5, HERE
<< "wrote " << writeRequest
->len
<< " to disker" <<
298 diskId
<< " at " << writeRequest
->offset
);
// note: on error the reported length is 0 here (reads report -1)
301 const ssize_t rlen
= ioError
? 0 : (ssize_t
)writeRequest
->len
;
302 const int errflag
= ioError
? DISK_ERROR
:DISK_OK
;
303 ioRequestor
->writeCompleted(errflag
, rlen
, writeRequest
);
307 IpcIoFile::ioInProgress() const
309 return !olderRequests
->empty() || !newerRequests
->empty();
312 /// track a new pending request
314 IpcIoFile::trackPendingRequest(const unsigned int id
, IpcIoPendingRequest
*const pending
)
316 const std::pair
<RequestMap::iterator
,bool> result
=
317 newerRequests
->insert(std::make_pair(id
, pending
));
318 Must(result
.second
); // failures means that id was not unique
319 if (!timeoutCheckScheduled
)
320 scheduleTimeoutCheck();
323 /// push an I/O request to disker
// Builds an IpcIoMsg from the pending read or write request (copying write
// data into a shared memory page), pushes it onto the disker queue, notifies
// the disker if needed, and starts tracking the request. On queue overflow
// or other failures the request is completed immediately with NULL (error).
// NOTE(review): original lines 324, 326, 330-333, 335-337, 339, 349, 351,
// 357-358, 360, 369, 371, 375-376 are missing from this dump, including the
// try { ... } opening and the ipcIo declaration.
325 IpcIoFile::push(IpcIoPendingRequest
*const pending
)
327 // prevent queue overflows: check for responses to earlier requests
328 // warning: this call may result in indirect push() recursion
329 HandleResponses("before push");
334 Must(pending
->readRequest
|| pending
->writeRequest
);
// request ids wrap around; zero is reserved as "no request"
338 if (++lastRequestId
== 0) // don't use zero value as requestId
340 ipcIo
.requestId
= lastRequestId
;
341 ipcIo
.start
= current_time
;
342 if (pending
->readRequest
) {
343 ipcIo
.command
= IpcIo::cmdRead
;
344 ipcIo
.offset
= pending
->readRequest
->offset
;
345 ipcIo
.len
= pending
->readRequest
->len
;
346 } else { // pending->writeRequest
347 Must(pending
->writeRequest
->len
<= Ipc::Mem::PageSize());
348 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage
, ipcIo
.page
)) {
350 throw TexcHere("run out of shared memory pages for IPC I/O");
352 ipcIo
.command
= IpcIo::cmdWrite
;
353 ipcIo
.offset
= pending
->writeRequest
->offset
;
354 ipcIo
.len
= pending
->writeRequest
->len
;
// copy the caller's buffer into the shared page the disker will read
355 char *const buf
= Ipc::Mem::PagePointer(ipcIo
.page
);
356 memcpy(buf
, pending
->writeRequest
->buf
, ipcIo
.len
); // optimize away
359 debugs(47, 7, HERE
<< "pushing " << SipcIo(KidIdentifier
, ipcIo
, diskId
));
// push returns true when the disker may be sleeping and needs a poke
361 if (queue
->push(diskId
, ipcIo
))
362 Notify(diskId
); // must notify disker
363 trackPendingRequest(ipcIo
.requestId
, pending
);
364 } catch (const Queue::Full
&) {
365 debugs(47, DBG_IMPORTANT
, "ERROR: worker I/O push queue for " <<
366 dbName
<< " overflow: " <<
367 SipcIo(KidIdentifier
, ipcIo
, diskId
)); // TODO: report queue len
368 // TODO: grow queue size
370 pending
->completeIo(NULL
);
372 } catch (const TextException
&e
) {
373 debugs(47, DBG_IMPORTANT
, "ERROR: " << dbName
<< " exception: " << e
.what());
374 pending
->completeIo(NULL
);
379 /// whether we think there is enough time to complete the I/O
// Estimates the expected wait for a new request as the max of (a) how long
// the oldest queued request has already waited and (b) queue length divided
// by the disker's I/O rate limit (plus the queue balance). Returns false if
// that estimate exceeds the configured I/O timeout.
// NOTE(review): original lines 380, 382, 385-386, 389, 391, 394, 400,
// 402-403, 408 are missing from this dump, including the oldestIo
// declaration and the ioRate > 0 guard (if any).
381 IpcIoFile::canWait() const
383 if (!config
.ioTimeout
)
384 return true; // no timeout specified
387 if (!queue
->findOldest(diskId
, oldestIo
) || oldestIo
.start
.tv_sec
<= 0)
388 return true; // we cannot estimate expected wait time; assume it is OK
// how long the oldest request has been waiting already (ms)
390 const int oldestWait
= tvSubMsec(oldestIo
.start
, current_time
);
392 int rateWait
= -1; // time in millisecons
393 const Ipc::QueueReader::Rate::Value ioRate
= queue
->rateLimit(diskId
);
395 // if there are N requests pending, the new one will wait at
396 // least N/max-swap-rate seconds
397 rateWait
= static_cast<int>(1e3
* queue
->outSize(diskId
) / ioRate
);
398 // adjust N/max-swap-rate value based on the queue "balance"
399 // member, in case we have been borrowing time against future
401 rateWait
+= queue
->balance(diskId
);
404 const int expectedWait
= max(oldestWait
, rateWait
);
405 if (expectedWait
< 0 ||
406 static_cast<time_msec_t
>(expectedWait
) < config
.ioTimeout
)
407 return true; // expected wait time is acceptible
409 debugs(47,2, HERE
<< "cannot wait: " << expectedWait
<<
410 " oldest: " << SipcIo(KidIdentifier
, oldestIo
, diskId
));
411 return false; // do not want to wait that long
414 /// called when coordinator responds to worker open request
// Finds the waiting IpcIoFile whose dbName matches the response tag,
// completes its open, and removes it from WaitingForOpen. If no waiter
// matches, the response is late (the open already timed out) and is logged.
// NOTE(review): original lines 415, 417, 424-427 (including the loop-exit
// after erase) are missing from this dump.
416 IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse
&response
)
418 debugs(47, 7, HERE
<< "coordinator response to open request");
419 for (IpcIoFileList::iterator i
= WaitingForOpen
.begin();
420 i
!= WaitingForOpen
.end(); ++i
) {
421 if (response
.strand
.tag
== (*i
)->dbName
) {
422 (*i
)->openCompleted(&response
);
423 WaitingForOpen
.erase(i
);
428 debugs(47, 4, HERE
<< "LATE disker response to open for " <<
429 response
.strand
.tag
);
430 // nothing we can do about it; completeIo() has been called already
// Pops every available disker response from the queue and dispatches each
// to the owning IpcIoFile (looked up by diskId). `when` labels the call
// site for debugging. NOTE(review): original lines 433, 435, 437, 439
// (including the diskId/ipcIo declarations) and 444-445 are missing from
// this dump.
434 IpcIoFile::HandleResponses(const char *const when
)
436 debugs(47, 4, HERE
<< "popping all " << when
);
438 // get all responses we can: since we are not pushing, this will stop
440 while (queue
->pop(diskId
, ipcIo
)) {
441 const IpcIoFilesMap::const_iterator i
= IpcIoFiles
.find(diskId
);
442 Must(i
!= IpcIoFiles
.end()); // TODO: warn but continue
443 i
->second
->handleResponse(ipcIo
);
// Matches a popped disker response to its pending request by requestId;
// completes and deletes the pending request, or logs a late response whose
// request has already timed out. NOTE(review): original lines 447, 449,
// 453-454, 458, 462 are missing from this dump (including the else branch
// opening).
448 IpcIoFile::handleResponse(IpcIoMsg
&ipcIo
)
450 const int requestId
= ipcIo
.requestId
;
451 debugs(47, 7, HERE
<< "popped disker response: " <<
452 SipcIo(KidIdentifier
, ipcIo
, diskId
));
455 if (IpcIoPendingRequest
*const pending
= dequeueRequest(requestId
)) {
456 pending
->completeIo(&ipcIo
);
457 delete pending
; // XXX: leaking if throwing
459 debugs(47, 4, HERE
<< "LATE disker response to " << ipcIo
.command
<<
460 "; ipcIo" << KidIdentifier
<< '.' << requestId
);
461 // nothing we can do about it; completeIo() has been called already
466 IpcIoFile::Notify(const int peerId
)
468 // TODO: Count and report the total number of notifications, pops, pushes.
469 debugs(47, 7, HERE
<< "kid" << peerId
);
470 Ipc::TypedMsgHdr msg
;
471 msg
.setType(Ipc::mtIpcIoNotification
); // TODO: add proper message type?
472 msg
.putInt(KidIdentifier
);
473 const String addr
= Ipc::Port::MakeAddr(Ipc::strandAddrLabel
, peerId
);
474 Ipc::SendMessage(addr
, msg
);
// Reacts to an mtIpcIoNotification from another kid: clears the reader
// signal for that sender, then either pops requests (disker) or responses
// (worker, via HandleResponses). NOTE(review): original lines 477, 479, 485
// and 487 are missing from this dump -- line 485 is presumably the `else`
// joining the two branches; confirm against upstream before editing logic.
478 IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr
&msg
)
480 const int from
= msg
.getInt();
481 debugs(47, 7, HERE
<< "from " << from
);
482 queue
->clearReaderSignal(from
);
483 if (IamDiskProcess())
484 DiskerHandleRequests();
486 HandleResponses("after notification");
489 /// handles open request timeout
// Event callback: if the file identified by `param` is still waiting for a
// Coordinator open response, completes its open with NULL (failure) and
// removes it from WaitingForOpen. NOTE(review): original lines 490, 492-493
// and 502-506 (including the loop-exit after erase) are missing from this
// dump.
491 IpcIoFile::OpenTimeout(void *const param
)
494 // the pointer is used for comparison only and not dereferenced
495 const IpcIoFile
*const ipcIoFile
=
496 reinterpret_cast<const IpcIoFile
*>(param
);
497 for (IpcIoFileList::iterator i
= WaitingForOpen
.begin();
498 i
!= WaitingForOpen
.end(); ++i
) {
499 if (*i
== ipcIoFile
) {
500 (*i
)->openCompleted(NULL
);
501 WaitingForOpen
.erase(i
);
507 /// IpcIoFile::checkTimeouts wrapper
// Static event callback: decodes the diskId smuggled through the void*
// event parameter and forwards to the matching file's checkTimeouts().
// A missing map entry (file already closed) is silently ignored.
// NOTE(review): original lines 508, 510-511, 517 are missing from this dump.
509 IpcIoFile::CheckTimeouts(void *const param
)
512 const int diskId
= reinterpret_cast<uintptr_t>(param
);
513 debugs(47, 7, HERE
<< "diskId=" << diskId
);
514 const IpcIoFilesMap::const_iterator i
= IpcIoFiles
.find(diskId
);
515 if (i
!= IpcIoFiles
.end())
516 i
->second
->checkTimeouts();
// Periodic timeout sweep: first tries to rescue requests by handling any
// pending responses, then fails every remaining "older" request with a NULL
// response, and finally rotates the older/newer request maps so current
// requests get checked on the next sweep. NOTE(review): original lines 519,
// 521, 523, 528, 535-537, 541-542, 547, 551, 554, 556, 560 are missing from
// this dump (including the braces around the warning branches).
520 IpcIoFile::checkTimeouts()
522 timeoutCheckScheduled
= false;
524 // last chance to recover in case a notification message was lost, etc.
525 const RequestMap::size_type timeoutsBefore
= olderRequests
->size();
526 HandleResponses("before timeout");
527 const RequestMap::size_type timeoutsNow
= olderRequests
->size();
529 if (timeoutsBefore
> timeoutsNow
) { // some requests were rescued
530 // notification message lost or significantly delayed?
531 debugs(47, DBG_IMPORTANT
, "WARNING: communication with " << dbName
<<
532 " may be too slow or disrupted for about " <<
533 Timeout
<< "s; rescued " << (timeoutsBefore
- timeoutsNow
) <<
534 " out of " << timeoutsBefore
<< " I/Os");
538 debugs(47, DBG_IMPORTANT
, "WARNING: abandoning " <<
539 timeoutsNow
<< ' ' << dbName
<< " I/Os after at least " <<
540 Timeout
<< "s timeout");
543 // any old request would have timed out by now
544 typedef RequestMap::const_iterator RMCI
;
545 for (RMCI i
= olderRequests
->begin(); i
!= olderRequests
->end(); ++i
) {
546 IpcIoPendingRequest
*const pending
= i
->second
;
548 const unsigned int requestId
= i
->first
;
549 debugs(47, 7, HERE
<< "disker timeout; ipcIo" <<
550 KidIdentifier
<< '.' << requestId
);
552 pending
->completeIo(NULL
); // no response
553 delete pending
; // XXX: leaking if throwing
555 olderRequests
->clear();
// double-buffer rotation: newer requests become the ones to watch next
557 swap(olderRequests
, newerRequests
); // switches pointers around
558 if (!olderRequests
->empty() && !timeoutCheckScheduled
)
559 scheduleTimeoutCheck();
562 /// prepare to check for timeouts in a little while
564 IpcIoFile::scheduleTimeoutCheck()
566 // we check all older requests at once so some may be wait for 2*Timeout
567 eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts
,
568 reinterpret_cast<void *>(diskId
), Timeout
, 0, false);
569 timeoutCheckScheduled
= true;
572 /// returns and forgets the right IpcIoFile pending request
// Looks up requestId in both request maps (requests may be in either half
// of the older/newer double buffer), removes and returns the pending
// request, or returns NULL (on lines missing from this dump) when not
// found. NOTE(review): original lines 575, 577, 580, 582-583, 586-588,
// 590-591, 593-595 are missing, including the map assignment, the erase()
// call, and the return statements.
573 IpcIoPendingRequest
*
574 IpcIoFile::dequeueRequest(const unsigned int requestId
)
576 Must(requestId
!= 0);
578 RequestMap
*map
= NULL
;
579 RequestMap::iterator i
= requestMap1
.find(requestId
);
581 if (i
!= requestMap1
.end())
584 i
= requestMap2
.find(requestId
);
585 if (i
!= requestMap2
.end())
589 if (!map
) // not found in both maps
592 IpcIoPendingRequest
*pending
= i
->second
;
// getFD() is unsupported for IPC-based files; the assert documents that.
// NOTE(review): the return type line, braces, and the unreachable return
// value (if any) are missing from this dump.
598 IpcIoFile::getFD() const
600 assert(false); // not supported; TODO: remove this method from API
// IpcIoMsg default constructor fragment; only the command initializer is
// visible -- the other member initializers (original lines 607-609) and the
// body are missing from this dump.
606 IpcIoMsg::IpcIoMsg():
610 command(IpcIo::cmdNone
),
617 /* IpcIoPendingRequest */
// Constructor: binds the pending request to its file; exactly one of
// readRequest/writeRequest is filled in later by the caller.
// NOTE(review): the constructor body (original lines 621+) is missing from
// this dump.
619 IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer
&aFile
):
620 file(aFile
), readRequest(NULL
), writeRequest(NULL
)
// Dispatches a disker response (or NULL on timeout) to the right completion
// handler: readCompleted, writeCompleted, or -- when neither request is set
// (an open timed out) -- openCompleted(NULL). NOTE(review): original lines
// 624, 626-627 (the `if (readRequest)` guard) and 631 (the `else` opening)
// are missing from this dump.
625 IpcIoPendingRequest::completeIo(IpcIoMsg
*const response
)
628 file
->readCompleted(readRequest
, response
);
629 else if (writeRequest
)
630 file
->writeCompleted(writeRequest
, response
);
632 Must(!response
); // only timeouts are handled here
633 file
->openCompleted(NULL
);
637 /* XXX: disker code that should probably be moved elsewhere */
// File-scope state for the disker process: the single rock db file it
// serves. Set by DiskerOpen(), cleared by DiskerClose().
639 static SBuf DbName
; ///< full db file name
640 static int TheFile
= -1; ///< db file descriptor
// Disker-side read: grabs a shared memory page, pread()s the requested
// range from the db file into it, and records the result (or errno) in
// ipcIo for the worker. NOTE(review): original lines 642, 644, 646,
// 648-650, 655-657, 661-662, 664, 666-668 are missing from this dump,
// including the success/error branch structure and the final debugs
// argument.
643 diskerRead(IpcIoMsg
&ipcIo
)
645 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage
, ipcIo
.page
)) {
647 debugs(47,2, HERE
<< "run out of shared memory pages for IPC I/O");
651 char *const buf
= Ipc::Mem::PagePointer(ipcIo
.page
);
// clamp the read to one page; the worker never asks for more
652 const ssize_t read
= pread(TheFile
, buf
, min(ipcIo
.len
, Ipc::Mem::PageSize()), ipcIo
.offset
);
653 ++statCounter
.syscalls
.disk
.reads
;
654 fd_bytes(TheFile
, read
, FD_READ
);
658 const size_t len
= static_cast<size_t>(read
); // safe because read > 0
659 debugs(47,8, HERE
<< "disker" << KidIdentifier
<< " read " <<
660 (len
== ipcIo
.len
? "all " : "just ") << read
);
// error path: report errno back to the worker via the response message
663 ipcIo
.xerrno
= errno
;
665 debugs(47,5, HERE
<< "disker" << KidIdentifier
<< " read error: " <<
671 /// sets ipcIo results, but does no cleanup. The caller must cleanup.
673 diskerWriteAttempts(IpcIoMsg
&ipcIo
)
675 const char *buf
= Ipc::Mem::PagePointer(ipcIo
.page
);
676 size_t toWrite
= min(ipcIo
.len
, Ipc::Mem::PageSize());
677 size_t wroteSoFar
= 0;
678 off_t offset
= ipcIo
.offset
;
679 // Partial writes to disk do happen. It is unlikely that the caller can
680 // handle partial writes by doing something other than writing leftovers
681 // again, so we try to write them ourselves to minimize overheads.
682 const int attemptLimit
= 10;
683 for (int attempts
= 1; attempts
<= attemptLimit
; ++attempts
) {
684 const ssize_t result
= pwrite(TheFile
, buf
, toWrite
, offset
);
685 ++statCounter
.syscalls
.disk
.writes
;
686 fd_bytes(TheFile
, result
, FD_WRITE
);
689 ipcIo
.xerrno
= errno
;
690 assert(ipcIo
.xerrno
);
691 debugs(47, DBG_IMPORTANT
, "ERROR: " << DbName
<< " failure" <<
692 " writing " << toWrite
<< '/' << ipcIo
.len
<<
693 " at " << ipcIo
.offset
<< '+' << wroteSoFar
<<
694 " on " << attempts
<< " try: " << xstrerr(ipcIo
.xerrno
));
695 ipcIo
.len
= wroteSoFar
;
696 return; // bail on error
699 const size_t wroteNow
= static_cast<size_t>(result
); // result >= 0
702 debugs(47,3, "disker" << KidIdentifier
<< " wrote " <<
703 (wroteNow
>= toWrite
? "all " : "just ") << wroteNow
<<
704 " out of " << toWrite
<< '/' << ipcIo
.len
<< " at " <<
705 ipcIo
.offset
<< '+' << wroteSoFar
<< " on " << attempts
<<
708 wroteSoFar
+= wroteNow
;
710 if (wroteNow
>= toWrite
) {
712 ipcIo
.len
= wroteSoFar
;
713 return; // wrote everything there was to write
721 debugs(47, DBG_IMPORTANT
, "ERROR: " << DbName
<< " exhausted all " <<
722 attemptLimit
<< " attempts while writing " <<
723 toWrite
<< '/' << ipcIo
.len
<< " at " << ipcIo
.offset
<< '+' <<
725 return; // not a fatal I/O error, unless the caller treats it as such
// Disker-side write: performs the (possibly retried) write and then always
// returns the shared memory page that carried the data, even on failure.
// NOTE(review): the return type line and braces are missing from this dump.
729 diskerWrite(IpcIoMsg
&ipcIo
)
731 diskerWriteAttempts(ipcIo
); // may fail
732 Ipc::Mem::PutPage(ipcIo
.page
);
736 IpcIoFile::DiskerHandleMoreRequests(void *source
)
738 debugs(47, 7, HERE
<< "resuming handling requests after " <<
739 static_cast<const char *>(source
));
740 DiskerHandleMoreRequestsScheduled
= false;
741 IpcIoFile::DiskerHandleRequests();
// Disker rate limiter: maintains a time "balance" between the ideal
// inter-I/O spacing (from the configured rate) and the actual spacing; when
// a write is next and too much future time has accumulated, schedules a
// delayed DiskerHandleMoreRequests event and returns true (on a line
// missing from this dump) so the caller skips popping for now.
// NOTE(review): original lines 744, 746, 749, 751-753, 755-756, 760-762,
// 764, 768, 772, 775, 777, 783, 788, 796, 800-804 are missing from this
// dump, including the return statements and several branch braces.
745 IpcIoFile::WaitBeforePop()
747 const Ipc::QueueReader::Rate::Value ioRate
= queue
->localRateLimit();
748 const double maxRate
= ioRate
/1e3
; // req/ms
750 // do we need to enforce configured I/O rate?
754 // is there an I/O request we could potentially delay?
757 if (!queue
->peek(processId
, ipcIo
)) {
758 // unlike pop(), peek() is not reliable and does not block reader
759 // so we must proceed with pop() even if it is likely to fail
// function-local persistent timestamp of the previous I/O
763 static timeval LastIo
= current_time
;
765 const double ioDuration
= 1.0 / maxRate
; // ideal distance between two I/Os
766 // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
767 const int64_t maxImbalance
= min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration
));
769 const double credit
= ioDuration
; // what the last I/O should have cost us
770 const double debit
= tvSubMsec(LastIo
, current_time
); // actual distance from the last I/O
771 LastIo
= current_time
;
773 Ipc::QueueReader::Balance
&balance
= queue
->localBalance();
774 balance
+= static_cast<int64_t>(credit
- debit
);
776 debugs(47, 7, HERE
<< "rate limiting balance: " << balance
<< " after +" << credit
<< " -" << debit
);
778 if (ipcIo
.command
== IpcIo::cmdWrite
&& balance
> maxImbalance
) {
779 // if the next request is (likely) write and we accumulated
780 // too much time for future slow I/Os, then shed accumulated
781 // time to keep just half of the excess
782 const int64_t toSpend
= balance
- maxImbalance
/2;
784 if (toSpend
/1e3
> Timeout
)
785 debugs(47, DBG_IMPORTANT
, "WARNING: " << DbName
<< " delays " <<
786 "I/O requests for " << (toSpend
/1e3
) << " seconds " <<
787 "to obey " << ioRate
<< "/sec rate limit");
789 debugs(47, 3, HERE
<< "rate limiting by " << toSpend
<< " ms to get" <<
790 (1e3
*maxRate
) << "/sec rate");
791 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
792 &IpcIoFile::DiskerHandleMoreRequests
,
793 const_cast<char*>("rate limiting"),
794 toSpend
/1e3
, 0, false);
795 DiskerHandleMoreRequestsScheduled
= true;
797 } else if (balance
< -maxImbalance
) {
798 // do not owe "too much" to avoid "too large" bursts of I/O
799 balance
= -maxImbalance
;
// Disker main pump: pops and serves worker requests until the queue is
// empty, the rate limiter asks for a pause, or maxSpentMsec elapses -- in
// the latter case a short DiskerHandleMoreRequests break is scheduled so
// the event loop can run. NOTE(review): original lines 805, 807, 813-816
// (the workerId/ipcIo/popped declarations), 818-819, 822-823, 834, 837-839
// are missing from this dump.
806 IpcIoFile::DiskerHandleRequests()
808 // Balance our desire to maximize the number of concurrent I/O requests
809 // (reordred by OS to minimize seek time) with a requirement to
810 // send 1st-I/O notification messages, process Coordinator events, etc.
811 const int maxSpentMsec
= 10; // keep small: most RAM I/Os are under 1ms
812 const timeval loopStart
= current_time
;
817 while (!WaitBeforePop() && queue
->pop(workerId
, ipcIo
)) {
820 // at least one I/O per call is guaranteed if the queue is not empty
821 DiskerHandleRequest(workerId
, ipcIo
);
824 const double elapsedMsec
= tvSubMsec(loopStart
, current_time
);
825 if (elapsedMsec
> maxSpentMsec
|| elapsedMsec
< 0) {
826 if (!DiskerHandleMoreRequestsScheduled
) {
827 // the gap must be positive for select(2) to be given a chance
828 const double minBreakSecs
= 0.001;
829 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
830 &IpcIoFile::DiskerHandleMoreRequests
,
831 const_cast<char*>("long I/O loop"),
832 minBreakSecs
, 0, false);
833 DiskerHandleMoreRequestsScheduled
= true;
835 debugs(47, 3, HERE
<< "pausing after " << popped
<< " I/Os in " <<
836 elapsedMsec
<< "ms; " << (elapsedMsec
/popped
) << "ms per I/O");
841 // TODO: consider using O_DIRECT with "elevator" optimization where we pop
842 // requests first, then reorder the popped requests to optimize seek time,
843 // then do I/O, then take a break, and come back for the next set of I/O
847 /// called when disker receives an I/O request
// Validates the command, performs the read or write (the diskerRead /
// diskerWrite calls are on lines missing from this dump, original 864 and
// 866), then pushes the response back to the worker's queue, notifying the
// worker if needed. NOTE(review): original lines 848, 850, 855-857, 862,
// 864, 866-867, 869-870, 881, 883 are also missing, including the early
// return for unknown commands and the try { opening.
849 IpcIoFile::DiskerHandleRequest(const int workerId
, IpcIoMsg
&ipcIo
)
851 if (ipcIo
.command
!= IpcIo::cmdRead
&& ipcIo
.command
!= IpcIo::cmdWrite
) {
852 debugs(0, DBG_CRITICAL
, "ERROR: " << DbName
<<
853 " should not receive " << ipcIo
.command
<<
854 " ipcIo" << workerId
<< '.' << ipcIo
.requestId
);
858 debugs(47,5, HERE
<< "disker" << KidIdentifier
<<
859 (ipcIo
.command
== IpcIo::cmdRead
? " reads " : " writes ") <<
860 ipcIo
.len
<< " at " << ipcIo
.offset
<<
861 " ipcIo" << workerId
<< '.' << ipcIo
.requestId
);
863 if (ipcIo
.command
== IpcIo::cmdRead
)
865 else // ipcIo.command == IpcIo::cmdWrite
868 debugs(47, 7, HERE
<< "pushing " << SipcIo(workerId
, ipcIo
, KidIdentifier
));
// push returns true when the worker may be sleeping and needs a poke
871 if (queue
->push(workerId
, ipcIo
))
872 Notify(workerId
); // must notify worker
873 } catch (const Queue::Full
&) {
874 // The worker queue should not overflow because the worker should pop()
875 // before push()ing and because if disker pops N requests at a time,
876 // we should make sure the worker pop() queue length is the worker
877 // push queue length plus N+1. XXX: implement the N+1 difference.
878 debugs(47, DBG_IMPORTANT
, "BUG: Worker I/O pop queue for " <<
879 DbName
<< " overflow: " <<
880 SipcIo(workerId
, ipcIo
, KidIdentifier
)); // TODO: report queue len
882 // the I/O request we could not push will timeout
// Disker-side open of the rock db file; sets the file-scope DbName/TheFile
// on success and logs errno on failure. The success/failure return
// statements are on lines missing from this dump (original 888-891, 893-894,
// 897-900, 903-904), as is the DbName assignment.
887 DiskerOpen(const SBuf
&path
, int flags
, mode_t mode
)
892 TheFile
= file_open(DbName
.c_str(), flags
);
// TheFile < 0 branch: report why the open failed
895 const int xerrno
= errno
;
896 debugs(47, DBG_CRITICAL
, "ERROR: cannot open " << DbName
<< ": " <<
901 ++store_open_disk_fd
;
902 debugs(79,3, "rock db opened " << DbName
<< ": FD " << TheFile
);
// Disker-side close of the rock db file; the file_close() call and the
// TheFile validity guard are on lines missing from this dump (original
// 908-910, 912, 914-915).
907 DiskerClose(const SBuf
&path
)
911 debugs(79,3, HERE
<< "rock db closed " << path
<< ": FD " << TheFile
);
913 --store_open_disk_fd
;
918 /// reports our needs for shared memory pages to Ipc::Mem::Pages
919 /// and initializes shared memory segments used by IpcIoFile
// NOTE(review): access specifiers, the destructor declaration (if any), and
// the closing brace of this class (original lines 921-922, 925, 927-928,
// 931-932, 934) are missing from this dump.
920 class IpcIoRr
: public Ipc::Mem::RegisteredRunner
923 /* RegisteredRunner API */
924 IpcIoRr(): owner(NULL
) {}
926 virtual void claimMemoryNeeds();
929 /* Ipc::Mem::RegisteredRunner API */
930 virtual void create();
// owner of the shared FewToFewBiQueue segment (created in create())
933 Ipc::FewToFewBiQueue::Owner
*owner
;
936 RunnerRegistrationEntry(IpcIoRr
);
// Registers the expected shared-memory ioPage demand: roughly one page per
// queue slot, plus a 10% fudge factor (see comment below).
// NOTE(review): the return type line, braces, and any trailing lines
// (original 938, 940, 949+) are missing from this dump.
939 IpcIoRr::claimMemoryNeeds()
941 const int itemsCount
= Ipc::FewToFewBiQueue::MaxItemsCount(
942 ::Config
.workers
, ::Config
.cacheSwap
.n_strands
, QueueCapacity
);
943 // the maximum number of shared I/O pages is approximately the
944 // number of queue slots, we add a fudge factor to that to account
945 // for corner cases where I/O pages are created before queue
946 // limits are checked or destroyed long after the I/O is dequeued
947 Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage
,
948 static_cast<int>(itemsCount
* 1.1));
954 if (Config
.cacheSwap
.n_strands
<= 0)
958 owner
= Ipc::FewToFewBiQueue::Init(ShmLabel
, Config
.workers
, 1,
959 Config
.cacheSwap
.n_strands
,
960 1 + Config
.workers
, sizeof(IpcIoMsg
),