/*
 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 47    Store Directory Routines */

#include "squid.h"
#include "base/AsyncFunCalls.h"
#include "base/CodeContext.h"
#include "base/RunnersRegistry.h"
#include "base/TextException.h"
#include "DiskIO/IORequestor.h"
#include "DiskIO/IpcIo/IpcIoFile.h"
#include "DiskIO/ReadRequest.h"
#include "DiskIO/WriteRequest.h"
#include "fd.h"
#include "fs_io.h"
#include "globals.h"
#include "ipc/mem/Pages.h"
#include "ipc/Messages.h"
#include "ipc/Port.h"
#include "ipc/Queue.h"
#include "ipc/StrandCoord.h"
#include "ipc/StrandSearch.h"
#include "ipc/UdsOp.h"
#include "sbuf/SBuf.h"
#include "SquidConfig.h"
#include "StatCounters.h"
#include "tools.h"

#include <cerrno>

CBDATA_CLASS_INIT(IpcIoFile);

/// shared memory segment path to use for IpcIoFile maps
static const char *const ShmLabel = "io_file";
/// a single worker-to-disker or disker-to-worker queue capacity; up
/// to 2*QueueCapacity I/O requests queued between a single worker and
/// a single disker
// TODO: make configurable or compute from squid.conf settings if possible
static const int QueueCapacity = 1024;
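
// For example, with QueueCapacity == 1024, up to 1024 requests may be queued
// from a worker to a disker while up to 1024 responses travel back, which is
// where the 2*QueueCapacity bound mentioned above comes from.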

const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
std::unique_ptr<IpcIoFile::Queue> IpcIoFile::queue;

bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false;

static bool DiskerOpen(const SBuf &path, int flags, mode_t mode);
static void DiskerClose(const SBuf &path);

/// IpcIo wrapper for debugs() streams; XXX: find a better class name
class SipcIo
{
public:
    SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
        worker(aWorker), msg(aMsg), disker(aDisker) {}

    int worker;
    const IpcIoMsg &msg;
    int disker;
};

static std::ostream &
operator <<(std::ostream &os, const SipcIo &sio)
{
    return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
           sio.msg.command << sio.disker;
}
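
// Example rendering: SipcIo(3, msg, 5) for read request #14 prints as
// "ipcIo3.14r5": worker kid3, request 14, the command character (here assumed
// to be 'r' for reads, per the IpcIo::Command printer below), and disker kid5.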

std::ostream &
IpcIo::operator <<(std::ostream &os, const Command command)
{
    switch (command) {
    case cmdNone:
        return os << '-';
    case cmdRead:
        return os << 'r';
    case cmdWrite:
        return os << 'w';
    }
    // unreachable: the switch above covers all valid Command values
    return os << static_cast<int>(command);
}

IpcIoFile::IpcIoFile(char const *aDb):
    dbName(aDb),
    myPid(getpid()),
    diskId(-1),
    error_(false),
    lastRequestId(0),
    olderRequests(&requestMap1), newerRequests(&requestMap2),
    timeoutCheckScheduled(false)
{
}

IpcIoFile::~IpcIoFile()
{
    SWALLOW_EXCEPTIONS({
        if (diskId >= 0) {
            const auto i = IpcIoFiles.find(diskId);
            Must(i != IpcIoFiles.end());
            Must(i->second == this);
            IpcIoFiles.erase(i);
        }
    });
}

void
IpcIoFile::configure(const Config &cfg)
{
    DiskFile::configure(cfg);
    config = cfg;
}

void
IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
{
    ioRequestor = callback;
    Must(diskId < 0); // we do not know our disker yet

    if (!queue.get())
        queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));

    AsyncCall::Pointer call = asyncCall(79, 4, "IpcIoFile::HandleMessagesAtStart",
                                        NullaryFunDialer(&IpcIoFile::HandleMessagesAtStart));
    ScheduleCallHere(call);

    if (IamDiskProcess()) {
        error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
        if (error_)
            return;

        diskId = KidIdentifier;
        const bool inserted =
            IpcIoFiles.insert(std::make_pair(diskId, this)).second;
        Must(inserted);

        queue->localRateLimit().store(config.ioRate);

        Ipc::StrandMessage::NotifyCoordinator(Ipc::mtRegisterStrand, dbName.termedBuf());

        ioRequestor->ioCompletedNotification();
        return;
    }

    const Ipc::StrandSearchRequest request(dbName);
    Ipc::TypedMsgHdr msg;
    request.pack(msg);
    Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg);

    WaitingForOpen.push_back(this);

    eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
             this, Timeout, 0, false); // "this" pointer is used as id
}
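
// Note: worker-side open() is asynchronous. The worker asks the Coordinator to
// locate the disker strand serving dbName and finishes in openCompleted(),
// either when HandleOpenResponse() delivers an answer or when OpenTimeout()
// fires after Timeout seconds.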

void
IpcIoFile::openCompleted(const Ipc::StrandMessage *const response)
{
    Must(diskId < 0); // we do not know our disker yet

    if (!response) {
        debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " communication " <<
               "channel establishment timeout");
        error_ = true;
    } else {
        diskId = response->strand.kidId;
        if (diskId >= 0) {
            const bool inserted =
                IpcIoFiles.insert(std::make_pair(diskId, this)).second;
            Must(inserted);
        } else {
            error_ = true;
            debugs(79, DBG_IMPORTANT, "ERROR: no disker claimed " <<
                   "responsibility for " << dbName);
        }
    }

    ioRequestor->ioCompletedNotification();
}

/**
 * Alias for IpcIoFile::open(...)
 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 */
void
IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
{
    assert(false); // check
    /* We use the same logic path for open */
    open(flags, mode, callback);
}

void
IpcIoFile::close()
{
    assert(ioRequestor != nullptr);

    if (IamDiskProcess())
        DiskerClose(SBuf(dbName.termedBuf()));
    // XXX: else nothing to do?

    ioRequestor->closeCompleted();
}

bool
IpcIoFile::canRead() const
{
    return diskId >= 0 && !error_ && canWait();
}

bool
IpcIoFile::canWrite() const
{
    return diskId >= 0 && !error_ && canWait();
}

bool
IpcIoFile::error() const
{
    return error_;
}

void
IpcIoFile::read(ReadRequest *readRequest)
{
    debugs(79,3, "(disker" << diskId << ", " << readRequest->len << ", " <<
           readRequest->offset << ")");

    assert(ioRequestor != nullptr);
    assert(readRequest->offset >= 0);

    //assert(minOffset < 0 || minOffset <= readRequest->offset);
    //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);

    IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
    pending->readRequest = readRequest;
    push(pending);
}

void
IpcIoFile::readCompleted(ReadRequest *readRequest,
                         IpcIoMsg *const response)
{
    bool ioError = false;
    if (!response) {
        debugs(79, 3, "error: timeout");
        ioError = true; // I/O timeout does not warrant setting error_?
    } else {
        if (response->xerrno) {
            debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read: " <<
                   xstrerr(response->xerrno));
            ioError = error_ = true;
        } else if (!response->page) {
            debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read ran " <<
                   "out of shared memory pages");
            ioError = true;
        } else {
            const char *const buf = Ipc::Mem::PagePointer(response->page);
            memcpy(readRequest->buf, buf, response->len);
        }

        Ipc::Mem::PutPage(response->page);
    }

    const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
    const int errflag = ioError ? DISK_ERROR :DISK_OK;
    ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
}

void
IpcIoFile::write(WriteRequest *writeRequest)
{
    debugs(79,3, "(disker" << diskId << ", " << writeRequest->len << ", " <<
           writeRequest->offset << ")");

    assert(ioRequestor != nullptr);
    assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
    assert(writeRequest->offset >= 0);

    //assert(minOffset < 0 || minOffset <= writeRequest->offset);
    //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);

    IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
    pending->writeRequest = writeRequest;
    push(pending);
}

void
IpcIoFile::writeCompleted(WriteRequest *writeRequest,
                          const IpcIoMsg *const response)
{
    bool ioError = false;
    if (!response) {
        debugs(79, 3, "disker " << diskId << " timeout");
        ioError = true; // I/O timeout does not warrant setting error_?
    } else if (response->xerrno) {
        debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId <<
               " error writing " << writeRequest->len << " bytes at " <<
               writeRequest->offset << ": " << xstrerr(response->xerrno) <<
               "; this worker will stop using " << dbName);
        ioError = error_ = true;
    } else if (response->len != writeRequest->len) {
        debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId << " wrote " <<
               response->len << " instead of " << writeRequest->len <<
               " bytes (offset " << writeRequest->offset << "); " <<
               "this worker will stop using " << dbName);
        ioError = error_ = true;
    }

    if (writeRequest->free_func)
        (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?

    if (!ioError) {
        debugs(79,5, "wrote " << writeRequest->len << " to disker" <<
               diskId << " at " << writeRequest->offset);
    }

    const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
    const int errflag = ioError ? DISK_ERROR :DISK_OK;
    ioRequestor->writeCompleted(errflag, rlen, writeRequest);
}

bool
IpcIoFile::ioInProgress() const
{
    return !olderRequests->empty() || !newerRequests->empty();
}

/// track a new pending request
void
IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
{
    const std::pair<RequestMap::iterator,bool> result =
        newerRequests->insert(std::make_pair(id, pending));
    Must(result.second); // a failure means the id was not unique
    if (!timeoutCheckScheduled)
        scheduleTimeoutCheck();
}

/// push an I/O request to disker
void
IpcIoFile::push(IpcIoPendingRequest *const pending)
{
    // prevent queue overflows: check for responses to earlier requests
    // warning: this call may result in indirect push() recursion
    CallService(nullptr, [] {
        HandleResponses("before push");
    });

    debugs(47, 7, MYNAME);
    Must(diskId >= 0);
    Must(pending);
    Must(pending->readRequest || pending->writeRequest);

    IpcIoMsg ipcIo;
    try {
        if (++lastRequestId == 0) // don't use zero value as requestId
            ++lastRequestId;
        ipcIo.requestId = lastRequestId;
        ipcIo.start = current_time;
        ipcIo.workerPid = myPid;
        if (pending->readRequest) {
            ipcIo.command = IpcIo::cmdRead;
            ipcIo.offset = pending->readRequest->offset;
            ipcIo.len = pending->readRequest->len;
        } else { // pending->writeRequest
            Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
            if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
                throw TexcHere("run out of shared memory pages for IPC I/O");
            }
            ipcIo.command = IpcIo::cmdWrite;
            ipcIo.offset = pending->writeRequest->offset;
            ipcIo.len = pending->writeRequest->len;
            char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
            memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
        }

        debugs(47, 7, "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));

        // protect DiskerHandleRequest() from pop queue overflow
        if (pendingRequests() >= QueueCapacity)
            throw Ipc::OneToOneUniQueue::Full();

        if (queue->push(diskId, ipcIo))
            Notify(diskId); // must notify disker
        trackPendingRequest(ipcIo.requestId, pending);
    } catch (const Queue::Full &) {
        debugs(47, DBG_IMPORTANT, "ERROR: worker I/O push queue for " <<
               dbName << " overflow: " <<
               SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
        // TODO: grow queue size

        if (ipcIo.page)
            Ipc::Mem::PutPage(ipcIo.page);

        pending->completeIo(nullptr);
        delete pending;
    } catch (const TextException &e) {
        debugs(47, DBG_IMPORTANT, "ERROR: " << dbName << " exception: " << e.what());
        pending->completeIo(nullptr);
        delete pending;
    }
}

/// whether we think there is enough time to complete the I/O
bool
IpcIoFile::canWait() const
{
    if (!config.ioTimeout)
        return true; // no timeout specified

    IpcIoMsg oldestIo;
    if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
        return true; // we cannot estimate expected wait time; assume it is OK

    const int oldestWait = tvSubMsec(oldestIo.start, current_time);

    int rateWait = -1; // time in milliseconds
    const int ioRate = queue->rateLimit(diskId).load();
    if (ioRate > 0) {
        // if there are N requests pending, the new one will wait at
        // least N/max-swap-rate seconds
        rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);

        // adjust N/max-swap-rate value based on the queue "balance"
        // member, in case we have been borrowing time against future
        // I/O already
        rateWait += queue->balance(diskId);
    }

    const int expectedWait = max(oldestWait, rateWait);
    if (expectedWait < 0 ||
            static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
        return true; // expected wait time is acceptable

    debugs(47,2, "cannot wait: " << expectedWait <<
           " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
    return false; // do not want to wait that long
}
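
// Worked example for the estimate above: with a 100/sec rate limit and 50
// requests already queued, rateWait starts at 1e3*50/100 = 500ms (plus any
// accumulated queue balance). If the oldest queued request has waited 300ms,
// expectedWait = max(300, 500) = 500ms, which is then compared against
// config.ioTimeout.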

/// called when coordinator responds to worker open request
void
IpcIoFile::HandleOpenResponse(const Ipc::StrandMessage &response)
{
    debugs(47, 7, "coordinator response to open request");
    for (IpcIoFileList::iterator i = WaitingForOpen.begin();
            i != WaitingForOpen.end(); ++i) {
        if (response.strand.tag == (*i)->dbName) {
            (*i)->openCompleted(&response);
            WaitingForOpen.erase(i);
            return;
        }
    }

    debugs(47, 4, "LATE disker response to open for " <<
           response.strand.tag);
    // nothing we can do about it; completeIo() has been called already
}

void
IpcIoFile::HandleResponses(const char *const when)
{
    debugs(47, 4, "popping all " << when);

    // get all responses we can: since we are not pushing, this will stop
    int diskId;
    IpcIoMsg ipcIo;
    while (queue->pop(diskId, ipcIo)) {
        const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
        if (i == IpcIoFiles.end()) {
            debugs(47, 5, "ignoring disk response " << SipcIo(KidIdentifier, ipcIo, diskId) << ": the file is not open");
            continue;
        }
        i->second->handleResponse(ipcIo);
    }
}

void
IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
{
    const int requestId = ipcIo.requestId;

    Must(requestId);
    if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
        CallBack(pending->codeContext, [&] {
            debugs(47, 7, "popped disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
            if (myPid == ipcIo.workerPid)
                pending->completeIo(&ipcIo);
            else
                debugs(47, 5, "ignoring response meant for our predecessor PID: " << ipcIo.workerPid);
            delete pending; // XXX: leaking if throwing
        });
    } else {
        debugs(47, 4, "LATE disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
        // nothing we can do about it; completeIo() has been called already
    }
}
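
// The workerPid comparison above protects a restarted worker (same kid
// identifier, new PID) from consuming a leftover response addressed to its
// predecessor: request identifiers are per-process counters, so such a
// response could otherwise be matched to an unrelated pending request.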

void
IpcIoFile::Notify(const int peerId)
{
    // TODO: Count and report the total number of notifications, pops, pushes.
    debugs(47, 7, "kid" << peerId);
    Ipc::TypedMsgHdr msg;
    msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
    msg.putInt(KidIdentifier);
    const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, peerId);
    Ipc::SendMessage(addr, msg);
}

void
IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
{
    const int from = msg.getInt();
    debugs(47, 7, "from " << from);
    queue->clearReaderSignal(from);
    if (IamDiskProcess())
        DiskerHandleRequests();
    else
        HandleResponses("after notification");
}

/// \copydoc CollapsedForwarding::HandleNewDataAtStart()
void
IpcIoFile::HandleMessagesAtStart()
{
    /// \sa CollapsedForwarding::HandleNewDataAtStart() -- duplicates this logic
    queue->clearAllReaderSignals();
    if (IamDiskProcess())
        DiskerHandleRequests();
    else
        HandleResponses("at start");
}

void
IpcIoFile::StatQueue(std::ostream &os)
{
    if (queue.get()) {
        os << "SMP disk I/O queues:\n";
        queue->stat<IpcIoMsg>(os);
    }
}

/// handles open request timeout
void
IpcIoFile::OpenTimeout(void *const param)
{
    Must(param);
    // the pointer is used for comparison only and not dereferenced
    const IpcIoFile *const ipcIoFile =
        reinterpret_cast<const IpcIoFile *>(param);
    for (IpcIoFileList::iterator i = WaitingForOpen.begin();
            i != WaitingForOpen.end(); ++i) {
        if (*i == ipcIoFile) {
            (*i)->openCompleted(nullptr);
            WaitingForOpen.erase(i);
            break;
        }
    }
}

/// IpcIoFile::checkTimeouts wrapper
void
IpcIoFile::CheckTimeouts(void *const param)
{
    Must(param);
    const int diskId = reinterpret_cast<uintptr_t>(param);
    debugs(47, 7, "diskId=" << diskId);
    const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
    if (i != IpcIoFiles.end())
        i->second->checkTimeouts();
}

void
IpcIoFile::checkTimeouts()
{
    timeoutCheckScheduled = false;

    // last chance to recover in case a notification message was lost, etc.
    const RequestMap::size_type timeoutsBefore = olderRequests->size();
    HandleResponses("before timeout");
    const RequestMap::size_type timeoutsNow = olderRequests->size();

    if (timeoutsBefore > timeoutsNow) { // some requests were rescued
        // notification message lost or significantly delayed?
        debugs(47, DBG_IMPORTANT, "WARNING: communication with " << dbName <<
               " may be too slow or disrupted for about " <<
               Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
               " out of " << timeoutsBefore << " I/Os");
    }

    if (timeoutsNow) {
        debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
               timeoutsNow << ' ' << dbName << " I/Os after at least " <<
               Timeout << "s timeout");
    }

    // any old request would have timed out by now
    typedef RequestMap::const_iterator RMCI;
    for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
        IpcIoPendingRequest *const pending = i->second;

        CallBack(pending->codeContext, [&] {
            const auto requestId = i->first;
            debugs(47, 7, "disker timeout; ipcIo" << KidIdentifier << '.' << requestId);
            pending->completeIo(nullptr); // no response
            delete pending; // XXX: leaking if throwing
        });
    }
    olderRequests->clear();

    swap(olderRequests, newerRequests); // switches pointers around
    if (!olderRequests->empty() && !timeoutCheckScheduled)
        scheduleTimeoutCheck();
}

/// prepare to check for timeouts in a little while
void
IpcIoFile::scheduleTimeoutCheck()
{
    // We may be running in an I/O requestor CodeContext, but are scheduling
    // one-for-all CheckTimeouts() that is not specific to any request.
    CallService(nullptr, [&] {
        // we check all older requests at once, so some may wait for up to 2*Timeout
        eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
                 reinterpret_cast<void *>(diskId), Timeout, 0, false);
        timeoutCheckScheduled = true;
    });
}
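
// Pending-request aging uses two maps: trackPendingRequest() always inserts
// into *newerRequests, and each checkTimeouts() run abandons whatever remains
// in *olderRequests before swapping the two pointers. A request is therefore
// declared timed out only after surviving one full swap, i.e. after waiting
// between Timeout and 2*Timeout seconds.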

/// returns and forgets the right IpcIoFile pending request
IpcIoPendingRequest *
IpcIoFile::dequeueRequest(const unsigned int requestId)
{
    Must(requestId != 0);

    RequestMap *map = nullptr;
    RequestMap::iterator i = requestMap1.find(requestId);

    if (i != requestMap1.end())
        map = &requestMap1;
    else {
        i = requestMap2.find(requestId);
        if (i != requestMap2.end())
            map = &requestMap2;
    }

    if (!map) // not found in either map
        return nullptr;

    IpcIoPendingRequest *pending = i->second;
    map->erase(i);
    return pending;
}

int
IpcIoFile::getFD() const
{
    assert(false); // not supported; TODO: remove this method from API
    return -1;
}

/* IpcIoMsg */

IpcIoMsg::IpcIoMsg():
    requestId(0),
    offset(0),
    len(0),
    workerPid(-1), // Unix-like systems use process IDs starting from 0
    command(IpcIo::cmdNone),
    xerrno(0)
{
    start.tv_sec = 0;
    start.tv_usec = 0;
}

void
IpcIoMsg::stat(std::ostream &os)
{
    timeval elapsedTime;
    tvSub(elapsedTime, start, current_time);
    os << "id: " << requestId <<
       ", offset: " << offset <<
       ", size: " << len <<
       ", workerPid: " << workerPid <<
       ", page: " << page <<
       ", command: " << command <<
       ", start: " << start <<
       ", elapsed: " << elapsedTime <<
       ", errno: " << xerrno <<
       "\n";
}

/* IpcIoPendingRequest */

IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
    file(aFile),
    readRequest(nullptr),
    writeRequest(nullptr),
    codeContext(CodeContext::Current())
{
}

void
IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
{
    if (readRequest)
        file->readCompleted(readRequest, response);
    else if (writeRequest)
        file->writeCompleted(writeRequest, response);
    else {
        Must(!response); // only timeouts are handled here
        file->openCompleted(nullptr);
    }
}

/* XXX: disker code that should probably be moved elsewhere */

static SBuf DbName; ///< full db file name
static int TheFile = -1; ///< db file descriptor

static void
diskerRead(IpcIoMsg &ipcIo)
{
    if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
        ipcIo.len = 0;
        debugs(47,2, "run out of shared memory pages for IPC I/O");
        return;
    }

    char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
    const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
    ++statCounter.syscalls.disk.reads;
    fd_bytes(TheFile, read, IoDirection::Read);

    if (read >= 0) {
        ipcIo.xerrno = 0;
        const size_t len = static_cast<size_t>(read); // safe because read >= 0
        debugs(47,8, "disker" << KidIdentifier << " read " <<
               (len == ipcIo.len ? "all " : "just ") << read);
        ipcIo.len = len;
    } else {
        ipcIo.xerrno = errno;
        ipcIo.len = 0;
        debugs(47,5, "disker" << KidIdentifier << " read error: " <<
               ipcIo.xerrno);
    }
}

/// Tries to write buffer to disk (a few times if needed);
/// sets ipcIo results, but does no cleanup. The caller must cleanup.
static void
diskerWriteAttempts(IpcIoMsg &ipcIo)
{
    const char *buf = Ipc::Mem::PagePointer(ipcIo.page);
    size_t toWrite = min(ipcIo.len, Ipc::Mem::PageSize());
    size_t wroteSoFar = 0;
    off_t offset = ipcIo.offset;
    // Partial writes to disk do happen. It is unlikely that the caller can
    // handle partial writes by doing something other than writing leftovers
    // again, so we try to write them ourselves to minimize overheads.
    const int attemptLimit = 10;
    for (int attempts = 1; attempts <= attemptLimit; ++attempts) {
        const ssize_t result = pwrite(TheFile, buf, toWrite, offset);
        ++statCounter.syscalls.disk.writes;
        fd_bytes(TheFile, result, IoDirection::Write);

        if (result < 0) {
            ipcIo.xerrno = errno;
            assert(ipcIo.xerrno);
            debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " failure" <<
                   " writing " << toWrite << '/' << ipcIo.len <<
                   " at " << ipcIo.offset << '+' << wroteSoFar <<
                   " on " << attempts << " try: " << xstrerr(ipcIo.xerrno));
            ipcIo.len = wroteSoFar;
            return; // bail on error
        }

        const size_t wroteNow = static_cast<size_t>(result); // result >= 0

        debugs(47,3, "disker" << KidIdentifier << " wrote " <<
               (wroteNow >= toWrite ? "all " : "just ") << wroteNow <<
               " out of " << toWrite << '/' << ipcIo.len << " at " <<
               ipcIo.offset << '+' << wroteSoFar << " on " << attempts <<
               " try");

        wroteSoFar += wroteNow;

        if (wroteNow >= toWrite) {
            ipcIo.xerrno = 0;
            ipcIo.len = wroteSoFar;
            return; // wrote everything there was to write
        }

        buf += wroteNow;
        toWrite -= wroteNow;
        offset += wroteNow;
    }

    debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " exhausted all " <<
           attemptLimit << " attempts while writing " <<
           toWrite << '/' << ipcIo.len << " at " << ipcIo.offset << '+' <<
           wroteSoFar);
    return; // not a fatal I/O error, unless the caller treats it as such
}
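
// For example, if pwrite() accepts only 20000 of 32768 bytes on the first
// attempt, the loop above retries with buf, toWrite, and offset advanced by
// 20000 bytes; after a fully successful sequence of attempts, ipcIo.len equals
// the originally requested length and ipcIo.xerrno is zero.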

static void
diskerWrite(IpcIoMsg &ipcIo)
{
    diskerWriteAttempts(ipcIo); // may fail
    Ipc::Mem::PutPage(ipcIo.page);
}

void
IpcIoFile::DiskerHandleMoreRequests(void *source)
{
    debugs(47, 7, "resuming handling requests after " <<
           static_cast<const char *>(source));
    DiskerHandleMoreRequestsScheduled = false;
    IpcIoFile::DiskerHandleRequests();
}

bool
IpcIoFile::WaitBeforePop()
{
    const int ioRate = queue->localRateLimit().load();
    const double maxRate = ioRate/1e3; // req/ms

    // do we need to enforce configured I/O rate?
    if (maxRate <= 0)
        return false;

    // is there an I/O request we could potentially delay?
    int kidId;
    IpcIoMsg ipcIo;
    if (!queue->peek(kidId, ipcIo)) {
        // unlike pop(), peek() is not reliable and does not block reader
        // so we must proceed with pop() even if it is likely to fail
        return false;
    }

    static timeval LastIo = current_time;

    const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
    // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
    const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));

    const double credit = ioDuration; // what the last I/O should have cost us
    const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
    LastIo = current_time;

    Ipc::QueueReader::Balance &balance = queue->localBalance();
    balance += static_cast<int64_t>(credit - debit);

    debugs(47, 7, "rate limiting balance: " << balance << " after +" << credit << " -" << debit);

    if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
        // if the next request is (likely) write and we accumulated
        // too much time for future slow I/Os, then shed accumulated
        // time to keep just half of the excess
        const int64_t toSpend = balance - maxImbalance/2;

        if (toSpend/1e3 > Timeout)
            debugs(47, DBG_IMPORTANT, "WARNING: " << DbName << " delays " <<
                   "I/O requests for " << (toSpend/1e3) << " seconds " <<
                   "to obey " << ioRate << "/sec rate limit");

        debugs(47, 3, "rate limiting by " << toSpend << " ms to get " <<
               (1e3*maxRate) << "/sec rate");
        eventAdd("IpcIoFile::DiskerHandleMoreRequests",
                 &IpcIoFile::DiskerHandleMoreRequests,
                 const_cast<char*>("rate limiting"),
                 toSpend/1e3, 0, false);
        DiskerHandleMoreRequestsScheduled = true;
        return true;
    } else if (balance < -maxImbalance) {
        // do not owe "too much" to avoid "too large" bursts of I/O
        balance = -maxImbalance;
    }

    return false;
}

void
IpcIoFile::DiskerHandleRequests()
{
    // Balance our desire to maximize the number of concurrent I/O requests
    // (reordered by OS to minimize seek time) with a requirement to
    // send 1st-I/O notification messages, process Coordinator events, etc.
    const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
    const timeval loopStart = current_time;

    int popped = 0;
    int workerId = 0;
    IpcIoMsg ipcIo;
    while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
        ++popped;

        // at least one I/O per call is guaranteed if the queue is not empty
        DiskerHandleRequest(workerId, ipcIo);

        const double elapsedMsec = tvSubMsec(loopStart, current_time);
        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
            if (!DiskerHandleMoreRequestsScheduled) {
                // the gap must be positive for select(2) to be given a chance
                const double minBreakSecs = 0.001;
                eventAdd("IpcIoFile::DiskerHandleMoreRequests",
                         &IpcIoFile::DiskerHandleMoreRequests,
                         const_cast<char*>("long I/O loop"),
                         minBreakSecs, 0, false);
                DiskerHandleMoreRequestsScheduled = true;
            }
            debugs(47, 3, "pausing after " << popped << " I/Os in " <<
                   elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
            break;
        }
    }

    // TODO: consider using O_DIRECT with "elevator" optimization where we pop
    // requests first, then reorder the popped requests to optimize seek time,
    // then do I/O, then take a break, and come back for the next set of I/O
    // requests.
}
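
// With maxSpentMsec == 10, a disker whose I/Os take about 1ms each services
// roughly ten requests per call before yielding for at least minBreakSecs,
// giving select(2) and other events a chance to run.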

/// called when disker receives an I/O request
void
IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
{
    if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
        debugs(0, DBG_CRITICAL, "ERROR: " << DbName <<
               " should not receive " << ipcIo.command <<
               " ipcIo" << workerId << '.' << ipcIo.requestId);
        return;
    }

    debugs(47,5, "disker" << KidIdentifier <<
           (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
           ipcIo.len << " at " << ipcIo.offset <<
           " ipcIo" << workerId << '.' << ipcIo.requestId);

    const auto workerPid = ipcIo.workerPid;
    assert(workerPid >= 0);

    if (ipcIo.command == IpcIo::cmdRead)
        diskerRead(ipcIo);
    else // ipcIo.command == IpcIo::cmdWrite
        diskerWrite(ipcIo);

    assert(ipcIo.workerPid == workerPid);

    debugs(47, 7, "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));

    try {
        if (queue->push(workerId, ipcIo))
            Notify(workerId); // must notify worker
    } catch (const Queue::Full &) {
        // The worker pop queue should not overflow because the worker can
        // push only if pendingRequests() is less than QueueCapacity.
        debugs(47, DBG_IMPORTANT, "ERROR: Squid BUG: Worker I/O pop queue for " <<
               DbName << " overflow: " <<
               SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len

        // the I/O request we could not push will timeout
    }
}

static bool
DiskerOpen(const SBuf &path, int flags, mode_t)
{
    assert(TheFile < 0);

    DbName = path;
    TheFile = file_open(DbName.c_str(), flags);

    if (TheFile < 0) {
        const int xerrno = errno;
        debugs(47, DBG_CRITICAL, "ERROR: cannot open " << DbName << ": " <<
               xstrerr(xerrno));
        return false;
    }

    ++store_open_disk_fd;
    debugs(79,3, "rock db opened " << DbName << ": FD " << TheFile);
    return true;
}

static void
DiskerClose(const SBuf &path)
{
    if (TheFile >= 0) {
        file_close(TheFile);
        debugs(79,3, "rock db closed " << path << ": FD " << TheFile);
        TheFile = -1;
        --store_open_disk_fd;
    }

    DbName.clear();
}

/// reports our needs for shared memory pages to Ipc::Mem::Pages
/// and initializes shared memory segments used by IpcIoFile
class IpcIoRr: public Ipc::Mem::RegisteredRunner
{
public:
    /* RegisteredRunner API */
    IpcIoRr(): owner(nullptr) {}
    ~IpcIoRr() override;
    void claimMemoryNeeds() override;

protected:
    /* Ipc::Mem::RegisteredRunner API */
    void create() override;

private:
    Ipc::FewToFewBiQueue::Owner *owner;
};

DefineRunnerRegistrator(IpcIoRr);

void
IpcIoRr::claimMemoryNeeds()
{
    const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
                               ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity);
    // the maximum number of shared I/O pages is approximately the
    // number of queue slots; we add a fudge factor to that to account
    // for corner cases where I/O pages are created before queue
    // limits are checked or destroyed long after the I/O is dequeued
    Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage,
                           static_cast<int>(itemsCount * 1.1));
}
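
// For example, if MaxItemsCount() yields 10000 queue slots, the call above
// reserves roughly 11000 ioPage shared memory pages.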

void
IpcIoRr::create()
{
    if (Config.cacheSwap.n_strands <= 0)
        return;

    Must(!owner);
    owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1,
                                       Config.cacheSwap.n_strands,
                                       1 + Config.workers, sizeof(IpcIoMsg),
                                       QueueCapacity);
}