]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/IpcIo/IpcIoFile.cc
Simplify appending SBuf to String (#2108)
[thirdparty/squid.git] / src / DiskIO / IpcIo / IpcIoFile.cc
CommitLineData
254912f3 1/*
1f7b830e 2 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
bbc27441
AJ
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
254912f3
AR
7 */
8
bbc27441
AJ
9/* DEBUG: section 47 Store Directory Routines */
10
f7f3304a 11#include "squid.h"
5faec1a1 12#include "base/AsyncFunCalls.h"
ccfbe8f4 13#include "base/CodeContext.h"
f5591061 14#include "base/RunnersRegistry.h"
254912f3
AR
15#include "base/TextException.h"
16#include "DiskIO/IORequestor.h"
17#include "DiskIO/IpcIo/IpcIoFile.h"
18#include "DiskIO/ReadRequest.h"
19#include "DiskIO/WriteRequest.h"
c4ad1349 20#include "fd.h"
b3f7fd88 21#include "fs_io.h"
582c2af2 22#include "globals.h"
21d845b1 23#include "ipc/mem/Pages.h"
254912f3
AR
24#include "ipc/Messages.h"
25#include "ipc/Port.h"
f5591061 26#include "ipc/Queue.h"
6ccfd70a 27#include "ipc/StrandCoord.h"
9a51593d 28#include "ipc/StrandSearch.h"
254912f3 29#include "ipc/UdsOp.h"
65e41a45 30#include "sbuf/SBuf.h"
4d5904f7 31#include "SquidConfig.h"
21d845b1 32#include "StatCounters.h"
5bed43d6 33#include "tools.h"
254912f3 34
1a30fdf5
AJ
35#include <cerrno>
36
254912f3
AR
37CBDATA_CLASS_INIT(IpcIoFile);
38
f5591061
DK
39/// shared memory segment path to use for IpcIoFile maps
40static const char *const ShmLabel = "io_file";
ea2cdeb6
DK
41/// a single worker-to-disker or disker-to-worker queue capacity; up
42/// to 2*QueueCapacity I/O requests queued between a single worker and
43/// a single disker
44// TODO: make configurable or compute from squid.conf settings if possible
45static const int QueueCapacity = 1024;
f5591061 46
fa61cefe 47const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
b2aa0934
DK
48IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
49IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
829030b5 50std::unique_ptr<IpcIoFile::Queue> IpcIoFile::queue;
254912f3 51
c792f7fc
AR
52bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false;
53
7015a149
AR
54static bool DiskerOpen(const SBuf &path, int flags, mode_t mode);
55static void DiskerClose(const SBuf &path);
254912f3 56
fa61cefe
AR
57/// IpcIo wrapper for debugs() streams; XXX: find a better class name
58struct SipcIo {
59 SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
f53969cc 60 worker(aWorker), msg(aMsg), disker(aDisker) {}
fa61cefe
AR
61
62 int worker;
63 const IpcIoMsg &msg;
64 int disker;
65};
66
8b082ed9 67static std::ostream &
fa61cefe
AR
68operator <<(std::ostream &os, const SipcIo &sio)
69{
70 return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
d8ee9e8d 71 sio.msg.command << sio.disker;
fa61cefe
AR
72}
73
d8ee9e8d
EB
74/* IpcIo::Command */
75
76std::ostream &
25ecffe5 77IpcIo::operator <<(std::ostream &os, const Command command)
d8ee9e8d
EB
78{
79 switch (command) {
25ecffe5 80 case cmdNone:
d8ee9e8d 81 return os << '-';
25ecffe5 82 case cmdOpen:
d8ee9e8d 83 return os << 'o';
25ecffe5 84 case cmdRead:
d8ee9e8d 85 return os << 'r';
25ecffe5 86 case cmdWrite:
d8ee9e8d
EB
87 return os << 'w';
88 }
89 // unreachable code
90 return os << static_cast<int>(command);
91}
92
93/* IpcIoFile */
94
254912f3 95IpcIoFile::IpcIoFile(char const *aDb):
30fe12bc
EB
96 dbName(aDb),
97 myPid(getpid()),
98 diskId(-1),
99 error_(false),
100 lastRequestId(0),
f53969cc
SM
101 olderRequests(&requestMap1), newerRequests(&requestMap2),
102 timeoutCheckScheduled(false)
254912f3 103{
30fe12bc 104 assert(myPid >= 0);
254912f3
AR
105}
106
107IpcIoFile::~IpcIoFile()
108{
ebaabe74
AR
109 SWALLOW_EXCEPTIONS({
110 if (diskId >= 0) {
111 const auto i = IpcIoFiles.find(diskId);
112 Must(i != IpcIoFiles.end());
113 Must(i->second == this);
114 IpcIoFiles.erase(i);
115 }
116 });
254912f3
AR
117}
118
43ebbac3
AR
119void
120IpcIoFile::configure(const Config &cfg)
121{
122 DiskFile::configure(cfg);
123 config = cfg;
124}
125
254912f3
AR
126void
127IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
128{
129 ioRequestor = callback;
130 Must(diskId < 0); // we do not know our disker yet
f5591061 131
5faec1a1 132 if (!queue.get()) {
f5591061 133 queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
5faec1a1
EB
134 AsyncCall::Pointer call = asyncCall(79, 4, "IpcIoFile::HandleMessagesAtStart",
135 NullaryFunDialer(&IpcIoFile::HandleMessagesAtStart));
136 ScheduleCallHere(call);
137 }
254912f3
AR
138
139 if (IamDiskProcess()) {
7015a149 140 error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
f7091279
DK
141 if (error_)
142 return;
143
b2aa0934
DK
144 diskId = KidIdentifier;
145 const bool inserted =
146 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
147 Must(inserted);
9a51593d 148
6c6a656a 149 queue->localRateLimit().store(config.ioRate);
df881a0f 150
6ccfd70a 151 Ipc::StrandMessage::NotifyCoordinator(Ipc::mtRegisterStrand, dbName.termedBuf());
f7091279 152
e528e570 153 ioRequestor->ioCompletedNotification();
254912f3 154 return;
9a51593d 155 }
254912f3 156
4c218615 157 const Ipc::StrandSearchRequest request(dbName);
9a51593d
DK
158 Ipc::TypedMsgHdr msg;
159 request.pack(msg);
1ee292b7 160 Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg);
9a51593d 161
b2aa0934
DK
162 WaitingForOpen.push_back(this);
163
164 eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
165 this, Timeout, 0, false); // "this" pointer is used as id
254912f3
AR
166}
167
168void
6ccfd70a 169IpcIoFile::openCompleted(const Ipc::StrandMessage *const response)
9199139f 170{
9a51593d 171 Must(diskId < 0); // we do not know our disker yet
9a51593d
DK
172
173 if (!response) {
82ed74dc
AR
174 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " communication " <<
175 "channel establishment timeout");
caca86d7 176 error_ = true;
9a51593d
DK
177 } else {
178 diskId = response->strand.kidId;
179 if (diskId >= 0) {
b2aa0934
DK
180 const bool inserted =
181 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
182 Must(inserted);
9a51593d 183 } else {
254912f3 184 error_ = true;
82ed74dc
AR
185 debugs(79, DBG_IMPORTANT, "ERROR: no disker claimed " <<
186 "responsibility for " << dbName);
9a51593d
DK
187 }
188 }
254912f3
AR
189
190 ioRequestor->ioCompletedNotification();
191}
192
193/**
194 * Alias for IpcIoFile::open(...)
195 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
196 */
197void
198IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
199{
200 assert(false); // check
201 /* We use the same logic path for open */
202 open(flags, mode, callback);
203}
204
205void
206IpcIoFile::close()
207{
aee3523a 208 assert(ioRequestor != nullptr);
254912f3
AR
209
210 if (IamDiskProcess())
7015a149 211 DiskerClose(SBuf(dbName.termedBuf()));
9a51593d 212 // XXX: else nothing to do?
254912f3
AR
213
214 ioRequestor->closeCompleted();
215}
216
217bool
218IpcIoFile::canRead() const
219{
7e2d5ef6 220 return diskId >= 0 && !error_ && canWait();
254912f3
AR
221}
222
223bool
224IpcIoFile::canWrite() const
225{
7e2d5ef6 226 return diskId >= 0 && !error_ && canWait();
254912f3
AR
227}
228
229bool
230IpcIoFile::error() const
231{
232 return error_;
233}
234
235void
236IpcIoFile::read(ReadRequest *readRequest)
237{
bf95c10a 238 debugs(79,3, "(disker" << diskId << ", " << readRequest->len << ", " <<
9199139f 239 readRequest->offset << ")");
254912f3 240
aee3523a 241 assert(ioRequestor != nullptr);
254912f3
AR
242 assert(readRequest->offset >= 0);
243 Must(!error_);
244
245 //assert(minOffset < 0 || minOffset <= readRequest->offset);
246 //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
247
9a51593d 248 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 249 pending->readRequest = readRequest;
7a907247 250 push(pending);
254912f3
AR
251}
252
253void
254IpcIoFile::readCompleted(ReadRequest *readRequest,
8ed94021 255 IpcIoMsg *const response)
254912f3 256{
caca86d7 257 bool ioError = false;
9a51593d 258 if (!response) {
bf95c10a 259 debugs(79, 3, "error: timeout");
caca86d7 260 ioError = true; // I/O timeout does not warrant setting error_?
9a51593d 261 } else {
70e3f706 262 if (response->xerrno) {
82ed74dc
AR
263 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read: " <<
264 xstrerr(response->xerrno));
70e3f706 265 ioError = error_ = true;
9199139f 266 } else if (!response->page) {
82ed74dc
AR
267 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read ran " <<
268 "out of shared memory pages");
70e3f706
DK
269 ioError = true;
270 } else {
271 const char *const buf = Ipc::Mem::PagePointer(response->page);
272 memcpy(readRequest->buf, buf, response->len);
273 }
254912f3 274
70e3f706
DK
275 Ipc::Mem::PutPage(response->page);
276 }
8ed94021 277
caca86d7 278 const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
9199139f 279 const int errflag = ioError ? DISK_ERROR :DISK_OK;
20b0e1fe 280 ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
254912f3
AR
281}
282
283void
284IpcIoFile::write(WriteRequest *writeRequest)
285{
bf95c10a 286 debugs(79,3, "(disker" << diskId << ", " << writeRequest->len << ", " <<
9199139f 287 writeRequest->offset << ")");
254912f3 288
aee3523a 289 assert(ioRequestor != nullptr);
254912f3
AR
290 assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
291 assert(writeRequest->offset >= 0);
292 Must(!error_);
293
294 //assert(minOffset < 0 || minOffset <= writeRequest->offset);
295 //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
296
9a51593d 297 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 298 pending->writeRequest = writeRequest;
7a907247 299 push(pending);
254912f3
AR
300}
301
302void
303IpcIoFile::writeCompleted(WriteRequest *writeRequest,
9a51593d 304 const IpcIoMsg *const response)
254912f3 305{
caca86d7 306 bool ioError = false;
9a51593d 307 if (!response) {
4e4f7fd9 308 debugs(79, 3, "disker " << diskId << " timeout");
caca86d7 309 ioError = true; // I/O timeout does not warrant setting error_?
9199139f 310 } else if (response->xerrno) {
4e4f7fd9
AR
311 debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId <<
312 " error writing " << writeRequest->len << " bytes at " <<
313 writeRequest->offset << ": " << xstrerr(response->xerrno) <<
314 "; this worker will stop using " << dbName);
9a51593d 315 ioError = error_ = true;
9199139f 316 } else if (response->len != writeRequest->len) {
4e4f7fd9
AR
317 debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId << " wrote " <<
318 response->len << " instead of " << writeRequest->len <<
319 " bytes (offset " << writeRequest->offset << "); " <<
320 "this worker will stop using " << dbName);
254912f3
AR
321 error_ = true;
322 }
323
324 if (writeRequest->free_func)
325 (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
326
caca86d7 327 if (!ioError) {
bf95c10a 328 debugs(79,5, "wrote " << writeRequest->len << " to disker" <<
9199139f
AR
329 diskId << " at " << writeRequest->offset);
330 }
254912f3 331
caca86d7 332 const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
9199139f 333 const int errflag = ioError ? DISK_ERROR :DISK_OK;
254912f3
AR
334 ioRequestor->writeCompleted(errflag, rlen, writeRequest);
335}
336
337bool
338IpcIoFile::ioInProgress() const
339{
9a51593d 340 return !olderRequests->empty() || !newerRequests->empty();
254912f3
AR
341}
342
9a51593d 343/// track a new pending request
254912f3 344void
28bd45ba 345IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
254912f3 346{
28bd45ba
AR
347 const std::pair<RequestMap::iterator,bool> result =
348 newerRequests->insert(std::make_pair(id, pending));
349 Must(result.second); // failures means that id was not unique
9a51593d
DK
350 if (!timeoutCheckScheduled)
351 scheduleTimeoutCheck();
352}
254912f3 353
9a51593d
DK
354/// push an I/O request to disker
355void
7a907247 356IpcIoFile::push(IpcIoPendingRequest *const pending)
9a51593d 357{
fa61cefe 358 // prevent queue overflows: check for responses to earlier requests
28bd45ba 359 // warning: this call may result in indirect push() recursion
ccfbe8f4
AR
360 CallService(nullptr, [] {
361 HandleResponses("before push");
362 });
fa61cefe 363
bf95c10a 364 debugs(47, 7, MYNAME);
9a51593d 365 Must(diskId >= 0);
7a907247
DK
366 Must(pending);
367 Must(pending->readRequest || pending->writeRequest);
9a51593d
DK
368
369 IpcIoMsg ipcIo;
8ed94021 370 try {
28bd45ba
AR
371 if (++lastRequestId == 0) // don't use zero value as requestId
372 ++lastRequestId;
8ed94021 373 ipcIo.requestId = lastRequestId;
0a11e039 374 ipcIo.start = current_time;
30fe12bc 375 ipcIo.workerPid = myPid;
8ed94021
DK
376 if (pending->readRequest) {
377 ipcIo.command = IpcIo::cmdRead;
378 ipcIo.offset = pending->readRequest->offset;
379 ipcIo.len = pending->readRequest->len;
380 } else { // pending->writeRequest
381 Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
551f8a18 382 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 383 ipcIo.len = 0;
551f8a18 384 throw TexcHere("run out of shared memory pages for IPC I/O");
8ed94021
DK
385 }
386 ipcIo.command = IpcIo::cmdWrite;
387 ipcIo.offset = pending->writeRequest->offset;
388 ipcIo.len = pending->writeRequest->len;
389 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
390 memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
391 }
254912f3 392
bf95c10a 393 debugs(47, 7, "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
fa61cefe 394
387e1d45
EB
395 // protect DiskerHandleRequest() from pop queue overflow
396 if (pendingRequests() >= QueueCapacity)
397 throw Ipc::OneToOneUniQueue::Full();
398
f5591061 399 if (queue->push(diskId, ipcIo))
fa61cefe 400 Notify(diskId); // must notify disker
28bd45ba 401 trackPendingRequest(ipcIo.requestId, pending);
f5591061 402 } catch (const Queue::Full &) {
82ed74dc
AR
403 debugs(47, DBG_IMPORTANT, "ERROR: worker I/O push queue for " <<
404 dbName << " overflow: " <<
fa61cefe
AR
405 SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
406 // TODO: grow queue size
d0c5cf9e
CG
407 if (ipcIo.page)
408 Ipc::Mem::PutPage(ipcIo.page);
9199139f 409
aee3523a 410 pending->completeIo(nullptr);
8ed94021
DK
411 delete pending;
412 } catch (const TextException &e) {
82ed74dc 413 debugs(47, DBG_IMPORTANT, "ERROR: " << dbName << " exception: " << e.what());
aee3523a 414 pending->completeIo(nullptr);
7a907247 415 delete pending;
9a51593d 416 }
254912f3
AR
417}
418
0a11e039
AR
419/// whether we think there is enough time to complete the I/O
420bool
9199139f
AR
421IpcIoFile::canWait() const
422{
43ebbac3 423 if (!config.ioTimeout)
0a11e039
AR
424 return true; // no timeout specified
425
426 IpcIoMsg oldestIo;
5aac671b 427 if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
0a11e039
AR
428 return true; // we cannot estimate expected wait time; assume it is OK
429
55939a01
AR
430 const int oldestWait = tvSubMsec(oldestIo.start, current_time);
431
432 int rateWait = -1; // time in millisecons
6c6a656a 433 const int ioRate = queue->rateLimit(diskId).load();
55939a01
AR
434 if (ioRate > 0) {
435 // if there are N requests pending, the new one will wait at
436 // least N/max-swap-rate seconds
75017bc9 437 rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
55939a01
AR
438 // adjust N/max-swap-rate value based on the queue "balance"
439 // member, in case we have been borrowing time against future
440 // I/O already
441 rateWait += queue->balance(diskId);
442 }
443
444 const int expectedWait = max(oldestWait, rateWait);
0a11e039 445 if (expectedWait < 0 ||
43ebbac3 446 static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
2f8abb64 447 return true; // expected wait time is acceptable
0a11e039 448
bf95c10a 449 debugs(47,2, "cannot wait: " << expectedWait <<
c792f7fc 450 " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
0a11e039
AR
451 return false; // do not want to wait that long
452}
453
9a51593d 454/// called when coordinator responds to worker open request
254912f3 455void
6ccfd70a 456IpcIoFile::HandleOpenResponse(const Ipc::StrandMessage &response)
254912f3 457{
bf95c10a 458 debugs(47, 7, "coordinator response to open request");
b2aa0934 459 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
9199139f 460 i != WaitingForOpen.end(); ++i) {
b2aa0934
DK
461 if (response.strand.tag == (*i)->dbName) {
462 (*i)->openCompleted(&response);
463 WaitingForOpen.erase(i);
464 return;
465 }
466 }
467
bf95c10a 468 debugs(47, 4, "LATE disker response to open for " <<
b2aa0934
DK
469 response.strand.tag);
470 // nothing we can do about it; completeIo() has been called already
9a51593d 471}
254912f3 472
9a51593d 473void
f5591061 474IpcIoFile::HandleResponses(const char *const when)
fa61cefe 475{
bf95c10a 476 debugs(47, 4, "popping all " << when);
fa61cefe
AR
477 IpcIoMsg ipcIo;
478 // get all responses we can: since we are not pushing, this will stop
f5591061
DK
479 int diskId;
480 while (queue->pop(diskId, ipcIo)) {
481 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
5faec1a1
EB
482 if (i == IpcIoFiles.end()) {
483 debugs(47, 5, "ignoring disk response " << SipcIo(KidIdentifier, ipcIo, diskId) << ": the file is not open");
484 continue;
485 }
f5591061
DK
486 i->second->handleResponse(ipcIo);
487 }
9a51593d 488}
254912f3 489
9a51593d 490void
8ed94021 491IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
9a51593d
DK
492{
493 const int requestId = ipcIo.requestId;
fa61cefe 494
9a51593d
DK
495 Must(requestId);
496 if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
ccfbe8f4
AR
497 CallBack(pending->codeContext, [&] {
498 debugs(47, 7, "popped disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
30fe12bc
EB
499 if (myPid == ipcIo.workerPid)
500 pending->completeIo(&ipcIo);
501 else
502 debugs(47, 5, "ignoring response meant for our predecessor PID: " << ipcIo.workerPid);
ccfbe8f4
AR
503 delete pending; // XXX: leaking if throwing
504 });
caca86d7 505 } else {
ccfbe8f4 506 debugs(47, 4, "LATE disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
caca86d7
AR
507 // nothing we can do about it; completeIo() has been called already
508 }
254912f3
AR
509}
510
254912f3 511void
9a51593d 512IpcIoFile::Notify(const int peerId)
254912f3 513{
fa61cefe 514 // TODO: Count and report the total number of notifications, pops, pushes.
bf95c10a 515 debugs(47, 7, "kid" << peerId);
9a51593d
DK
516 Ipc::TypedMsgHdr msg;
517 msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
518 msg.putInt(KidIdentifier);
1ee292b7 519 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, peerId);
9a51593d
DK
520 Ipc::SendMessage(addr, msg);
521}
522
523void
524IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
525{
fa61cefe 526 const int from = msg.getInt();
bf95c10a 527 debugs(47, 7, "from " << from);
f5591061
DK
528 queue->clearReaderSignal(from);
529 if (IamDiskProcess())
530 DiskerHandleRequests();
531 else
532 HandleResponses("after notification");
9a51593d
DK
533}
534
5faec1a1
EB
535/// \copydoc CollapsedForwarding::HandleNewDataAtStart()
536void
537IpcIoFile::HandleMessagesAtStart()
538{
539 /// \sa CollapsedForwarding::HandleNewDataAtStart() -- duplicates this logic
540 queue->clearAllReaderSignals();
541 if (IamDiskProcess())
542 DiskerHandleRequests();
543 else
544 HandleResponses("at start");
545}
546
d8ee9e8d
EB
547void
548IpcIoFile::StatQueue(std::ostream &os)
549{
550 if (queue.get()) {
551 os << "SMP disk I/O queues:\n";
552 queue->stat<IpcIoMsg>(os);
553 }
554}
555
b2aa0934
DK
556/// handles open request timeout
557void
558IpcIoFile::OpenTimeout(void *const param)
559{
560 Must(param);
561 // the pointer is used for comparison only and not dereferenced
562 const IpcIoFile *const ipcIoFile =
563 reinterpret_cast<const IpcIoFile *>(param);
564 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
9199139f 565 i != WaitingForOpen.end(); ++i) {
b2aa0934 566 if (*i == ipcIoFile) {
aee3523a 567 (*i)->openCompleted(nullptr);
b2aa0934
DK
568 WaitingForOpen.erase(i);
569 break;
570 }
571 }
572}
573
9a51593d
DK
574/// IpcIoFile::checkTimeouts wrapper
575void
576IpcIoFile::CheckTimeouts(void *const param)
577{
9a51593d 578 Must(param);
b2aa0934 579 const int diskId = reinterpret_cast<uintptr_t>(param);
bf95c10a 580 debugs(47, 7, "diskId=" << diskId);
b2aa0934
DK
581 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
582 if (i != IpcIoFiles.end())
583 i->second->checkTimeouts();
9a51593d
DK
584}
585
586void
587IpcIoFile::checkTimeouts()
588{
9a51593d 589 timeoutCheckScheduled = false;
254912f3 590
0ef0509e
AR
591 // last chance to recover in case a notification message was lost, etc.
592 const RequestMap::size_type timeoutsBefore = olderRequests->size();
593 HandleResponses("before timeout");
594 const RequestMap::size_type timeoutsNow = olderRequests->size();
595
596 if (timeoutsBefore > timeoutsNow) { // some requests were rescued
597 // notification message lost or significantly delayed?
82ed74dc
AR
598 debugs(47, DBG_IMPORTANT, "WARNING: communication with " << dbName <<
599 " may be too slow or disrupted for about " <<
0ef0509e
AR
600 Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
601 " out of " << timeoutsBefore << " I/Os");
602 }
603
604 if (timeoutsNow) {
605 debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
82ed74dc 606 timeoutsNow << ' ' << dbName << " I/Os after at least " <<
0ef0509e
AR
607 Timeout << "s timeout");
608 }
609
caca86d7
AR
610 // any old request would have timed out by now
611 typedef RequestMap::const_iterator RMCI;
9a51593d
DK
612 for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
613 IpcIoPendingRequest *const pending = i->second;
ccfbe8f4
AR
614 CallBack(pending->codeContext, [&] {
615 const auto requestId = i->first;
616 debugs(47, 7, "disker timeout; ipcIo" << KidIdentifier << '.' << requestId);
617 pending->completeIo(nullptr); // no response
618 delete pending; // XXX: leaking if throwing
619 });
9a51593d
DK
620 }
621 olderRequests->clear();
caca86d7 622
9a51593d 623 swap(olderRequests, newerRequests); // switches pointers around
0ef0509e 624 if (!olderRequests->empty() && !timeoutCheckScheduled)
9a51593d 625 scheduleTimeoutCheck();
254912f3
AR
626}
627
caca86d7 628/// prepare to check for timeouts in a little while
254912f3 629void
9a51593d 630IpcIoFile::scheduleTimeoutCheck()
254912f3 631{
ccfbe8f4
AR
632 // We may be running in an I/O requestor CodeContext, but are scheduling
633 // one-for-all CheckTimeouts() that is not specific to any request.
634 CallService(nullptr, [&] {
635 // we check all older requests at once so some may be wait for 2*Timeout
636 eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
b6388dfd 637 reinterpret_cast<void *>(diskId), Timeout, 0, false);
ccfbe8f4
AR
638 timeoutCheckScheduled = true;
639 });
254912f3
AR
640}
641
642/// returns and forgets the right IpcIoFile pending request
643IpcIoPendingRequest *
9a51593d 644IpcIoFile::dequeueRequest(const unsigned int requestId)
254912f3 645{
254912f3 646 Must(requestId != 0);
caca86d7 647
aee3523a 648 RequestMap *map = nullptr;
9a51593d 649 RequestMap::iterator i = requestMap1.find(requestId);
caca86d7 650
9a51593d
DK
651 if (i != requestMap1.end())
652 map = &requestMap1;
caca86d7 653 else {
9a51593d
DK
654 i = requestMap2.find(requestId);
655 if (i != requestMap2.end())
656 map = &requestMap2;
caca86d7
AR
657 }
658
659 if (!map) // not found in both maps
aee3523a 660 return nullptr;
caca86d7
AR
661
662 IpcIoPendingRequest *pending = i->second;
663 map->erase(i);
664 return pending;
254912f3
AR
665}
666
667int
9a51593d 668IpcIoFile::getFD() const
254912f3
AR
669{
670 assert(false); // not supported; TODO: remove this method from API
671 return -1;
672}
673
9a51593d 674/* IpcIoMsg */
254912f3 675
9a51593d 676IpcIoMsg::IpcIoMsg():
f53969cc
SM
677 requestId(0),
678 offset(0),
679 len(0),
30fe12bc 680 workerPid(-1), // Unix-like systems use process IDs starting from 0
f53969cc
SM
681 command(IpcIo::cmdNone),
682 xerrno(0)
254912f3 683{
0a11e039 684 start.tv_sec = 0;
e19994df 685 start.tv_usec = 0;
254912f3
AR
686}
687
d8ee9e8d
EB
688void
689IpcIoMsg::stat(std::ostream &os)
690{
691 timeval elapsedTime;
692 tvSub(elapsedTime, start, current_time);
693 os << "id: " << requestId <<
70ac5b29 694 ", offset: " << offset <<
695 ", size: " << len <<
696 ", workerPid: " << workerPid <<
697 ", page: " << page <<
698 ", command: " << command <<
699 ", start: " << start <<
700 ", elapsed: " << elapsedTime <<
701 ", errno: " << xerrno;
d8ee9e8d
EB
702}
703
caca86d7 704/* IpcIoPendingRequest */
254912f3
AR
705
706IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
ccfbe8f4
AR
707 file(aFile),
708 readRequest(nullptr),
709 writeRequest(nullptr),
710 codeContext(CodeContext::Current())
254912f3
AR
711{
712}
713
caca86d7 714void
8ed94021 715IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
caca86d7 716{
caca86d7
AR
717 if (readRequest)
718 file->readCompleted(readRequest, response);
9199139f 719 else if (writeRequest)
caca86d7 720 file->writeCompleted(writeRequest, response);
9a51593d
DK
721 else {
722 Must(!response); // only timeouts are handled here
aee3523a 723 file->openCompleted(nullptr);
9a51593d 724 }
caca86d7
AR
725}
726
254912f3
AR
727/* XXX: disker code that should probably be moved elsewhere */
728
7015a149 729static SBuf DbName; ///< full db file name
254912f3
AR
730static int TheFile = -1; ///< db file descriptor
731
9a51593d
DK
732static void
733diskerRead(IpcIoMsg &ipcIo)
254912f3 734{
551f8a18 735 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 736 ipcIo.len = 0;
bf95c10a 737 debugs(47,2, "run out of shared memory pages for IPC I/O");
8ed94021
DK
738 return;
739 }
740
741 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
742 const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
e4f1fdae 743 ++statCounter.syscalls.disk.reads;
5baf2c78 744 fd_bytes(TheFile, read, IoDirection::Read);
2d338731 745
254912f3 746 if (read >= 0) {
9a51593d
DK
747 ipcIo.xerrno = 0;
748 const size_t len = static_cast<size_t>(read); // safe because read > 0
bf95c10a 749 debugs(47,8, "disker" << KidIdentifier << " read " <<
9199139f 750 (len == ipcIo.len ? "all " : "just ") << read);
9a51593d
DK
751 ipcIo.len = len;
752 } else {
753 ipcIo.xerrno = errno;
754 ipcIo.len = 0;
bf95c10a 755 debugs(47,5, "disker" << KidIdentifier << " read error: " <<
9199139f 756 ipcIo.xerrno);
9a51593d 757 }
254912f3
AR
758}
759
9d4e9cfb 760/// Tries to write buffer to disk (a few times if needed);
4e4f7fd9 761/// sets ipcIo results, but does no cleanup. The caller must cleanup.
9a51593d 762static void
4e4f7fd9
AR
763diskerWriteAttempts(IpcIoMsg &ipcIo)
764{
765 const char *buf = Ipc::Mem::PagePointer(ipcIo.page);
766 size_t toWrite = min(ipcIo.len, Ipc::Mem::PageSize());
767 size_t wroteSoFar = 0;
768 off_t offset = ipcIo.offset;
769 // Partial writes to disk do happen. It is unlikely that the caller can
770 // handle partial writes by doing something other than writing leftovers
771 // again, so we try to write them ourselves to minimize overheads.
772 const int attemptLimit = 10;
773 for (int attempts = 1; attempts <= attemptLimit; ++attempts) {
774 const ssize_t result = pwrite(TheFile, buf, toWrite, offset);
775 ++statCounter.syscalls.disk.writes;
5baf2c78 776 fd_bytes(TheFile, result, IoDirection::Write);
4e4f7fd9
AR
777
778 if (result < 0) {
779 ipcIo.xerrno = errno;
780 assert(ipcIo.xerrno);
82ed74dc
AR
781 debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " failure" <<
782 " writing " << toWrite << '/' << ipcIo.len <<
4e4f7fd9
AR
783 " at " << ipcIo.offset << '+' << wroteSoFar <<
784 " on " << attempts << " try: " << xstrerr(ipcIo.xerrno));
785 ipcIo.len = wroteSoFar;
786 return; // bail on error
787 }
2d338731 788
4e4f7fd9 789 const size_t wroteNow = static_cast<size_t>(result); // result >= 0
9a51593d 790 ipcIo.xerrno = 0;
4e4f7fd9
AR
791
792 debugs(47,3, "disker" << KidIdentifier << " wrote " <<
793 (wroteNow >= toWrite ? "all " : "just ") << wroteNow <<
794 " out of " << toWrite << '/' << ipcIo.len << " at " <<
795 ipcIo.offset << '+' << wroteSoFar << " on " << attempts <<
796 " try");
797
798 wroteSoFar += wroteNow;
799
800 if (wroteNow >= toWrite) {
801 ipcIo.xerrno = 0;
802 ipcIo.len = wroteSoFar;
803 return; // wrote everything there was to write
804 }
805
806 buf += wroteNow;
807 offset += wroteNow;
808 toWrite -= wroteNow;
9a51593d 809 }
8ed94021 810
82ed74dc
AR
811 debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " exhausted all " <<
812 attemptLimit << " attempts while writing " <<
4e4f7fd9
AR
813 toWrite << '/' << ipcIo.len << " at " << ipcIo.offset << '+' <<
814 wroteSoFar);
815 return; // not a fatal I/O error, unless the caller treats it as such
816}
817
818static void
819diskerWrite(IpcIoMsg &ipcIo)
820{
821 diskerWriteAttempts(ipcIo); // may fail
8ed94021 822 Ipc::Mem::PutPage(ipcIo.page);
254912f3
AR
823}
824
c792f7fc 825void
df881a0f 826IpcIoFile::DiskerHandleMoreRequests(void *source)
c792f7fc 827{
bf95c10a 828 debugs(47, 7, "resuming handling requests after " <<
df881a0f 829 static_cast<const char *>(source));
c792f7fc
AR
830 DiskerHandleMoreRequestsScheduled = false;
831 IpcIoFile::DiskerHandleRequests();
832}
833
df881a0f
AR
834bool
835IpcIoFile::WaitBeforePop()
836{
6c6a656a 837 const int ioRate = queue->localRateLimit().load();
df881a0f
AR
838 const double maxRate = ioRate/1e3; // req/ms
839
840 // do we need to enforce configured I/O rate?
841 if (maxRate <= 0)
842 return false;
843
844 // is there an I/O request we could potentially delay?
30fe12bc 845 int kidId;
1e614370 846 IpcIoMsg ipcIo;
30fe12bc 847 if (!queue->peek(kidId, ipcIo)) {
1e614370 848 // unlike pop(), peek() is not reliable and does not block reader
df881a0f
AR
849 // so we must proceed with pop() even if it is likely to fail
850 return false;
851 }
852
9a585a30 853 static timeval LastIo = current_time;
df881a0f
AR
854
855 const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
856 // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
857 const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
858
859 const double credit = ioDuration; // what the last I/O should have cost us
860 const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
861 LastIo = current_time;
862
863 Ipc::QueueReader::Balance &balance = queue->localBalance();
864 balance += static_cast<int64_t>(credit - debit);
865
bf95c10a 866 debugs(47, 7, "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
df881a0f 867
1e614370
DK
868 if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
869 // if the next request is (likely) write and we accumulated
870 // too much time for future slow I/Os, then shed accumulated
871 // time to keep just half of the excess
df881a0f 872 const int64_t toSpend = balance - maxImbalance/2;
69321ae9
AR
873
874 if (toSpend/1e3 > Timeout)
82ed74dc
AR
875 debugs(47, DBG_IMPORTANT, "WARNING: " << DbName << " delays " <<
876 "I/O requests for " << (toSpend/1e3) << " seconds " <<
877 "to obey " << ioRate << "/sec rate limit");
69321ae9 878
bf95c10a 879 debugs(47, 3, "rate limiting by " << toSpend << " ms to get" <<
df881a0f
AR
880 (1e3*maxRate) << "/sec rate");
881 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
882 &IpcIoFile::DiskerHandleMoreRequests,
883 const_cast<char*>("rate limiting"),
884 toSpend/1e3, 0, false);
885 DiskerHandleMoreRequestsScheduled = true;
886 return true;
e29ccb57 887 } else if (balance < -maxImbalance) {
df881a0f
AR
888 // do not owe "too much" to avoid "too large" bursts of I/O
889 balance = -maxImbalance;
890 }
891
892 return false;
893}
894
254912f3 895void
f5591061 896IpcIoFile::DiskerHandleRequests()
254912f3 897{
c792f7fc 898 // Balance our desire to maximize the number of concurrent I/O requests
fdb3059b 899 // (reordred by OS to minimize seek time) with a requirement to
c792f7fc 900 // send 1st-I/O notification messages, process Coordinator events, etc.
fdb3059b
AR
901 const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
902 const timeval loopStart = current_time;
903
c792f7fc 904 int popped = 0;
fa61cefe
AR
905 int workerId = 0;
906 IpcIoMsg ipcIo;
df881a0f 907 while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
fdb3059b
AR
908 ++popped;
909
910 // at least one I/O per call is guaranteed if the queue is not empty
fa61cefe
AR
911 DiskerHandleRequest(workerId, ipcIo);
912
fdb3059b
AR
913 getCurrentTime();
914 const double elapsedMsec = tvSubMsec(loopStart, current_time);
915 if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
916 if (!DiskerHandleMoreRequestsScheduled) {
917 // the gap must be positive for select(2) to be given a chance
918 const double minBreakSecs = 0.001;
919 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
9199139f 920 &IpcIoFile::DiskerHandleMoreRequests,
df881a0f
AR
921 const_cast<char*>("long I/O loop"),
922 minBreakSecs, 0, false);
fdb3059b
AR
923 DiskerHandleMoreRequestsScheduled = true;
924 }
bf95c10a 925 debugs(47, 3, "pausing after " << popped << " I/Os in " <<
9199139f 926 elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
fdb3059b
AR
927 break;
928 }
c792f7fc
AR
929 }
930
fdb3059b 931 // TODO: consider using O_DIRECT with "elevator" optimization where we pop
fa61cefe
AR
932 // requests first, then reorder the popped requests to optimize seek time,
933 // then do I/O, then take a break, and come back for the next set of I/O
934 // requests.
9a51593d 935}
254912f3 936
9a51593d
DK
937/// called when disker receives an I/O request
938void
939IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
940{
9a51593d 941 if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
82ed74dc 942 debugs(0, DBG_CRITICAL, "ERROR: " << DbName <<
9a51593d
DK
943 " should not receive " << ipcIo.command <<
944 " ipcIo" << workerId << '.' << ipcIo.requestId);
945 return;
946 }
947
bf95c10a 948 debugs(47,5, "disker" << KidIdentifier <<
9a51593d
DK
949 (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
950 ipcIo.len << " at " << ipcIo.offset <<
951 " ipcIo" << workerId << '.' << ipcIo.requestId);
952
30fe12bc
EB
953 const auto workerPid = ipcIo.workerPid;
954 assert(workerPid >= 0);
955
9a51593d
DK
956 if (ipcIo.command == IpcIo::cmdRead)
957 diskerRead(ipcIo);
958 else // ipcIo.command == IpcIo::cmdWrite
959 diskerWrite(ipcIo);
960
30fe12bc
EB
961 assert(ipcIo.workerPid == workerPid);
962
bf95c10a 963 debugs(47, 7, "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
fa61cefe 964
7a907247 965 try {
f5591061 966 if (queue->push(workerId, ipcIo))
5e44782e 967 Notify(workerId); // must notify worker
f5591061 968 } catch (const Queue::Full &) {
387e1d45
EB
969 // The worker pop queue should not overflow because the worker can
970 // push only if pendingRequests() is less than QueueCapacity.
d816f28d 971 debugs(47, DBG_IMPORTANT, "ERROR: Squid BUG: Worker I/O pop queue for " <<
82ed74dc 972 DbName << " overflow: " <<
5e44782e 973 SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
fa61cefe
AR
974
975 // the I/O request we could not push will timeout
7a907247 976 }
254912f3
AR
977}
978
979static bool
ced8def3 980DiskerOpen(const SBuf &path, int flags, mode_t)
254912f3
AR
981{
982 assert(TheFile < 0);
983
82ed74dc 984 DbName = path;
7015a149 985 TheFile = file_open(DbName.c_str(), flags);
254912f3
AR
986
987 if (TheFile < 0) {
988 const int xerrno = errno;
82ed74dc 989 debugs(47, DBG_CRITICAL, "ERROR: cannot open " << DbName << ": " <<
254912f3
AR
990 xstrerr(xerrno));
991 return false;
9a51593d 992 }
254912f3 993
cb4185f1 994 ++store_open_disk_fd;
7015a149 995 debugs(79,3, "rock db opened " << DbName << ": FD " << TheFile);
9a51593d 996 return true;
254912f3
AR
997}
998
999static void
7015a149 1000DiskerClose(const SBuf &path)
254912f3
AR
1001{
1002 if (TheFile >= 0) {
1003 file_close(TheFile);
bf95c10a 1004 debugs(79,3, "rock db closed " << path << ": FD " << TheFile);
254912f3 1005 TheFile = -1;
5e263176 1006 --store_open_disk_fd;
a1c98830 1007 }
7015a149 1008 DbName.clear();
254912f3 1009}
f5591061 1010
ea2cdeb6 1011/// reports our needs for shared memory pages to Ipc::Mem::Pages
21b7990f
AR
1012/// and initializes shared memory segments used by IpcIoFile
1013class IpcIoRr: public Ipc::Mem::RegisteredRunner
ea2cdeb6
DK
1014{
1015public:
1016 /* RegisteredRunner API */
aee3523a 1017 IpcIoRr(): owner(nullptr) {}
337b9aa4
AR
1018 ~IpcIoRr() override;
1019 void claimMemoryNeeds() override;
21b7990f
AR
1020
1021protected:
1022 /* Ipc::Mem::RegisteredRunner API */
337b9aa4 1023 void create() override;
21b7990f
AR
1024
1025private:
1026 Ipc::FewToFewBiQueue::Owner *owner;
ea2cdeb6
DK
1027};
1028
230d4410 1029DefineRunnerRegistrator(IpcIoRr);
ea2cdeb6 1030
ea2cdeb6 1031void
21b7990f 1032IpcIoRr::claimMemoryNeeds()
ea2cdeb6
DK
1033{
1034 const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
c11211d9 1035 ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity);
ea2cdeb6
DK
1036 // the maximum number of shared I/O pages is approximately the
1037 // number of queue slots, we add a fudge factor to that to account
1038 // for corner cases where I/O pages are created before queue
1039 // limits are checked or destroyed long after the I/O is dequeued
29a9ff4e
DK
1040 Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage,
1041 static_cast<int>(itemsCount * 1.1));
ea2cdeb6
DK
1042}
1043
21b7990f
AR
1044void
1045IpcIoRr::create()
f5591061 1046{
e9f68413 1047 if (Config.cacheSwap.n_strands <= 0)
f5591061
DK
1048 return;
1049
4404f1c5 1050 Must(!owner);
4404f1c5 1051 owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1,
3b581957 1052 Config.cacheSwap.n_strands,
4404f1c5 1053 1 + Config.workers, sizeof(IpcIoMsg),
ea2cdeb6 1054 QueueCapacity);
f5591061
DK
1055}
1056
1057IpcIoRr::~IpcIoRr()
1058{
1059 delete owner;
1060}
f53969cc 1061