/*
 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */
/* DEBUG: section 47    Store Directory Routines */
12 #include "base/RunnersRegistry.h"
13 #include "base/TextException.h"
15 #include "DiskIO/IORequestor.h"
16 #include "DiskIO/IpcIo/IpcIoFile.h"
17 #include "DiskIO/ReadRequest.h"
18 #include "DiskIO/WriteRequest.h"
21 #include "ipc/mem/Pages.h"
22 #include "ipc/Messages.h"
24 #include "ipc/Queue.h"
25 #include "ipc/StrandSearch.h"
26 #include "ipc/UdsOp.h"
28 #include "SquidConfig.h"
29 #include "SquidTime.h"
30 #include "StatCounters.h"
35 CBDATA_CLASS_INIT(IpcIoFile
);
/// shared memory segment path to use for IpcIoFile maps
static const char *const ShmLabel = "io_file";
/// a single worker-to-disker or disker-to-worker queue capacity; up
/// to 2*QueueCapacity I/O requests queued between a single worker and
/// a single disker
// TODO: make configurable or compute from squid.conf settings if possible
static const int QueueCapacity = 1024;
45 const double IpcIoFile::Timeout
= 7; // seconds; XXX: ALL,9 may require more
46 IpcIoFile::IpcIoFileList
IpcIoFile::WaitingForOpen
;
47 IpcIoFile::IpcIoFilesMap
IpcIoFile::IpcIoFiles
;
48 std::auto_ptr
<IpcIoFile::Queue
> IpcIoFile::queue
;
50 bool IpcIoFile::DiskerHandleMoreRequestsScheduled
= false;
52 static bool DiskerOpen(const SBuf
&path
, int flags
, mode_t mode
);
53 static void DiskerClose(const SBuf
&path
);
55 /// IpcIo wrapper for debugs() streams; XXX: find a better class name
57 SipcIo(int aWorker
, const IpcIoMsg
&aMsg
, int aDisker
):
58 worker(aWorker
), msg(aMsg
), disker(aDisker
) {}
66 operator <<(std::ostream
&os
, const SipcIo
&sio
)
68 return os
<< "ipcIo" << sio
.worker
<< '.' << sio
.msg
.requestId
<<
69 (sio
.msg
.command
== IpcIo::cmdRead
? 'r' : 'w') << sio
.disker
;
72 IpcIoFile::IpcIoFile(char const *aDb
):
73 dbName(aDb
), diskId(-1), error_(false), lastRequestId(0),
74 olderRequests(&requestMap1
), newerRequests(&requestMap2
),
75 timeoutCheckScheduled(false)
79 IpcIoFile::~IpcIoFile()
82 const IpcIoFilesMap::iterator i
= IpcIoFiles
.find(diskId
);
83 // XXX: warn and continue?
84 Must(i
!= IpcIoFiles
.end());
85 Must(i
->second
== this);
91 IpcIoFile::configure(const Config
&cfg
)
93 DiskFile::configure(cfg
);
98 IpcIoFile::open(int flags
, mode_t mode
, RefCount
<IORequestor
> callback
)
100 ioRequestor
= callback
;
101 Must(diskId
< 0); // we do not know our disker yet
104 queue
.reset(new Queue(ShmLabel
, IamWorkerProcess() ? Queue::groupA
: Queue::groupB
, KidIdentifier
));
106 if (IamDiskProcess()) {
107 error_
= !DiskerOpen(SBuf(dbName
.termedBuf()), flags
, mode
);
111 diskId
= KidIdentifier
;
112 const bool inserted
=
113 IpcIoFiles
.insert(std::make_pair(diskId
, this)).second
;
116 queue
->localRateLimit() =
117 static_cast<Ipc::QueueReader::Rate::Value
>(config
.ioRate
);
119 Ipc::HereIamMessage
ann(Ipc::StrandCoord(KidIdentifier
, getpid()));
120 ann
.strand
.tag
= dbName
;
121 Ipc::TypedMsgHdr message
;
123 SendMessage(Ipc::Port::CoordinatorAddr(), message
);
125 ioRequestor
->ioCompletedNotification();
129 Ipc::StrandSearchRequest request
;
130 request
.requestorId
= KidIdentifier
;
131 request
.tag
= dbName
;
133 Ipc::TypedMsgHdr msg
;
135 Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg
);
137 WaitingForOpen
.push_back(this);
139 eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout
,
140 this, Timeout
, 0, false); // "this" pointer is used as id
144 IpcIoFile::openCompleted(const Ipc::StrandSearchResponse
*const response
)
146 Must(diskId
< 0); // we do not know our disker yet
149 debugs(79, DBG_IMPORTANT
, "ERROR: " << dbName
<< " communication " <<
150 "channel establishment timeout");
153 diskId
= response
->strand
.kidId
;
155 const bool inserted
=
156 IpcIoFiles
.insert(std::make_pair(diskId
, this)).second
;
160 debugs(79, DBG_IMPORTANT
, "ERROR: no disker claimed " <<
161 "responsibility for " << dbName
);
165 ioRequestor
->ioCompletedNotification();
169 * Alias for IpcIoFile::open(...)
170 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
173 IpcIoFile::create(int flags
, mode_t mode
, RefCount
<IORequestor
> callback
)
175 assert(false); // check
176 /* We use the same logic path for open */
177 open(flags
, mode
, callback
);
183 assert(ioRequestor
!= NULL
);
185 if (IamDiskProcess())
186 DiskerClose(SBuf(dbName
.termedBuf()));
187 // XXX: else nothing to do?
189 ioRequestor
->closeCompleted();
193 IpcIoFile::canRead() const
195 return diskId
>= 0 && !error_
&& canWait();
199 IpcIoFile::canWrite() const
201 return diskId
>= 0 && !error_
&& canWait();
205 IpcIoFile::error() const
211 IpcIoFile::read(ReadRequest
*readRequest
)
213 debugs(79,3, HERE
<< "(disker" << diskId
<< ", " << readRequest
->len
<< ", " <<
214 readRequest
->offset
<< ")");
216 assert(ioRequestor
!= NULL
);
217 assert(readRequest
->offset
>= 0);
220 //assert(minOffset < 0 || minOffset <= readRequest->offset);
221 //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
223 IpcIoPendingRequest
*const pending
= new IpcIoPendingRequest(this);
224 pending
->readRequest
= readRequest
;
229 IpcIoFile::readCompleted(ReadRequest
*readRequest
,
230 IpcIoMsg
*const response
)
232 bool ioError
= false;
234 debugs(79, 3, HERE
<< "error: timeout");
235 ioError
= true; // I/O timeout does not warrant setting error_?
237 if (response
->xerrno
) {
238 debugs(79, DBG_IMPORTANT
, "ERROR: " << dbName
<< " read: " <<
239 xstrerr(response
->xerrno
));
240 ioError
= error_
= true;
241 } else if (!response
->page
) {
242 debugs(79, DBG_IMPORTANT
, "ERROR: " << dbName
<< " read ran " <<
243 "out of shared memory pages");
246 const char *const buf
= Ipc::Mem::PagePointer(response
->page
);
247 memcpy(readRequest
->buf
, buf
, response
->len
);
250 Ipc::Mem::PutPage(response
->page
);
253 const ssize_t rlen
= ioError
? -1 : (ssize_t
)readRequest
->len
;
254 const int errflag
= ioError
? DISK_ERROR
:DISK_OK
;
255 ioRequestor
->readCompleted(readRequest
->buf
, rlen
, errflag
, readRequest
);
259 IpcIoFile::write(WriteRequest
*writeRequest
)
261 debugs(79,3, HERE
<< "(disker" << diskId
<< ", " << writeRequest
->len
<< ", " <<
262 writeRequest
->offset
<< ")");
264 assert(ioRequestor
!= NULL
);
265 assert(writeRequest
->len
> 0); // TODO: work around mmap failures on zero-len?
266 assert(writeRequest
->offset
>= 0);
269 //assert(minOffset < 0 || minOffset <= writeRequest->offset);
270 //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
272 IpcIoPendingRequest
*const pending
= new IpcIoPendingRequest(this);
273 pending
->writeRequest
= writeRequest
;
278 IpcIoFile::writeCompleted(WriteRequest
*writeRequest
,
279 const IpcIoMsg
*const response
)
281 bool ioError
= false;
283 debugs(79, 3, "disker " << diskId
<< " timeout");
284 ioError
= true; // I/O timeout does not warrant setting error_?
285 } else if (response
->xerrno
) {
286 debugs(79, DBG_IMPORTANT
, "ERROR: disker " << diskId
<<
287 " error writing " << writeRequest
->len
<< " bytes at " <<
288 writeRequest
->offset
<< ": " << xstrerr(response
->xerrno
) <<
289 "; this worker will stop using " << dbName
);
290 ioError
= error_
= true;
291 } else if (response
->len
!= writeRequest
->len
) {
292 debugs(79, DBG_IMPORTANT
, "ERROR: disker " << diskId
<< " wrote " <<
293 response
->len
<< " instead of " << writeRequest
->len
<<
294 " bytes (offset " << writeRequest
->offset
<< "); " <<
295 "this worker will stop using " << dbName
);
299 if (writeRequest
->free_func
)
300 (writeRequest
->free_func
)(const_cast<char*>(writeRequest
->buf
)); // broken API?
303 debugs(79,5, HERE
<< "wrote " << writeRequest
->len
<< " to disker" <<
304 diskId
<< " at " << writeRequest
->offset
);
307 const ssize_t rlen
= ioError
? 0 : (ssize_t
)writeRequest
->len
;
308 const int errflag
= ioError
? DISK_ERROR
:DISK_OK
;
309 ioRequestor
->writeCompleted(errflag
, rlen
, writeRequest
);
313 IpcIoFile::ioInProgress() const
315 return !olderRequests
->empty() || !newerRequests
->empty();
318 /// track a new pending request
320 IpcIoFile::trackPendingRequest(const unsigned int id
, IpcIoPendingRequest
*const pending
)
322 const std::pair
<RequestMap::iterator
,bool> result
=
323 newerRequests
->insert(std::make_pair(id
, pending
));
324 Must(result
.second
); // failures means that id was not unique
325 if (!timeoutCheckScheduled
)
326 scheduleTimeoutCheck();
329 /// push an I/O request to disker
331 IpcIoFile::push(IpcIoPendingRequest
*const pending
)
333 // prevent queue overflows: check for responses to earlier requests
334 // warning: this call may result in indirect push() recursion
335 HandleResponses("before push");
340 Must(pending
->readRequest
|| pending
->writeRequest
);
344 if (++lastRequestId
== 0) // don't use zero value as requestId
346 ipcIo
.requestId
= lastRequestId
;
347 ipcIo
.start
= current_time
;
348 if (pending
->readRequest
) {
349 ipcIo
.command
= IpcIo::cmdRead
;
350 ipcIo
.offset
= pending
->readRequest
->offset
;
351 ipcIo
.len
= pending
->readRequest
->len
;
352 } else { // pending->writeRequest
353 Must(pending
->writeRequest
->len
<= Ipc::Mem::PageSize());
354 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage
, ipcIo
.page
)) {
356 throw TexcHere("run out of shared memory pages for IPC I/O");
358 ipcIo
.command
= IpcIo::cmdWrite
;
359 ipcIo
.offset
= pending
->writeRequest
->offset
;
360 ipcIo
.len
= pending
->writeRequest
->len
;
361 char *const buf
= Ipc::Mem::PagePointer(ipcIo
.page
);
362 memcpy(buf
, pending
->writeRequest
->buf
, ipcIo
.len
); // optimize away
365 debugs(47, 7, HERE
<< "pushing " << SipcIo(KidIdentifier
, ipcIo
, diskId
));
367 if (queue
->push(diskId
, ipcIo
))
368 Notify(diskId
); // must notify disker
369 trackPendingRequest(ipcIo
.requestId
, pending
);
370 } catch (const Queue::Full
&) {
371 debugs(47, DBG_IMPORTANT
, "ERROR: worker I/O push queue for " <<
372 dbName
<< " overflow: " <<
373 SipcIo(KidIdentifier
, ipcIo
, diskId
)); // TODO: report queue len
374 // TODO: grow queue size
376 pending
->completeIo(NULL
);
378 } catch (const TextException
&e
) {
379 debugs(47, DBG_IMPORTANT
, "ERROR: " << dbName
<< " exception: " << e
.what());
380 pending
->completeIo(NULL
);
385 /// whether we think there is enough time to complete the I/O
387 IpcIoFile::canWait() const
389 if (!config
.ioTimeout
)
390 return true; // no timeout specified
393 if (!queue
->findOldest(diskId
, oldestIo
) || oldestIo
.start
.tv_sec
<= 0)
394 return true; // we cannot estimate expected wait time; assume it is OK
396 const int oldestWait
= tvSubMsec(oldestIo
.start
, current_time
);
398 int rateWait
= -1; // time in millisecons
399 const Ipc::QueueReader::Rate::Value ioRate
= queue
->rateLimit(diskId
);
401 // if there are N requests pending, the new one will wait at
402 // least N/max-swap-rate seconds
403 rateWait
= static_cast<int>(1e3
* queue
->outSize(diskId
) / ioRate
);
404 // adjust N/max-swap-rate value based on the queue "balance"
405 // member, in case we have been borrowing time against future
407 rateWait
+= queue
->balance(diskId
);
410 const int expectedWait
= max(oldestWait
, rateWait
);
411 if (expectedWait
< 0 ||
412 static_cast<time_msec_t
>(expectedWait
) < config
.ioTimeout
)
413 return true; // expected wait time is acceptible
415 debugs(47,2, HERE
<< "cannot wait: " << expectedWait
<<
416 " oldest: " << SipcIo(KidIdentifier
, oldestIo
, diskId
));
417 return false; // do not want to wait that long
420 /// called when coordinator responds to worker open request
422 IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse
&response
)
424 debugs(47, 7, HERE
<< "coordinator response to open request");
425 for (IpcIoFileList::iterator i
= WaitingForOpen
.begin();
426 i
!= WaitingForOpen
.end(); ++i
) {
427 if (response
.strand
.tag
== (*i
)->dbName
) {
428 (*i
)->openCompleted(&response
);
429 WaitingForOpen
.erase(i
);
434 debugs(47, 4, HERE
<< "LATE disker response to open for " <<
435 response
.strand
.tag
);
436 // nothing we can do about it; completeIo() has been called already
440 IpcIoFile::HandleResponses(const char *const when
)
442 debugs(47, 4, HERE
<< "popping all " << when
);
444 // get all responses we can: since we are not pushing, this will stop
446 while (queue
->pop(diskId
, ipcIo
)) {
447 const IpcIoFilesMap::const_iterator i
= IpcIoFiles
.find(diskId
);
448 Must(i
!= IpcIoFiles
.end()); // TODO: warn but continue
449 i
->second
->handleResponse(ipcIo
);
454 IpcIoFile::handleResponse(IpcIoMsg
&ipcIo
)
456 const int requestId
= ipcIo
.requestId
;
457 debugs(47, 7, HERE
<< "popped disker response: " <<
458 SipcIo(KidIdentifier
, ipcIo
, diskId
));
461 if (IpcIoPendingRequest
*const pending
= dequeueRequest(requestId
)) {
462 pending
->completeIo(&ipcIo
);
463 delete pending
; // XXX: leaking if throwing
465 debugs(47, 4, HERE
<< "LATE disker response to " << ipcIo
.command
<<
466 "; ipcIo" << KidIdentifier
<< '.' << requestId
);
467 // nothing we can do about it; completeIo() has been called already
472 IpcIoFile::Notify(const int peerId
)
474 // TODO: Count and report the total number of notifications, pops, pushes.
475 debugs(47, 7, HERE
<< "kid" << peerId
);
476 Ipc::TypedMsgHdr msg
;
477 msg
.setType(Ipc::mtIpcIoNotification
); // TODO: add proper message type?
478 msg
.putInt(KidIdentifier
);
479 const String addr
= Ipc::Port::MakeAddr(Ipc::strandAddrLabel
, peerId
);
480 Ipc::SendMessage(addr
, msg
);
484 IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr
&msg
)
486 const int from
= msg
.getInt();
487 debugs(47, 7, HERE
<< "from " << from
);
488 queue
->clearReaderSignal(from
);
489 if (IamDiskProcess())
490 DiskerHandleRequests();
492 HandleResponses("after notification");
495 /// handles open request timeout
497 IpcIoFile::OpenTimeout(void *const param
)
500 // the pointer is used for comparison only and not dereferenced
501 const IpcIoFile
*const ipcIoFile
=
502 reinterpret_cast<const IpcIoFile
*>(param
);
503 for (IpcIoFileList::iterator i
= WaitingForOpen
.begin();
504 i
!= WaitingForOpen
.end(); ++i
) {
505 if (*i
== ipcIoFile
) {
506 (*i
)->openCompleted(NULL
);
507 WaitingForOpen
.erase(i
);
513 /// IpcIoFile::checkTimeouts wrapper
515 IpcIoFile::CheckTimeouts(void *const param
)
518 const int diskId
= reinterpret_cast<uintptr_t>(param
);
519 debugs(47, 7, HERE
<< "diskId=" << diskId
);
520 const IpcIoFilesMap::const_iterator i
= IpcIoFiles
.find(diskId
);
521 if (i
!= IpcIoFiles
.end())
522 i
->second
->checkTimeouts();
526 IpcIoFile::checkTimeouts()
528 timeoutCheckScheduled
= false;
530 // last chance to recover in case a notification message was lost, etc.
531 const RequestMap::size_type timeoutsBefore
= olderRequests
->size();
532 HandleResponses("before timeout");
533 const RequestMap::size_type timeoutsNow
= olderRequests
->size();
535 if (timeoutsBefore
> timeoutsNow
) { // some requests were rescued
536 // notification message lost or significantly delayed?
537 debugs(47, DBG_IMPORTANT
, "WARNING: communication with " << dbName
<<
538 " may be too slow or disrupted for about " <<
539 Timeout
<< "s; rescued " << (timeoutsBefore
- timeoutsNow
) <<
540 " out of " << timeoutsBefore
<< " I/Os");
544 debugs(47, DBG_IMPORTANT
, "WARNING: abandoning " <<
545 timeoutsNow
<< ' ' << dbName
<< " I/Os after at least " <<
546 Timeout
<< "s timeout");
549 // any old request would have timed out by now
550 typedef RequestMap::const_iterator RMCI
;
551 for (RMCI i
= olderRequests
->begin(); i
!= olderRequests
->end(); ++i
) {
552 IpcIoPendingRequest
*const pending
= i
->second
;
554 const unsigned int requestId
= i
->first
;
555 debugs(47, 7, HERE
<< "disker timeout; ipcIo" <<
556 KidIdentifier
<< '.' << requestId
);
558 pending
->completeIo(NULL
); // no response
559 delete pending
; // XXX: leaking if throwing
561 olderRequests
->clear();
563 swap(olderRequests
, newerRequests
); // switches pointers around
564 if (!olderRequests
->empty() && !timeoutCheckScheduled
)
565 scheduleTimeoutCheck();
568 /// prepare to check for timeouts in a little while
570 IpcIoFile::scheduleTimeoutCheck()
572 // we check all older requests at once so some may be wait for 2*Timeout
573 eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts
,
574 reinterpret_cast<void *>(diskId
), Timeout
, 0, false);
575 timeoutCheckScheduled
= true;
578 /// returns and forgets the right IpcIoFile pending request
579 IpcIoPendingRequest
*
580 IpcIoFile::dequeueRequest(const unsigned int requestId
)
582 Must(requestId
!= 0);
584 RequestMap
*map
= NULL
;
585 RequestMap::iterator i
= requestMap1
.find(requestId
);
587 if (i
!= requestMap1
.end())
590 i
= requestMap2
.find(requestId
);
591 if (i
!= requestMap2
.end())
595 if (!map
) // not found in both maps
598 IpcIoPendingRequest
*pending
= i
->second
;
604 IpcIoFile::getFD() const
606 assert(false); // not supported; TODO: remove this method from API
612 IpcIoMsg::IpcIoMsg():
616 command(IpcIo::cmdNone
),
623 /* IpcIoPendingRequest */
625 IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer
&aFile
):
626 file(aFile
), readRequest(NULL
), writeRequest(NULL
)
631 IpcIoPendingRequest::completeIo(IpcIoMsg
*const response
)
634 file
->readCompleted(readRequest
, response
);
635 else if (writeRequest
)
636 file
->writeCompleted(writeRequest
, response
);
638 Must(!response
); // only timeouts are handled here
639 file
->openCompleted(NULL
);
643 /* XXX: disker code that should probably be moved elsewhere */
645 static SBuf DbName
; ///< full db file name
646 static int TheFile
= -1; ///< db file descriptor
649 diskerRead(IpcIoMsg
&ipcIo
)
651 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage
, ipcIo
.page
)) {
653 debugs(47,2, HERE
<< "run out of shared memory pages for IPC I/O");
657 char *const buf
= Ipc::Mem::PagePointer(ipcIo
.page
);
658 const ssize_t read
= pread(TheFile
, buf
, min(ipcIo
.len
, Ipc::Mem::PageSize()), ipcIo
.offset
);
659 ++statCounter
.syscalls
.disk
.reads
;
660 fd_bytes(TheFile
, read
, FD_READ
);
664 const size_t len
= static_cast<size_t>(read
); // safe because read > 0
665 debugs(47,8, HERE
<< "disker" << KidIdentifier
<< " read " <<
666 (len
== ipcIo
.len
? "all " : "just ") << read
);
669 ipcIo
.xerrno
= errno
;
671 debugs(47,5, HERE
<< "disker" << KidIdentifier
<< " read error: " <<
676 /// Tries to write buffer to disk (a few times if needed);
677 /// sets ipcIo results, but does no cleanup. The caller must cleanup.
679 diskerWriteAttempts(IpcIoMsg
&ipcIo
)
681 const char *buf
= Ipc::Mem::PagePointer(ipcIo
.page
);
682 size_t toWrite
= min(ipcIo
.len
, Ipc::Mem::PageSize());
683 size_t wroteSoFar
= 0;
684 off_t offset
= ipcIo
.offset
;
685 // Partial writes to disk do happen. It is unlikely that the caller can
686 // handle partial writes by doing something other than writing leftovers
687 // again, so we try to write them ourselves to minimize overheads.
688 const int attemptLimit
= 10;
689 for (int attempts
= 1; attempts
<= attemptLimit
; ++attempts
) {
690 const ssize_t result
= pwrite(TheFile
, buf
, toWrite
, offset
);
691 ++statCounter
.syscalls
.disk
.writes
;
692 fd_bytes(TheFile
, result
, FD_WRITE
);
695 ipcIo
.xerrno
= errno
;
696 assert(ipcIo
.xerrno
);
697 debugs(47, DBG_IMPORTANT
, "ERROR: " << DbName
<< " failure" <<
698 " writing " << toWrite
<< '/' << ipcIo
.len
<<
699 " at " << ipcIo
.offset
<< '+' << wroteSoFar
<<
700 " on " << attempts
<< " try: " << xstrerr(ipcIo
.xerrno
));
701 ipcIo
.len
= wroteSoFar
;
702 return; // bail on error
705 const size_t wroteNow
= static_cast<size_t>(result
); // result >= 0
708 debugs(47,3, "disker" << KidIdentifier
<< " wrote " <<
709 (wroteNow
>= toWrite
? "all " : "just ") << wroteNow
<<
710 " out of " << toWrite
<< '/' << ipcIo
.len
<< " at " <<
711 ipcIo
.offset
<< '+' << wroteSoFar
<< " on " << attempts
<<
714 wroteSoFar
+= wroteNow
;
716 if (wroteNow
>= toWrite
) {
718 ipcIo
.len
= wroteSoFar
;
719 return; // wrote everything there was to write
727 debugs(47, DBG_IMPORTANT
, "ERROR: " << DbName
<< " exhausted all " <<
728 attemptLimit
<< " attempts while writing " <<
729 toWrite
<< '/' << ipcIo
.len
<< " at " << ipcIo
.offset
<< '+' <<
731 return; // not a fatal I/O error, unless the caller treats it as such
735 diskerWrite(IpcIoMsg
&ipcIo
)
737 diskerWriteAttempts(ipcIo
); // may fail
738 Ipc::Mem::PutPage(ipcIo
.page
);
742 IpcIoFile::DiskerHandleMoreRequests(void *source
)
744 debugs(47, 7, HERE
<< "resuming handling requests after " <<
745 static_cast<const char *>(source
));
746 DiskerHandleMoreRequestsScheduled
= false;
747 IpcIoFile::DiskerHandleRequests();
751 IpcIoFile::WaitBeforePop()
753 const Ipc::QueueReader::Rate::Value ioRate
= queue
->localRateLimit();
754 const double maxRate
= ioRate
/1e3
; // req/ms
756 // do we need to enforce configured I/O rate?
760 // is there an I/O request we could potentially delay?
763 if (!queue
->peek(processId
, ipcIo
)) {
764 // unlike pop(), peek() is not reliable and does not block reader
765 // so we must proceed with pop() even if it is likely to fail
769 static timeval LastIo
= current_time
;
771 const double ioDuration
= 1.0 / maxRate
; // ideal distance between two I/Os
772 // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
773 const int64_t maxImbalance
= min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration
));
775 const double credit
= ioDuration
; // what the last I/O should have cost us
776 const double debit
= tvSubMsec(LastIo
, current_time
); // actual distance from the last I/O
777 LastIo
= current_time
;
779 Ipc::QueueReader::Balance
&balance
= queue
->localBalance();
780 balance
+= static_cast<int64_t>(credit
- debit
);
782 debugs(47, 7, HERE
<< "rate limiting balance: " << balance
<< " after +" << credit
<< " -" << debit
);
784 if (ipcIo
.command
== IpcIo::cmdWrite
&& balance
> maxImbalance
) {
785 // if the next request is (likely) write and we accumulated
786 // too much time for future slow I/Os, then shed accumulated
787 // time to keep just half of the excess
788 const int64_t toSpend
= balance
- maxImbalance
/2;
790 if (toSpend
/1e3
> Timeout
)
791 debugs(47, DBG_IMPORTANT
, "WARNING: " << DbName
<< " delays " <<
792 "I/O requests for " << (toSpend
/1e3
) << " seconds " <<
793 "to obey " << ioRate
<< "/sec rate limit");
795 debugs(47, 3, HERE
<< "rate limiting by " << toSpend
<< " ms to get" <<
796 (1e3
*maxRate
) << "/sec rate");
797 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
798 &IpcIoFile::DiskerHandleMoreRequests
,
799 const_cast<char*>("rate limiting"),
800 toSpend
/1e3
, 0, false);
801 DiskerHandleMoreRequestsScheduled
= true;
803 } else if (balance
< -maxImbalance
) {
804 // do not owe "too much" to avoid "too large" bursts of I/O
805 balance
= -maxImbalance
;
812 IpcIoFile::DiskerHandleRequests()
814 // Balance our desire to maximize the number of concurrent I/O requests
815 // (reordred by OS to minimize seek time) with a requirement to
816 // send 1st-I/O notification messages, process Coordinator events, etc.
817 const int maxSpentMsec
= 10; // keep small: most RAM I/Os are under 1ms
818 const timeval loopStart
= current_time
;
823 while (!WaitBeforePop() && queue
->pop(workerId
, ipcIo
)) {
826 // at least one I/O per call is guaranteed if the queue is not empty
827 DiskerHandleRequest(workerId
, ipcIo
);
830 const double elapsedMsec
= tvSubMsec(loopStart
, current_time
);
831 if (elapsedMsec
> maxSpentMsec
|| elapsedMsec
< 0) {
832 if (!DiskerHandleMoreRequestsScheduled
) {
833 // the gap must be positive for select(2) to be given a chance
834 const double minBreakSecs
= 0.001;
835 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
836 &IpcIoFile::DiskerHandleMoreRequests
,
837 const_cast<char*>("long I/O loop"),
838 minBreakSecs
, 0, false);
839 DiskerHandleMoreRequestsScheduled
= true;
841 debugs(47, 3, HERE
<< "pausing after " << popped
<< " I/Os in " <<
842 elapsedMsec
<< "ms; " << (elapsedMsec
/popped
) << "ms per I/O");
847 // TODO: consider using O_DIRECT with "elevator" optimization where we pop
848 // requests first, then reorder the popped requests to optimize seek time,
849 // then do I/O, then take a break, and come back for the next set of I/O
853 /// called when disker receives an I/O request
855 IpcIoFile::DiskerHandleRequest(const int workerId
, IpcIoMsg
&ipcIo
)
857 if (ipcIo
.command
!= IpcIo::cmdRead
&& ipcIo
.command
!= IpcIo::cmdWrite
) {
858 debugs(0, DBG_CRITICAL
, "ERROR: " << DbName
<<
859 " should not receive " << ipcIo
.command
<<
860 " ipcIo" << workerId
<< '.' << ipcIo
.requestId
);
864 debugs(47,5, HERE
<< "disker" << KidIdentifier
<<
865 (ipcIo
.command
== IpcIo::cmdRead
? " reads " : " writes ") <<
866 ipcIo
.len
<< " at " << ipcIo
.offset
<<
867 " ipcIo" << workerId
<< '.' << ipcIo
.requestId
);
869 if (ipcIo
.command
== IpcIo::cmdRead
)
871 else // ipcIo.command == IpcIo::cmdWrite
874 debugs(47, 7, HERE
<< "pushing " << SipcIo(workerId
, ipcIo
, KidIdentifier
));
877 if (queue
->push(workerId
, ipcIo
))
878 Notify(workerId
); // must notify worker
879 } catch (const Queue::Full
&) {
880 // The worker queue should not overflow because the worker should pop()
881 // before push()ing and because if disker pops N requests at a time,
882 // we should make sure the worker pop() queue length is the worker
883 // push queue length plus N+1. XXX: implement the N+1 difference.
884 debugs(47, DBG_IMPORTANT
, "BUG: Worker I/O pop queue for " <<
885 DbName
<< " overflow: " <<
886 SipcIo(workerId
, ipcIo
, KidIdentifier
)); // TODO: report queue len
888 // the I/O request we could not push will timeout
893 DiskerOpen(const SBuf
&path
, int flags
, mode_t mode
)
898 TheFile
= file_open(DbName
.c_str(), flags
);
901 const int xerrno
= errno
;
902 debugs(47, DBG_CRITICAL
, "ERROR: cannot open " << DbName
<< ": " <<
907 ++store_open_disk_fd
;
908 debugs(79,3, "rock db opened " << DbName
<< ": FD " << TheFile
);
913 DiskerClose(const SBuf
&path
)
917 debugs(79,3, HERE
<< "rock db closed " << path
<< ": FD " << TheFile
);
919 --store_open_disk_fd
;
924 /// reports our needs for shared memory pages to Ipc::Mem::Pages
925 /// and initializes shared memory segments used by IpcIoFile
926 class IpcIoRr
: public Ipc::Mem::RegisteredRunner
929 /* RegisteredRunner API */
930 IpcIoRr(): owner(NULL
) {}
932 virtual void claimMemoryNeeds();
935 /* Ipc::Mem::RegisteredRunner API */
936 virtual void create();
939 Ipc::FewToFewBiQueue::Owner
*owner
;
942 RunnerRegistrationEntry(IpcIoRr
);
945 IpcIoRr::claimMemoryNeeds()
947 const int itemsCount
= Ipc::FewToFewBiQueue::MaxItemsCount(
948 ::Config
.workers
, ::Config
.cacheSwap
.n_strands
, QueueCapacity
);
949 // the maximum number of shared I/O pages is approximately the
950 // number of queue slots, we add a fudge factor to that to account
951 // for corner cases where I/O pages are created before queue
952 // limits are checked or destroyed long after the I/O is dequeued
953 Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage
,
954 static_cast<int>(itemsCount
* 1.1));
960 if (Config
.cacheSwap
.n_strands
<= 0)
964 owner
= Ipc::FewToFewBiQueue::Init(ShmLabel
, Config
.workers
, 1,
965 Config
.cacheSwap
.n_strands
,
966 1 + Config
.workers
, sizeof(IpcIoMsg
),