/*
 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 47 Store Directory Routines */
12 #include "base/RunnersRegistry.h"
13 #include "base/TextException.h"
15 #include "DiskIO/IORequestor.h"
16 #include "DiskIO/IpcIo/IpcIoFile.h"
17 #include "DiskIO/ReadRequest.h"
18 #include "DiskIO/WriteRequest.h"
21 #include "ipc/mem/Pages.h"
22 #include "ipc/Messages.h"
24 #include "ipc/Queue.h"
25 #include "ipc/StrandSearch.h"
26 #include "ipc/UdsOp.h"
28 #include "SquidConfig.h"
29 #include "SquidTime.h"
30 #include "StatCounters.h"
// NOTE(review): this chunk appears garbled by extraction -- original file line
// numbers are fused into the code text and some lines are missing; code below
// is preserved byte-for-byte, only comments are added.
// register IpcIoFile with the cbdata lifetime-tracking framework
35 CBDATA_CLASS_INIT(IpcIoFile
);
37 /// shared memory segment path to use for IpcIoFile maps
38 static const char *const ShmLabel
= "io_file";
39 /// a single worker-to-disker or disker-to-worker queue capacity; up
40 /// to 2*QueueCapacity I/O requests queued between a single worker and
42 // TODO: make configurable or compute from squid.conf settings if possible
43 static const int QueueCapacity
= 1024;
// how long a pending I/O may wait before being abandoned (seconds)
45 const double IpcIoFile::Timeout
= 7; // seconds; XXX: ALL,9 may require more
// files still waiting for the coordinator to match them with a disker
46 IpcIoFile::IpcIoFileList
IpcIoFile::WaitingForOpen
;
// diskId -> IpcIoFile registry, used to route disker responses
47 IpcIoFile::IpcIoFilesMap
IpcIoFile::IpcIoFiles
;
// the worker/disker shared-memory request/response queue (process-wide)
48 std::unique_ptr
<IpcIoFile::Queue
> IpcIoFile::queue
;
// true while a DiskerHandleMoreRequests event is already queued
50 bool IpcIoFile::DiskerHandleMoreRequestsScheduled
= false;
// disker-side helpers defined near the end of this file
52 static bool DiskerOpen(const SBuf
&path
, int flags
, mode_t mode
);
53 static void DiskerClose(const SBuf
&path
);
55 /// IpcIo wrapper for debugs() streams; XXX: find a better class name
// NOTE(review): the class header line is missing from this garbled view;
// only the constructor and the stream operator are visible here.
57 SipcIo(int aWorker
, const IpcIoMsg
&aMsg
, int aDisker
):
58 worker(aWorker
), msg(aMsg
), disker(aDisker
) {}
// renders a compact request label, e.g. "ipcIo3.42r5":
// worker kid, '.', request id, 'r'ead or 'w'rite, disker kid
66 operator <<(std::ostream
&os
, const SipcIo
&sio
)
68 return os
<< "ipcIo" << sio
.worker
<< '.' << sio
.msg
.requestId
<<
69 (sio
.msg
.command
== IpcIo::cmdRead
? 'r' : 'w') << sio
.disker
;
// constructor: diskId stays -1 until a disker claims this db (see open());
// requestMap1/requestMap2 are rotated via the older/newer pointers
72 IpcIoFile::IpcIoFile(char const *aDb
):
73 dbName(aDb
), diskId(-1), error_(false), lastRequestId(0),
74 olderRequests(&requestMap1
), newerRequests(&requestMap2
),
75 timeoutCheckScheduled(false)
// destructor: deregisters this file from the global diskId registry
79 IpcIoFile::~IpcIoFile()
82 const IpcIoFilesMap::iterator i
= IpcIoFiles
.find(diskId
);
83 // XXX: warn and continue?
84 Must(i
!= IpcIoFiles
.end());
85 Must(i
->second
== this);
// forward generic configuration to the DiskFile base
91 IpcIoFile::configure(const Config
&cfg
)
93 DiskFile::configure(cfg
);
// open: creates the shared queue; diskers open the db file and announce
// themselves to the coordinator, workers ask the coordinator to find
// their disker and wait (with a timeout) for the response
98 IpcIoFile::open(int flags
, mode_t mode
, RefCount
<IORequestor
> callback
)
100 ioRequestor
= callback
;
101 Must(diskId
< 0); // we do not know our disker yet
104 queue
.reset(new Queue(ShmLabel
, IamWorkerProcess() ? Queue::groupA
: Queue::groupB
, KidIdentifier
));
106 if (IamDiskProcess()) {
107 error_
= !DiskerOpen(SBuf(dbName
.termedBuf()), flags
, mode
);
// disker serves itself; register under our own kid id
111 diskId
= KidIdentifier
;
112 const bool inserted
=
113 IpcIoFiles
.insert(std::make_pair(diskId
, this)).second
;
116 queue
->localRateLimit().store(config
.ioRate
);
// announce this disker (tagged with the db name) to the coordinator
118 Ipc::HereIamMessage
ann(Ipc::StrandCoord(KidIdentifier
, getpid()));
119 ann
.strand
.tag
= dbName
;
120 Ipc::TypedMsgHdr message
;
122 SendMessage(Ipc::Port::CoordinatorAddr(), message
);
124 ioRequestor
->ioCompletedNotification();
// worker path: ask the coordinator which strand serves dbName
128 Ipc::StrandSearchRequest request
;
129 request
.requestorId
= KidIdentifier
;
130 request
.tag
= dbName
;
132 Ipc::TypedMsgHdr msg
;
134 Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg
);
// wait asynchronously; openCompleted() finishes (or times out) the open
136 WaitingForOpen
.push_back(this);
138 eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout
,
139 this, Timeout
, 0, false); // "this" pointer is used as id
// openCompleted: called with the coordinator's strand-search answer, or
// with nil on timeout; records diskId on success, sets error_ otherwise,
// then notifies the requestor either way
143 IpcIoFile::openCompleted(const Ipc::StrandSearchResponse
*const response
)
145 Must(diskId
< 0); // we do not know our disker yet
148 debugs(79, DBG_IMPORTANT
, "ERROR: " << dbName
<< " communication " <<
149 "channel establishment timeout");
152 diskId
= response
->strand
.kidId
;
154 const bool inserted
=
155 IpcIoFiles
.insert(std::make_pair(diskId
, this)).second
;
159 debugs(79, DBG_IMPORTANT
, "ERROR: no disker claimed " <<
160 "responsibility for " << dbName
);
164 ioRequestor
->ioCompletedNotification();
168 * Alias for IpcIoFile::open(...)
169 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
172 IpcIoFile::create(int flags
, mode_t mode
, RefCount
<IORequestor
> callback
)
174 assert(false); // check
175 /* We use the same logic path for open */
176 open(flags
, mode
, callback
);
// close: disker closes the db file; requestor is notified in all cases
182 assert(ioRequestor
!= NULL
);
184 if (IamDiskProcess())
185 DiskerClose(SBuf(dbName
.termedBuf()));
186 // XXX: else nothing to do?
188 ioRequestor
->closeCompleted();
// readable only when paired with a healthy disker and the expected
// queueing delay is acceptable (canWait)
192 IpcIoFile::canRead() const
194 return diskId
>= 0 && !error_
&& canWait();
198 IpcIoFile::canWrite() const
200 return diskId
>= 0 && !error_
&& canWait();
204 IpcIoFile::error() const
// read: queues an asynchronous read; completion is delivered via
// readCompleted() when the disker response (or a timeout) arrives
210 IpcIoFile::read(ReadRequest
*readRequest
)
212 debugs(79,3, HERE
<< "(disker" << diskId
<< ", " << readRequest
->len
<< ", " <<
213 readRequest
->offset
<< ")");
215 assert(ioRequestor
!= NULL
);
216 assert(readRequest
->offset
>= 0);
219 //assert(minOffset < 0 || minOffset <= readRequest->offset);
220 //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
222 IpcIoPendingRequest
*const pending
= new IpcIoPendingRequest(this);
223 pending
->readRequest
= readRequest
;
// readCompleted: response is nil on timeout; on success, copies the data
// out of the shared memory page and returns the page to the pool
228 IpcIoFile::readCompleted(ReadRequest
*readRequest
,
229 IpcIoMsg
*const response
)
231 bool ioError
= false;
233 debugs(79, 3, HERE
<< "error: timeout");
234 ioError
= true; // I/O timeout does not warrant setting error_?
236 if (response
->xerrno
) {
237 debugs(79, DBG_IMPORTANT
, "ERROR: " << dbName
<< " read: " <<
238 xstrerr(response
->xerrno
));
// a disker-reported errno disables this file for future I/O
239 ioError
= error_
= true;
240 } else if (!response
->page
) {
241 debugs(79, DBG_IMPORTANT
, "ERROR: " << dbName
<< " read ran " <<
242 "out of shared memory pages");
245 const char *const buf
= Ipc::Mem::PagePointer(response
->page
);
246 memcpy(readRequest
->buf
, buf
, response
->len
);
249 Ipc::Mem::PutPage(response
->page
);
252 const ssize_t rlen
= ioError
? -1 : (ssize_t
)readRequest
->len
;
253 const int errflag
= ioError
? DISK_ERROR
:DISK_OK
;
254 ioRequestor
->readCompleted(readRequest
->buf
, rlen
, errflag
, readRequest
);
// write: queues an asynchronous write; completion is delivered via
// writeCompleted() when the disker response (or a timeout) arrives
258 IpcIoFile::write(WriteRequest
*writeRequest
)
260 debugs(79,3, HERE
<< "(disker" << diskId
<< ", " << writeRequest
->len
<< ", " <<
261 writeRequest
->offset
<< ")");
263 assert(ioRequestor
!= NULL
);
264 assert(writeRequest
->len
> 0); // TODO: work around mmap failures on zero-len?
265 assert(writeRequest
->offset
>= 0);
268 //assert(minOffset < 0 || minOffset <= writeRequest->offset);
269 //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
271 IpcIoPendingRequest
*const pending
= new IpcIoPendingRequest(this);
272 pending
->writeRequest
= writeRequest
;
// writeCompleted: response is nil on timeout; a disker error or a short
// write disables this file (error_) for future I/O
277 IpcIoFile::writeCompleted(WriteRequest
*writeRequest
,
278 const IpcIoMsg
*const response
)
280 bool ioError
= false;
282 debugs(79, 3, "disker " << diskId
<< " timeout");
283 ioError
= true; // I/O timeout does not warrant setting error_?
284 } else if (response
->xerrno
) {
285 debugs(79, DBG_IMPORTANT
, "ERROR: disker " << diskId
<<
286 " error writing " << writeRequest
->len
<< " bytes at " <<
287 writeRequest
->offset
<< ": " << xstrerr(response
->xerrno
) <<
288 "; this worker will stop using " << dbName
);
289 ioError
= error_
= true;
290 } else if (response
->len
!= writeRequest
->len
) {
291 debugs(79, DBG_IMPORTANT
, "ERROR: disker " << diskId
<< " wrote " <<
292 response
->len
<< " instead of " << writeRequest
->len
<<
293 " bytes (offset " << writeRequest
->offset
<< "); " <<
294 "this worker will stop using " << dbName
);
// caller-supplied deallocator for the write buffer, if any
298 if (writeRequest
->free_func
)
299 (writeRequest
->free_func
)(const_cast<char*>(writeRequest
->buf
)); // broken API?
302 debugs(79,5, HERE
<< "wrote " << writeRequest
->len
<< " to disker" <<
303 diskId
<< " at " << writeRequest
->offset
);
306 const ssize_t rlen
= ioError
? 0 : (ssize_t
)writeRequest
->len
;
307 const int errflag
= ioError
? DISK_ERROR
:DISK_OK
;
308 ioRequestor
->writeCompleted(errflag
, rlen
, writeRequest
);
// true while any request is still awaiting a disker response
312 IpcIoFile::ioInProgress() const
314 return !olderRequests
->empty() || !newerRequests
->empty();
317 /// track a new pending request
319 IpcIoFile::trackPendingRequest(const unsigned int id
, IpcIoPendingRequest
*const pending
)
321 const std::pair
<RequestMap::iterator
,bool> result
=
322 newerRequests
->insert(std::make_pair(id
, pending
));
323 Must(result
.second
); // a failure means that id was not unique
324 if (!timeoutCheckScheduled
)
325 scheduleTimeoutCheck();
328 /// push an I/O request to disker
// builds an IpcIoMsg from the pending read/write, copies write data into
// a shared memory page, pushes it onto the disker queue, and notifies the
// disker when the queue transitions from empty; on queue overflow or
// page exhaustion the request completes immediately as a failure
330 IpcIoFile::push(IpcIoPendingRequest
*const pending
)
332 // prevent queue overflows: check for responses to earlier requests
333 // warning: this call may result in indirect push() recursion
334 HandleResponses("before push");
339 Must(pending
->readRequest
|| pending
->writeRequest
);
343 if (++lastRequestId
== 0) // don't use zero value as requestId
345 ipcIo
.requestId
= lastRequestId
;
346 ipcIo
.start
= current_time
;
347 if (pending
->readRequest
) {
348 ipcIo
.command
= IpcIo::cmdRead
;
349 ipcIo
.offset
= pending
->readRequest
->offset
;
350 ipcIo
.len
= pending
->readRequest
->len
;
351 } else { // pending->writeRequest
352 Must(pending
->writeRequest
->len
<= Ipc::Mem::PageSize());
353 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage
, ipcIo
.page
)) {
355 throw TexcHere("run out of shared memory pages for IPC I/O");
357 ipcIo
.command
= IpcIo::cmdWrite
;
358 ipcIo
.offset
= pending
->writeRequest
->offset
;
359 ipcIo
.len
= pending
->writeRequest
->len
;
360 char *const buf
= Ipc::Mem::PagePointer(ipcIo
.page
);
361 memcpy(buf
, pending
->writeRequest
->buf
, ipcIo
.len
); // optimize away
364 debugs(47, 7, HERE
<< "pushing " << SipcIo(KidIdentifier
, ipcIo
, diskId
));
366 if (queue
->push(diskId
, ipcIo
))
367 Notify(diskId
); // must notify disker
368 trackPendingRequest(ipcIo
.requestId
, pending
);
369 } catch (const Queue::Full
&) {
370 debugs(47, DBG_IMPORTANT
, "ERROR: worker I/O push queue for " <<
371 dbName
<< " overflow: " <<
372 SipcIo(KidIdentifier
, ipcIo
, diskId
)); // TODO: report queue len
373 // TODO: grow queue size
375 pending
->completeIo(NULL
);
377 } catch (const TextException
&e
) {
378 debugs(47, DBG_IMPORTANT
, "ERROR: " << dbName
<< " exception: " << e
.what());
379 pending
->completeIo(NULL
);
384 /// whether we think there is enough time to complete the I/O
// estimates the expected wait as the max of (a) how long the oldest
// queued request has already waited and (b) a projection from the queue
// length and the configured I/O rate, then compares against ioTimeout
386 IpcIoFile::canWait() const
388 if (!config
.ioTimeout
)
389 return true; // no timeout specified
392 if (!queue
->findOldest(diskId
, oldestIo
) || oldestIo
.start
.tv_sec
<= 0)
393 return true; // we cannot estimate expected wait time; assume it is OK
395 const int oldestWait
= tvSubMsec(oldestIo
.start
, current_time
);
397 int rateWait
= -1; // time in milliseconds
398 const int ioRate
= queue
->rateLimit(diskId
).load();
400 // if there are N requests pending, the new one will wait at
401 // least N/max-swap-rate seconds
402 rateWait
= static_cast<int>(1e3
* queue
->outSize(diskId
) / ioRate
);
403 // adjust N/max-swap-rate value based on the queue "balance"
404 // member, in case we have been borrowing time against future
406 rateWait
+= queue
->balance(diskId
);
409 const int expectedWait
= max(oldestWait
, rateWait
);
410 if (expectedWait
< 0 ||
411 static_cast<time_msec_t
>(expectedWait
) < config
.ioTimeout
)
412 return true; // expected wait time is acceptable
414 debugs(47,2, HERE
<< "cannot wait: " << expectedWait
<<
415 " oldest: " << SipcIo(KidIdentifier
, oldestIo
, diskId
));
416 return false; // do not want to wait that long
419 /// called when coordinator responds to worker open request
// matches the response tag against files still waiting to open and
// finishes exactly one matching open; late responses are only logged
421 IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse
&response
)
423 debugs(47, 7, HERE
<< "coordinator response to open request");
424 for (IpcIoFileList::iterator i
= WaitingForOpen
.begin();
425 i
!= WaitingForOpen
.end(); ++i
) {
426 if (response
.strand
.tag
== (*i
)->dbName
) {
427 (*i
)->openCompleted(&response
);
428 WaitingForOpen
.erase(i
);
433 debugs(47, 4, HERE
<< "LATE disker response to open for " <<
434 response
.strand
.tag
);
435 // nothing we can do about it; completeIo() has been called already
// drains the local response queue, dispatching each popped message to
// the IpcIoFile registered for that diskId
439 IpcIoFile::HandleResponses(const char *const when
)
441 debugs(47, 4, HERE
<< "popping all " << when
);
443 // get all responses we can: since we are not pushing, this will stop
445 while (queue
->pop(diskId
, ipcIo
)) {
446 const IpcIoFilesMap::const_iterator i
= IpcIoFiles
.find(diskId
);
447 Must(i
!= IpcIoFiles
.end()); // TODO: warn but continue
448 i
->second
->handleResponse(ipcIo
);
// handleResponse: pairs a popped disker response with its pending
// request (by requestId) and completes it; a missing pairing means the
// request already timed out and was completed earlier
453 IpcIoFile::handleResponse(IpcIoMsg
&ipcIo
)
455 const int requestId
= ipcIo
.requestId
;
456 debugs(47, 7, HERE
<< "popped disker response: " <<
457 SipcIo(KidIdentifier
, ipcIo
, diskId
));
460 if (IpcIoPendingRequest
*const pending
= dequeueRequest(requestId
)) {
461 pending
->completeIo(&ipcIo
);
462 delete pending
; // XXX: leaking if throwing
464 debugs(47, 4, HERE
<< "LATE disker response to " << ipcIo
.command
<<
465 "; ipcIo" << KidIdentifier
<< '.' << requestId
);
466 // nothing we can do about it; completeIo() has been called already
// Notify: sends a UDS datagram so the peer kid wakes up and drains
// its end of the shared queue
471 IpcIoFile::Notify(const int peerId
)
473 // TODO: Count and report the total number of notifications, pops, pushes.
474 debugs(47, 7, HERE
<< "kid" << peerId
);
475 Ipc::TypedMsgHdr msg
;
476 msg
.setType(Ipc::mtIpcIoNotification
); // TODO: add proper message type?
477 msg
.putInt(KidIdentifier
);
478 const String addr
= Ipc::Port::MakeAddr(Ipc::strandAddrLabel
, peerId
);
479 Ipc::SendMessage(addr
, msg
);
// HandleNotification: a peer signalled new queue activity; diskers pop
// requests, workers pop responses
483 IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr
&msg
)
485 const int from
= msg
.getInt();
486 debugs(47, 7, HERE
<< "from " << from
);
487 queue
->clearReaderSignal(from
);
488 if (IamDiskProcess())
489 DiskerHandleRequests();
491 HandleResponses("after notification");
494 /// handles open request timeout
// the event payload identifies which waiting file timed out; that file
// gets openCompleted(NULL), i.e. an open failure
496 IpcIoFile::OpenTimeout(void *const param
)
499 // the pointer is used for comparison only and not dereferenced
500 const IpcIoFile
*const ipcIoFile
=
501 reinterpret_cast<const IpcIoFile
*>(param
);
502 for (IpcIoFileList::iterator i
= WaitingForOpen
.begin();
503 i
!= WaitingForOpen
.end(); ++i
) {
504 if (*i
== ipcIoFile
) {
505 (*i
)->openCompleted(NULL
);
506 WaitingForOpen
.erase(i
);
512 /// IpcIoFile::checkTimeouts wrapper
// the event payload encodes the diskId (not a pointer); route to the
// matching registered file, if it still exists
514 IpcIoFile::CheckTimeouts(void *const param
)
517 const int diskId
= reinterpret_cast<uintptr_t>(param
);
518 debugs(47, 7, HERE
<< "diskId=" << diskId
);
519 const IpcIoFilesMap::const_iterator i
= IpcIoFiles
.find(diskId
);
520 if (i
!= IpcIoFiles
.end())
521 i
->second
->checkTimeouts();
// checkTimeouts: first tries to rescue requests whose responses arrived
// without a notification, then abandons (fails) whatever is still in the
// "older" generation, and finally rotates the two request-map generations
525 IpcIoFile::checkTimeouts()
527 timeoutCheckScheduled
= false;
529 // last chance to recover in case a notification message was lost, etc.
530 const RequestMap::size_type timeoutsBefore
= olderRequests
->size();
531 HandleResponses("before timeout");
532 const RequestMap::size_type timeoutsNow
= olderRequests
->size();
534 if (timeoutsBefore
> timeoutsNow
) { // some requests were rescued
535 // notification message lost or significantly delayed?
536 debugs(47, DBG_IMPORTANT
, "WARNING: communication with " << dbName
<<
537 " may be too slow or disrupted for about " <<
538 Timeout
<< "s; rescued " << (timeoutsBefore
- timeoutsNow
) <<
539 " out of " << timeoutsBefore
<< " I/Os");
543 debugs(47, DBG_IMPORTANT
, "WARNING: abandoning " <<
544 timeoutsNow
<< ' ' << dbName
<< " I/Os after at least " <<
545 Timeout
<< "s timeout");
548 // any old request would have timed out by now
549 typedef RequestMap::const_iterator RMCI
;
550 for (RMCI i
= olderRequests
->begin(); i
!= olderRequests
->end(); ++i
) {
551 IpcIoPendingRequest
*const pending
= i
->second
;
553 const unsigned int requestId
= i
->first
;
554 debugs(47, 7, HERE
<< "disker timeout; ipcIo" <<
555 KidIdentifier
<< '.' << requestId
);
557 pending
->completeIo(NULL
); // no response
558 delete pending
; // XXX: leaking if throwing
560 olderRequests
->clear();
562 swap(olderRequests
, newerRequests
); // switches pointers around
563 if (!olderRequests
->empty() && !timeoutCheckScheduled
)
564 scheduleTimeoutCheck();
567 /// prepare to check for timeouts in a little while
569 IpcIoFile::scheduleTimeoutCheck()
571 // we check all older requests at once so some may wait for 2*Timeout
572 eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts
,
573 reinterpret_cast<void *>(diskId
), Timeout
, 0, false);
574 timeoutCheckScheduled
= true;
577 /// returns and forgets the right IpcIoFile pending request
// looks the id up in both map generations; returns NULL when the request
// is unknown (e.g., already timed out and removed)
578 IpcIoPendingRequest
*
579 IpcIoFile::dequeueRequest(const unsigned int requestId
)
581 Must(requestId
!= 0);
583 RequestMap
*map
= NULL
;
584 RequestMap::iterator i
= requestMap1
.find(requestId
);
586 if (i
!= requestMap1
.end())
589 i
= requestMap2
.find(requestId
);
590 if (i
!= requestMap2
.end())
594 if (!map
) // not found in both maps
597 IpcIoPendingRequest
*pending
= i
->second
;
// IpcIoFile does not expose a usable file descriptor
603 IpcIoFile::getFD() const
605 assert(false); // not supported; TODO: remove this method from API
// IpcIoMsg default state: no command yet
611 IpcIoMsg::IpcIoMsg():
615 command(IpcIo::cmdNone
),
622 /* IpcIoPendingRequest */
// a pending request carries exactly one of readRequest/writeRequest;
// neither set means this was a pending open
624 IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer
&aFile
):
625 file(aFile
), readRequest(NULL
), writeRequest(NULL
)
// completeIo: dispatches the (possibly nil, i.e. timed-out) response to
// the matching completion handler on the owning file
630 IpcIoPendingRequest::completeIo(IpcIoMsg
*const response
)
633 file
->readCompleted(readRequest
, response
);
634 else if (writeRequest
)
635 file
->writeCompleted(writeRequest
, response
);
637 Must(!response
); // only timeouts are handled here
638 file
->openCompleted(NULL
);
642 /* XXX: disker code that should probably be moved elsewhere */
// disker-process globals: one db file per disker process
644 static SBuf DbName
; ///< full db file name
645 static int TheFile
= -1; ///< db file descriptor
// diskerRead: performs the pread() for a worker's read request, placing
// the data into a freshly acquired shared memory page; records errno in
// ipcIo.xerrno on failure
648 diskerRead(IpcIoMsg
&ipcIo
)
650 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage
, ipcIo
.page
)) {
652 debugs(47,2, HERE
<< "run out of shared memory pages for IPC I/O");
656 char *const buf
= Ipc::Mem::PagePointer(ipcIo
.page
);
657 const ssize_t read
= pread(TheFile
, buf
, min(ipcIo
.len
, Ipc::Mem::PageSize()), ipcIo
.offset
);
658 ++statCounter
.syscalls
.disk
.reads
;
659 fd_bytes(TheFile
, read
, FD_READ
);
663 const size_t len
= static_cast<size_t>(read
); // safe because read > 0
664 debugs(47,8, HERE
<< "disker" << KidIdentifier
<< " read " <<
665 (len
== ipcIo
.len
? "all " : "just ") << read
);
668 ipcIo
.xerrno
= errno
;
670 debugs(47,5, HERE
<< "disker" << KidIdentifier
<< " read error: " <<
675 /// Tries to write buffer to disk (a few times if needed);
676 /// sets ipcIo results, but does no cleanup. The caller must cleanup.
// retries partial pwrite()s up to attemptLimit times, advancing the
// buffer/offset each round; on exit ipcIo.len holds bytes actually
// written and ipcIo.xerrno holds the errno of a failed attempt (if any)
678 diskerWriteAttempts(IpcIoMsg
&ipcIo
)
680 const char *buf
= Ipc::Mem::PagePointer(ipcIo
.page
);
681 size_t toWrite
= min(ipcIo
.len
, Ipc::Mem::PageSize());
682 size_t wroteSoFar
= 0;
683 off_t offset
= ipcIo
.offset
;
684 // Partial writes to disk do happen. It is unlikely that the caller can
685 // handle partial writes by doing something other than writing leftovers
686 // again, so we try to write them ourselves to minimize overheads.
687 const int attemptLimit
= 10;
688 for (int attempts
= 1; attempts
<= attemptLimit
; ++attempts
) {
689 const ssize_t result
= pwrite(TheFile
, buf
, toWrite
, offset
);
690 ++statCounter
.syscalls
.disk
.writes
;
691 fd_bytes(TheFile
, result
, FD_WRITE
);
694 ipcIo
.xerrno
= errno
;
695 assert(ipcIo
.xerrno
);
696 debugs(47, DBG_IMPORTANT
, "ERROR: " << DbName
<< " failure" <<
697 " writing " << toWrite
<< '/' << ipcIo
.len
<<
698 " at " << ipcIo
.offset
<< '+' << wroteSoFar
<<
699 " on " << attempts
<< " try: " << xstrerr(ipcIo
.xerrno
));
700 ipcIo
.len
= wroteSoFar
;
701 return; // bail on error
704 const size_t wroteNow
= static_cast<size_t>(result
); // result >= 0
707 debugs(47,3, "disker" << KidIdentifier
<< " wrote " <<
708 (wroteNow
>= toWrite
? "all " : "just ") << wroteNow
<<
709 " out of " << toWrite
<< '/' << ipcIo
.len
<< " at " <<
710 ipcIo
.offset
<< '+' << wroteSoFar
<< " on " << attempts
<<
713 wroteSoFar
+= wroteNow
;
715 if (wroteNow
>= toWrite
) {
717 ipcIo
.len
= wroteSoFar
;
718 return; // wrote everything there was to write
726 debugs(47, DBG_IMPORTANT
, "ERROR: " << DbName
<< " exhausted all " <<
727 attemptLimit
<< " attempts while writing " <<
728 toWrite
<< '/' << ipcIo
.len
<< " at " << ipcIo
.offset
<< '+' <<
730 return; // not a fatal I/O error, unless the caller treats it as such
// diskerWrite: runs the write attempts and always returns the shared
// memory page afterwards
734 diskerWrite(IpcIoMsg
&ipcIo
)
736 diskerWriteAttempts(ipcIo
); // may fail
737 Ipc::Mem::PutPage(ipcIo
.page
);
// event callback used to resume request processing after a deliberate
// pause (rate limiting or a long I/O loop); source names the reason
741 IpcIoFile::DiskerHandleMoreRequests(void *source
)
743 debugs(47, 7, HERE
<< "resuming handling requests after " <<
744 static_cast<const char *>(source
));
745 DiskerHandleMoreRequestsScheduled
= false;
746 IpcIoFile::DiskerHandleRequests();
// WaitBeforePop: enforces the configured I/O rate by maintaining a
// credit/debit balance of milliseconds; when too much credit accumulates
// before a write, schedules a delayed resume and tells the caller to wait
750 IpcIoFile::WaitBeforePop()
752 const int ioRate
= queue
->localRateLimit().load();
753 const double maxRate
= ioRate
/1e3
; // req/ms
755 // do we need to enforce configured I/O rate?
759 // is there an I/O request we could potentially delay?
762 if (!queue
->peek(processId
, ipcIo
)) {
763 // unlike pop(), peek() is not reliable and does not block reader
764 // so we must proceed with pop() even if it is likely to fail
768 static timeval LastIo
= current_time
;
770 const double ioDuration
= 1.0 / maxRate
; // ideal distance between two I/Os
771 // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
772 const int64_t maxImbalance
= min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration
));
774 const double credit
= ioDuration
; // what the last I/O should have cost us
775 const double debit
= tvSubMsec(LastIo
, current_time
); // actual distance from the last I/O
776 LastIo
= current_time
;
778 Ipc::QueueReader::Balance
&balance
= queue
->localBalance();
779 balance
+= static_cast<int64_t>(credit
- debit
);
781 debugs(47, 7, HERE
<< "rate limiting balance: " << balance
<< " after +" << credit
<< " -" << debit
);
783 if (ipcIo
.command
== IpcIo::cmdWrite
&& balance
> maxImbalance
) {
784 // if the next request is (likely) write and we accumulated
785 // too much time for future slow I/Os, then shed accumulated
786 // time to keep just half of the excess
787 const int64_t toSpend
= balance
- maxImbalance
/2;
789 if (toSpend
/1e3
> Timeout
)
790 debugs(47, DBG_IMPORTANT
, "WARNING: " << DbName
<< " delays " <<
791 "I/O requests for " << (toSpend
/1e3
) << " seconds " <<
792 "to obey " << ioRate
<< "/sec rate limit");
794 debugs(47, 3, HERE
<< "rate limiting by " << toSpend
<< " ms to get" <<
795 (1e3
*maxRate
) << "/sec rate");
796 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
797 &IpcIoFile::DiskerHandleMoreRequests
,
798 const_cast<char*>("rate limiting"),
799 toSpend
/1e3
, 0, false);
800 DiskerHandleMoreRequestsScheduled
= true;
802 } else if (balance
< -maxImbalance
) {
803 // do not owe "too much" to avoid "too large" bursts of I/O
804 balance
= -maxImbalance
;
// DiskerHandleRequests: the disker main request loop -- pops worker
// requests until rate limiting asks for a pause or the time budget
// (maxSpentMsec) is exceeded, in which case it reschedules itself
811 IpcIoFile::DiskerHandleRequests()
813 // Balance our desire to maximize the number of concurrent I/O requests
814 // (reordered by OS to minimize seek time) with a requirement to
815 // send 1st-I/O notification messages, process Coordinator events, etc.
816 const int maxSpentMsec
= 10; // keep small: most RAM I/Os are under 1ms
817 const timeval loopStart
= current_time
;
822 while (!WaitBeforePop() && queue
->pop(workerId
, ipcIo
)) {
825 // at least one I/O per call is guaranteed if the queue is not empty
826 DiskerHandleRequest(workerId
, ipcIo
);
829 const double elapsedMsec
= tvSubMsec(loopStart
, current_time
);
830 if (elapsedMsec
> maxSpentMsec
|| elapsedMsec
< 0) {
831 if (!DiskerHandleMoreRequestsScheduled
) {
832 // the gap must be positive for select(2) to be given a chance
833 const double minBreakSecs
= 0.001;
834 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
835 &IpcIoFile::DiskerHandleMoreRequests
,
836 const_cast<char*>("long I/O loop"),
837 minBreakSecs
, 0, false);
838 DiskerHandleMoreRequestsScheduled
= true;
840 debugs(47, 3, HERE
<< "pausing after " << popped
<< " I/Os in " <<
841 elapsedMsec
<< "ms; " << (elapsedMsec
/popped
) << "ms per I/O");
846 // TODO: consider using O_DIRECT with "elevator" optimization where we pop
847 // requests first, then reorder the popped requests to optimize seek time,
848 // then do I/O, then take a break, and come back for the next set of I/O
852 /// called when disker receives an I/O request
// validates the command, performs the read or write, then pushes the
// same (now result-bearing) message back to the requesting worker
854 IpcIoFile::DiskerHandleRequest(const int workerId
, IpcIoMsg
&ipcIo
)
856 if (ipcIo
.command
!= IpcIo::cmdRead
&& ipcIo
.command
!= IpcIo::cmdWrite
) {
857 debugs(0, DBG_CRITICAL
, "ERROR: " << DbName
<<
858 " should not receive " << ipcIo
.command
<<
859 " ipcIo" << workerId
<< '.' << ipcIo
.requestId
);
863 debugs(47,5, HERE
<< "disker" << KidIdentifier
<<
864 (ipcIo
.command
== IpcIo::cmdRead
? " reads " : " writes ") <<
865 ipcIo
.len
<< " at " << ipcIo
.offset
<<
866 " ipcIo" << workerId
<< '.' << ipcIo
.requestId
);
868 if (ipcIo
.command
== IpcIo::cmdRead
)
870 else // ipcIo.command == IpcIo::cmdWrite
873 debugs(47, 7, HERE
<< "pushing " << SipcIo(workerId
, ipcIo
, KidIdentifier
));
876 if (queue
->push(workerId
, ipcIo
))
877 Notify(workerId
); // must notify worker
878 } catch (const Queue::Full
&) {
879 // The worker queue should not overflow because the worker should pop()
880 // before push()ing and because if disker pops N requests at a time,
881 // we should make sure the worker pop() queue length is the worker
882 // push queue length plus N+1. XXX: implement the N+1 difference.
883 debugs(47, DBG_IMPORTANT
, "BUG: Worker I/O pop queue for " <<
884 DbName
<< " overflow: " <<
885 SipcIo(workerId
, ipcIo
, KidIdentifier
)); // TODO: report queue len
887 // the I/O request we could not push will timeout
// DiskerOpen: opens the rock db file in the disker process and records
// the descriptor in TheFile; logs and reports failure via return value
// (mode parameter is accepted but unused here)
892 DiskerOpen(const SBuf
&path
, int flags
, mode_t
)
897 TheFile
= file_open(DbName
.c_str(), flags
);
900 const int xerrno
= errno
;
901 debugs(47, DBG_CRITICAL
, "ERROR: cannot open " << DbName
<< ": " <<
906 ++store_open_disk_fd
;
907 debugs(79,3, "rock db opened " << DbName
<< ": FD " << TheFile
);
// DiskerClose: closes the db descriptor and updates the FD statistics
912 DiskerClose(const SBuf
&path
)
916 debugs(79,3, HERE
<< "rock db closed " << path
<< ": FD " << TheFile
);
918 --store_open_disk_fd
;
923 /// reports our needs for shared memory pages to Ipc::Mem::Pages
924 /// and initializes shared memory segments used by IpcIoFile
925 class IpcIoRr
: public Ipc::Mem::RegisteredRunner
928 /* RegisteredRunner API */
929 IpcIoRr(): owner(NULL
) {}
931 virtual void claimMemoryNeeds();
934 /* Ipc::Mem::RegisteredRunner API */
935 virtual void create();
// owner of the shared worker<->disker queue segment; created in create()
938 Ipc::FewToFewBiQueue::Owner
*owner
;
// self-register this runner so it participates in startup
941 RunnerRegistrationEntry(IpcIoRr
);
// claimMemoryNeeds: sizes the shared I/O page pool from the maximum
// number of queue slots, plus a 10% fudge factor for pages that live
// slightly longer than their queue entries
944 IpcIoRr::claimMemoryNeeds()
946 const int itemsCount
= Ipc::FewToFewBiQueue::MaxItemsCount(
947 ::Config
.workers
, ::Config
.cacheSwap
.n_strands
, QueueCapacity
);
948 // the maximum number of shared I/O pages is approximately the
949 // number of queue slots, we add a fudge factor to that to account
950 // for corner cases where I/O pages are created before queue
951 // limits are checked or destroyed long after the I/O is dequeued
952 Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage
,
953 static_cast<int>(itemsCount
* 1.1));
959 if (Config
.cacheSwap
.n_strands
<= 0)
963 owner
= Ipc::FewToFewBiQueue::Init(ShmLabel
, Config
.workers
, 1,
964 Config
.cacheSwap
.n_strands
,
965 1 + Config
.workers
, sizeof(IpcIoMsg
),