]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/IpcIo/IpcIoFile.cc
Source Format Enforcement (#763)
[thirdparty/squid.git] / src / DiskIO / IpcIo / IpcIoFile.cc
CommitLineData
254912f3 1/*
f70aedc4 2 * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
bbc27441
AJ
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
254912f3
AR
7 */
8
bbc27441
AJ
9/* DEBUG: section 47 Store Directory Routines */
10
f7f3304a 11#include "squid.h"
ccfbe8f4 12#include "base/CodeContext.h"
f5591061 13#include "base/RunnersRegistry.h"
254912f3
AR
14#include "base/TextException.h"
15#include "DiskIO/IORequestor.h"
16#include "DiskIO/IpcIo/IpcIoFile.h"
17#include "DiskIO/ReadRequest.h"
18#include "DiskIO/WriteRequest.h"
c4ad1349 19#include "fd.h"
b3f7fd88 20#include "fs_io.h"
582c2af2 21#include "globals.h"
21d845b1 22#include "ipc/mem/Pages.h"
254912f3
AR
23#include "ipc/Messages.h"
24#include "ipc/Port.h"
f5591061 25#include "ipc/Queue.h"
6ccfd70a 26#include "ipc/StrandCoord.h"
9a51593d 27#include "ipc/StrandSearch.h"
254912f3 28#include "ipc/UdsOp.h"
65e41a45 29#include "sbuf/SBuf.h"
4d5904f7 30#include "SquidConfig.h"
21d845b1
FC
31#include "SquidTime.h"
32#include "StatCounters.h"
5bed43d6 33#include "tools.h"
254912f3 34
1a30fdf5
AJ
35#include <cerrno>
36
254912f3
AR
37CBDATA_CLASS_INIT(IpcIoFile);
38
f5591061
DK
39/// shared memory segment path to use for IpcIoFile maps
40static const char *const ShmLabel = "io_file";
ea2cdeb6
DK
41/// a single worker-to-disker or disker-to-worker queue capacity; up
42/// to 2*QueueCapacity I/O requests queued between a single worker and
43/// a single disker
44// TODO: make configurable or compute from squid.conf settings if possible
45static const int QueueCapacity = 1024;
f5591061 46
// Timeout (in seconds) is used both for the open() strand search event and
// for the periodic CheckTimeouts() event that abandons old I/O requests.
fa61cefe 47const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
b2aa0934
DK
// Files that sent a strand search request in open() and have not yet been
// removed by HandleOpenResponse() or OpenTimeout().
48IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
// Opened files indexed by disker kid ID; entries are added by open() and
// openCompleted() and removed by ~IpcIoFile().
49IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
// Worker/disker I/O message queue; created lazily by the first open() call.
829030b5 50std::unique_ptr<IpcIoFile::Queue> IpcIoFile::queue;
254912f3 51
c792f7fc
AR
// Whether a DiskerHandleMoreRequests() event has been scheduled; reset when
// that event fires.
52bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false;
53
7015a149
AR
// Disker-side helpers called by open() and close(); their definitions are
// not visible in this part of the file.
54static bool DiskerOpen(const SBuf &path, int flags, mode_t mode);
55static void DiskerClose(const SBuf &path);
254912f3 56
fa61cefe
AR
57/// IpcIo wrapper for debugs() streams; XXX: find a better class name
58struct SipcIo {
59 SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
f53969cc 60 worker(aWorker), msg(aMsg), disker(aDisker) {}
fa61cefe
AR
61
62 int worker;
63 const IpcIoMsg &msg;
64 int disker;
65};
66
67std::ostream &
68operator <<(std::ostream &os, const SipcIo &sio)
69{
70 return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
d8ee9e8d 71 sio.msg.command << sio.disker;
fa61cefe
AR
72}
73
d8ee9e8d
EB
74/* IpcIo::Command */
75
76std::ostream &
77operator <<(std::ostream &os, const IpcIo::Command command)
78{
79 switch (command) {
80 case IpcIo::cmdNone:
81 return os << '-';
82 case IpcIo::cmdOpen:
83 return os << 'o';
84 case IpcIo::cmdRead:
85 return os << 'r';
86 case IpcIo::cmdWrite:
87 return os << 'w';
88 }
89 // unreachable code
90 return os << static_cast<int>(command);
91}
92
93/* IpcIoFile */
94
254912f3 95IpcIoFile::IpcIoFile(char const *aDb):
30fe12bc
EB
96 dbName(aDb),
97 myPid(getpid()),
98 diskId(-1),
99 error_(false),
100 lastRequestId(0),
f53969cc
SM
101 olderRequests(&requestMap1), newerRequests(&requestMap2),
102 timeoutCheckScheduled(false)
254912f3 103{
30fe12bc 104 assert(myPid >= 0);
254912f3
AR
105}
106
107IpcIoFile::~IpcIoFile()
108{
ebaabe74
AR
109 SWALLOW_EXCEPTIONS({
110 if (diskId >= 0) {
111 const auto i = IpcIoFiles.find(diskId);
112 Must(i != IpcIoFiles.end());
113 Must(i->second == this);
114 IpcIoFiles.erase(i);
115 }
116 });
254912f3
AR
117}
118
43ebbac3
AR
119void
120IpcIoFile::configure(const Config &cfg)
121{
122 DiskFile::configure(cfg);
123 config = cfg;
124}
125
254912f3
AR
126void
127IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
128{
129 ioRequestor = callback;
130 Must(diskId < 0); // we do not know our disker yet
f5591061
DK
131
132 if (!queue.get())
133 queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
254912f3
AR
134
135 if (IamDiskProcess()) {
7015a149 136 error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
f7091279
DK
137 if (error_)
138 return;
139
b2aa0934
DK
140 diskId = KidIdentifier;
141 const bool inserted =
142 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
143 Must(inserted);
9a51593d 144
6c6a656a 145 queue->localRateLimit().store(config.ioRate);
df881a0f 146
6ccfd70a 147 Ipc::StrandMessage::NotifyCoordinator(Ipc::mtRegisterStrand, dbName.termedBuf());
f7091279 148
e528e570 149 ioRequestor->ioCompletedNotification();
254912f3 150 return;
9a51593d 151 }
254912f3 152
4c218615 153 const Ipc::StrandSearchRequest request(dbName);
9a51593d
DK
154 Ipc::TypedMsgHdr msg;
155 request.pack(msg);
1ee292b7 156 Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg);
9a51593d 157
b2aa0934
DK
158 WaitingForOpen.push_back(this);
159
160 eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
161 this, Timeout, 0, false); // "this" pointer is used as id
254912f3
AR
162}
163
164void
6ccfd70a 165IpcIoFile::openCompleted(const Ipc::StrandMessage *const response)
9199139f 166{
9a51593d 167 Must(diskId < 0); // we do not know our disker yet
9a51593d
DK
168
169 if (!response) {
82ed74dc
AR
170 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " communication " <<
171 "channel establishment timeout");
caca86d7 172 error_ = true;
9a51593d
DK
173 } else {
174 diskId = response->strand.kidId;
175 if (diskId >= 0) {
b2aa0934
DK
176 const bool inserted =
177 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
178 Must(inserted);
9a51593d 179 } else {
254912f3 180 error_ = true;
82ed74dc
AR
181 debugs(79, DBG_IMPORTANT, "ERROR: no disker claimed " <<
182 "responsibility for " << dbName);
9a51593d
DK
183 }
184 }
254912f3
AR
185
186 ioRequestor->ioCompletedNotification();
187}
188
189/**
190 * Alias for IpcIoFile::open(...)
191 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
192 */
193void
194IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
195{
196 assert(false); // check
197 /* We use the same logic path for open */
198 open(flags, mode, callback);
199}
200
201void
202IpcIoFile::close()
203{
204 assert(ioRequestor != NULL);
205
206 if (IamDiskProcess())
7015a149 207 DiskerClose(SBuf(dbName.termedBuf()));
9a51593d 208 // XXX: else nothing to do?
254912f3
AR
209
210 ioRequestor->closeCompleted();
211}
212
213bool
214IpcIoFile::canRead() const
215{
7e2d5ef6 216 return diskId >= 0 && !error_ && canWait();
254912f3
AR
217}
218
219bool
220IpcIoFile::canWrite() const
221{
7e2d5ef6 222 return diskId >= 0 && !error_ && canWait();
254912f3
AR
223}
224
225bool
226IpcIoFile::error() const
227{
228 return error_;
229}
230
231void
232IpcIoFile::read(ReadRequest *readRequest)
233{
234 debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
9199139f 235 readRequest->offset << ")");
254912f3
AR
236
237 assert(ioRequestor != NULL);
254912f3
AR
238 assert(readRequest->offset >= 0);
239 Must(!error_);
240
241 //assert(minOffset < 0 || minOffset <= readRequest->offset);
242 //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
243
9a51593d 244 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 245 pending->readRequest = readRequest;
7a907247 246 push(pending);
254912f3
AR
247}
248
249void
250IpcIoFile::readCompleted(ReadRequest *readRequest,
8ed94021 251 IpcIoMsg *const response)
254912f3 252{
caca86d7 253 bool ioError = false;
9a51593d 254 if (!response) {
0ef0509e 255 debugs(79, 3, HERE << "error: timeout");
caca86d7 256 ioError = true; // I/O timeout does not warrant setting error_?
9a51593d 257 } else {
70e3f706 258 if (response->xerrno) {
82ed74dc
AR
259 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read: " <<
260 xstrerr(response->xerrno));
70e3f706 261 ioError = error_ = true;
9199139f 262 } else if (!response->page) {
82ed74dc
AR
263 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read ran " <<
264 "out of shared memory pages");
70e3f706
DK
265 ioError = true;
266 } else {
267 const char *const buf = Ipc::Mem::PagePointer(response->page);
268 memcpy(readRequest->buf, buf, response->len);
269 }
254912f3 270
70e3f706
DK
271 Ipc::Mem::PutPage(response->page);
272 }
8ed94021 273
caca86d7 274 const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
9199139f 275 const int errflag = ioError ? DISK_ERROR :DISK_OK;
20b0e1fe 276 ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
254912f3
AR
277}
278
279void
280IpcIoFile::write(WriteRequest *writeRequest)
281{
282 debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
9199139f 283 writeRequest->offset << ")");
254912f3
AR
284
285 assert(ioRequestor != NULL);
254912f3
AR
286 assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
287 assert(writeRequest->offset >= 0);
288 Must(!error_);
289
290 //assert(minOffset < 0 || minOffset <= writeRequest->offset);
291 //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
292
9a51593d 293 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 294 pending->writeRequest = writeRequest;
7a907247 295 push(pending);
254912f3
AR
296}
297
298void
299IpcIoFile::writeCompleted(WriteRequest *writeRequest,
9a51593d 300 const IpcIoMsg *const response)
254912f3 301{
caca86d7 302 bool ioError = false;
9a51593d 303 if (!response) {
4e4f7fd9 304 debugs(79, 3, "disker " << diskId << " timeout");
caca86d7 305 ioError = true; // I/O timeout does not warrant setting error_?
9199139f 306 } else if (response->xerrno) {
4e4f7fd9
AR
307 debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId <<
308 " error writing " << writeRequest->len << " bytes at " <<
309 writeRequest->offset << ": " << xstrerr(response->xerrno) <<
310 "; this worker will stop using " << dbName);
9a51593d 311 ioError = error_ = true;
9199139f 312 } else if (response->len != writeRequest->len) {
4e4f7fd9
AR
313 debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId << " wrote " <<
314 response->len << " instead of " << writeRequest->len <<
315 " bytes (offset " << writeRequest->offset << "); " <<
316 "this worker will stop using " << dbName);
254912f3
AR
317 error_ = true;
318 }
319
320 if (writeRequest->free_func)
321 (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
322
caca86d7 323 if (!ioError) {
254912f3 324 debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
9199139f
AR
325 diskId << " at " << writeRequest->offset);
326 }
254912f3 327
caca86d7 328 const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
9199139f 329 const int errflag = ioError ? DISK_ERROR :DISK_OK;
254912f3
AR
330 ioRequestor->writeCompleted(errflag, rlen, writeRequest);
331}
332
333bool
334IpcIoFile::ioInProgress() const
335{
9a51593d 336 return !olderRequests->empty() || !newerRequests->empty();
254912f3
AR
337}
338
9a51593d 339/// track a new pending request
254912f3 340void
28bd45ba 341IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
254912f3 342{
28bd45ba
AR
343 const std::pair<RequestMap::iterator,bool> result =
344 newerRequests->insert(std::make_pair(id, pending));
345 Must(result.second); // failures means that id was not unique
9a51593d
DK
346 if (!timeoutCheckScheduled)
347 scheduleTimeoutCheck();
348}
254912f3 349
9a51593d
DK
350/// push an I/O request to disker
351void
7a907247 352IpcIoFile::push(IpcIoPendingRequest *const pending)
9a51593d 353{
fa61cefe 354 // prevent queue overflows: check for responses to earlier requests
28bd45ba 355 // warning: this call may result in indirect push() recursion
ccfbe8f4
AR
356 CallService(nullptr, [] {
357 HandleResponses("before push");
358 });
fa61cefe 359
a1c98830 360 debugs(47, 7, HERE);
9a51593d 361 Must(diskId >= 0);
7a907247
DK
362 Must(pending);
363 Must(pending->readRequest || pending->writeRequest);
9a51593d
DK
364
365 IpcIoMsg ipcIo;
8ed94021 366 try {
28bd45ba
AR
367 if (++lastRequestId == 0) // don't use zero value as requestId
368 ++lastRequestId;
8ed94021 369 ipcIo.requestId = lastRequestId;
0a11e039 370 ipcIo.start = current_time;
30fe12bc 371 ipcIo.workerPid = myPid;
8ed94021
DK
372 if (pending->readRequest) {
373 ipcIo.command = IpcIo::cmdRead;
374 ipcIo.offset = pending->readRequest->offset;
375 ipcIo.len = pending->readRequest->len;
376 } else { // pending->writeRequest
377 Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
551f8a18 378 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 379 ipcIo.len = 0;
551f8a18 380 throw TexcHere("run out of shared memory pages for IPC I/O");
8ed94021
DK
381 }
382 ipcIo.command = IpcIo::cmdWrite;
383 ipcIo.offset = pending->writeRequest->offset;
384 ipcIo.len = pending->writeRequest->len;
385 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
386 memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
387 }
254912f3 388
f5591061 389 debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
fa61cefe 390
387e1d45
EB
391 // protect DiskerHandleRequest() from pop queue overflow
392 if (pendingRequests() >= QueueCapacity)
393 throw Ipc::OneToOneUniQueue::Full();
394
f5591061 395 if (queue->push(diskId, ipcIo))
fa61cefe 396 Notify(diskId); // must notify disker
28bd45ba 397 trackPendingRequest(ipcIo.requestId, pending);
f5591061 398 } catch (const Queue::Full &) {
82ed74dc
AR
399 debugs(47, DBG_IMPORTANT, "ERROR: worker I/O push queue for " <<
400 dbName << " overflow: " <<
fa61cefe
AR
401 SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
402 // TODO: grow queue size
d0c5cf9e
CG
403 if (ipcIo.page)
404 Ipc::Mem::PutPage(ipcIo.page);
9199139f 405
0ef0509e 406 pending->completeIo(NULL);
8ed94021
DK
407 delete pending;
408 } catch (const TextException &e) {
82ed74dc 409 debugs(47, DBG_IMPORTANT, "ERROR: " << dbName << " exception: " << e.what());
0ef0509e 410 pending->completeIo(NULL);
7a907247 411 delete pending;
9a51593d 412 }
254912f3
AR
413}
414
0a11e039
AR
415/// whether we think there is enough time to complete the I/O
416bool
9199139f
AR
417IpcIoFile::canWait() const
418{
43ebbac3 419 if (!config.ioTimeout)
0a11e039
AR
420 return true; // no timeout specified
421
422 IpcIoMsg oldestIo;
5aac671b 423 if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
0a11e039
AR
424 return true; // we cannot estimate expected wait time; assume it is OK
425
55939a01
AR
426 const int oldestWait = tvSubMsec(oldestIo.start, current_time);
427
428 int rateWait = -1; // time in millisecons
6c6a656a 429 const int ioRate = queue->rateLimit(diskId).load();
55939a01
AR
430 if (ioRate > 0) {
431 // if there are N requests pending, the new one will wait at
432 // least N/max-swap-rate seconds
75017bc9 433 rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
55939a01
AR
434 // adjust N/max-swap-rate value based on the queue "balance"
435 // member, in case we have been borrowing time against future
436 // I/O already
437 rateWait += queue->balance(diskId);
438 }
439
440 const int expectedWait = max(oldestWait, rateWait);
0a11e039 441 if (expectedWait < 0 ||
43ebbac3 442 static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
2f8abb64 443 return true; // expected wait time is acceptable
0a11e039 444
c792f7fc
AR
445 debugs(47,2, HERE << "cannot wait: " << expectedWait <<
446 " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
0a11e039
AR
447 return false; // do not want to wait that long
448}
449
9a51593d 450/// called when coordinator responds to worker open request
254912f3 451void
6ccfd70a 452IpcIoFile::HandleOpenResponse(const Ipc::StrandMessage &response)
254912f3 453{
9a51593d 454 debugs(47, 7, HERE << "coordinator response to open request");
b2aa0934 455 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
9199139f 456 i != WaitingForOpen.end(); ++i) {
b2aa0934
DK
457 if (response.strand.tag == (*i)->dbName) {
458 (*i)->openCompleted(&response);
459 WaitingForOpen.erase(i);
460 return;
461 }
462 }
463
464 debugs(47, 4, HERE << "LATE disker response to open for " <<
465 response.strand.tag);
466 // nothing we can do about it; completeIo() has been called already
9a51593d 467}
254912f3 468
9a51593d 469void
f5591061 470IpcIoFile::HandleResponses(const char *const when)
fa61cefe
AR
471{
472 debugs(47, 4, HERE << "popping all " << when);
fa61cefe
AR
473 IpcIoMsg ipcIo;
474 // get all responses we can: since we are not pushing, this will stop
f5591061
DK
475 int diskId;
476 while (queue->pop(diskId, ipcIo)) {
477 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
478 Must(i != IpcIoFiles.end()); // TODO: warn but continue
479 i->second->handleResponse(ipcIo);
480 }
9a51593d 481}
254912f3 482
9a51593d 483void
8ed94021 484IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
9a51593d
DK
485{
486 const int requestId = ipcIo.requestId;
fa61cefe 487
9a51593d
DK
488 Must(requestId);
489 if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
ccfbe8f4
AR
490 CallBack(pending->codeContext, [&] {
491 debugs(47, 7, "popped disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
30fe12bc
EB
492 if (myPid == ipcIo.workerPid)
493 pending->completeIo(&ipcIo);
494 else
495 debugs(47, 5, "ignoring response meant for our predecessor PID: " << ipcIo.workerPid);
ccfbe8f4
AR
496 delete pending; // XXX: leaking if throwing
497 });
caca86d7 498 } else {
ccfbe8f4 499 debugs(47, 4, "LATE disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
caca86d7
AR
500 // nothing we can do about it; completeIo() has been called already
501 }
254912f3
AR
502}
503
254912f3 504void
9a51593d 505IpcIoFile::Notify(const int peerId)
254912f3 506{
fa61cefe 507 // TODO: Count and report the total number of notifications, pops, pushes.
a1c98830 508 debugs(47, 7, HERE << "kid" << peerId);
9a51593d
DK
509 Ipc::TypedMsgHdr msg;
510 msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
511 msg.putInt(KidIdentifier);
1ee292b7 512 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, peerId);
9a51593d
DK
513 Ipc::SendMessage(addr, msg);
514}
515
516void
517IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
518{
fa61cefe
AR
519 const int from = msg.getInt();
520 debugs(47, 7, HERE << "from " << from);
f5591061
DK
521 queue->clearReaderSignal(from);
522 if (IamDiskProcess())
523 DiskerHandleRequests();
524 else
525 HandleResponses("after notification");
9a51593d
DK
526}
527
d8ee9e8d
EB
528void
529IpcIoFile::StatQueue(std::ostream &os)
530{
531 if (queue.get()) {
532 os << "SMP disk I/O queues:\n";
533 queue->stat<IpcIoMsg>(os);
534 }
535}
536
b2aa0934
DK
537/// handles open request timeout
538void
539IpcIoFile::OpenTimeout(void *const param)
540{
541 Must(param);
542 // the pointer is used for comparison only and not dereferenced
543 const IpcIoFile *const ipcIoFile =
544 reinterpret_cast<const IpcIoFile *>(param);
545 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
9199139f 546 i != WaitingForOpen.end(); ++i) {
b2aa0934
DK
547 if (*i == ipcIoFile) {
548 (*i)->openCompleted(NULL);
549 WaitingForOpen.erase(i);
550 break;
551 }
552 }
553}
554
9a51593d
DK
555/// IpcIoFile::checkTimeouts wrapper
556void
557IpcIoFile::CheckTimeouts(void *const param)
558{
9a51593d 559 Must(param);
b2aa0934
DK
560 const int diskId = reinterpret_cast<uintptr_t>(param);
561 debugs(47, 7, HERE << "diskId=" << diskId);
562 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
563 if (i != IpcIoFiles.end())
564 i->second->checkTimeouts();
9a51593d
DK
565}
566
567void
568IpcIoFile::checkTimeouts()
569{
9a51593d 570 timeoutCheckScheduled = false;
254912f3 571
0ef0509e
AR
572 // last chance to recover in case a notification message was lost, etc.
573 const RequestMap::size_type timeoutsBefore = olderRequests->size();
574 HandleResponses("before timeout");
575 const RequestMap::size_type timeoutsNow = olderRequests->size();
576
577 if (timeoutsBefore > timeoutsNow) { // some requests were rescued
578 // notification message lost or significantly delayed?
82ed74dc
AR
579 debugs(47, DBG_IMPORTANT, "WARNING: communication with " << dbName <<
580 " may be too slow or disrupted for about " <<
0ef0509e
AR
581 Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
582 " out of " << timeoutsBefore << " I/Os");
583 }
584
585 if (timeoutsNow) {
586 debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
82ed74dc 587 timeoutsNow << ' ' << dbName << " I/Os after at least " <<
0ef0509e
AR
588 Timeout << "s timeout");
589 }
590
caca86d7
AR
591 // any old request would have timed out by now
592 typedef RequestMap::const_iterator RMCI;
9a51593d
DK
593 for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
594 IpcIoPendingRequest *const pending = i->second;
ccfbe8f4
AR
595 CallBack(pending->codeContext, [&] {
596 const auto requestId = i->first;
597 debugs(47, 7, "disker timeout; ipcIo" << KidIdentifier << '.' << requestId);
598 pending->completeIo(nullptr); // no response
599 delete pending; // XXX: leaking if throwing
600 });
9a51593d
DK
601 }
602 olderRequests->clear();
caca86d7 603
9a51593d 604 swap(olderRequests, newerRequests); // switches pointers around
0ef0509e 605 if (!olderRequests->empty() && !timeoutCheckScheduled)
9a51593d 606 scheduleTimeoutCheck();
254912f3
AR
607}
608
caca86d7 609/// prepare to check for timeouts in a little while
254912f3 610void
9a51593d 611IpcIoFile::scheduleTimeoutCheck()
254912f3 612{
ccfbe8f4
AR
613 // We may be running in an I/O requestor CodeContext, but are scheduling
614 // one-for-all CheckTimeouts() that is not specific to any request.
615 CallService(nullptr, [&] {
616 // we check all older requests at once so some may be wait for 2*Timeout
617 eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
618 reinterpret_cast<void *>(diskId), Timeout, 0, false);
619 timeoutCheckScheduled = true;
620 });
254912f3
AR
621}
622
623/// returns and forgets the right IpcIoFile pending request
624IpcIoPendingRequest *
9a51593d 625IpcIoFile::dequeueRequest(const unsigned int requestId)
254912f3 626{
254912f3 627 Must(requestId != 0);
caca86d7 628
9a51593d
DK
629 RequestMap *map = NULL;
630 RequestMap::iterator i = requestMap1.find(requestId);
caca86d7 631
9a51593d
DK
632 if (i != requestMap1.end())
633 map = &requestMap1;
caca86d7 634 else {
9a51593d
DK
635 i = requestMap2.find(requestId);
636 if (i != requestMap2.end())
637 map = &requestMap2;
caca86d7
AR
638 }
639
640 if (!map) // not found in both maps
641 return NULL;
642
643 IpcIoPendingRequest *pending = i->second;
644 map->erase(i);
645 return pending;
254912f3
AR
646}
647
648int
9a51593d 649IpcIoFile::getFD() const
254912f3
AR
650{
651 assert(false); // not supported; TODO: remove this method from API
652 return -1;
653}
654
9a51593d 655/* IpcIoMsg */
254912f3 656
9a51593d 657IpcIoMsg::IpcIoMsg():
f53969cc
SM
658 requestId(0),
659 offset(0),
660 len(0),
30fe12bc 661 workerPid(-1), // Unix-like systems use process IDs starting from 0
f53969cc
SM
662 command(IpcIo::cmdNone),
663 xerrno(0)
254912f3 664{
0a11e039 665 start.tv_sec = 0;
e19994df 666 start.tv_usec = 0;
254912f3
AR
667}
668
d8ee9e8d
EB
669void
670IpcIoMsg::stat(std::ostream &os)
671{
672 timeval elapsedTime;
673 tvSub(elapsedTime, start, current_time);
674 os << "id: " << requestId <<
70ac5b29 675 ", offset: " << offset <<
676 ", size: " << len <<
677 ", workerPid: " << workerPid <<
678 ", page: " << page <<
679 ", command: " << command <<
680 ", start: " << start <<
681 ", elapsed: " << elapsedTime <<
682 ", errno: " << xerrno;
d8ee9e8d
EB
683}
684
caca86d7 685/* IpcIoPendingRequest */
254912f3
AR
686
687IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
ccfbe8f4
AR
688 file(aFile),
689 readRequest(nullptr),
690 writeRequest(nullptr),
691 codeContext(CodeContext::Current())
254912f3
AR
692{
693}
694
caca86d7 695void
8ed94021 696IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
caca86d7 697{
caca86d7
AR
698 if (readRequest)
699 file->readCompleted(readRequest, response);
9199139f 700 else if (writeRequest)
caca86d7 701 file->writeCompleted(writeRequest, response);
9a51593d
DK
702 else {
703 Must(!response); // only timeouts are handled here
704 file->openCompleted(NULL);
705 }
caca86d7
AR
706}
707
254912f3
AR
708/* XXX: disker code that should probably be moved elsewhere */
709
// Disker-side state shared by the static helpers below: DbName appears in
// their log messages, and TheFile is the descriptor passed to pread(),
// pwrite(), and fd_bytes().
7015a149 710static SBuf DbName; ///< full db file name
254912f3
AR
711static int TheFile = -1; ///< db file descriptor
712
9a51593d
DK
713static void
714diskerRead(IpcIoMsg &ipcIo)
254912f3 715{
551f8a18 716 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 717 ipcIo.len = 0;
c792f7fc 718 debugs(47,2, HERE << "run out of shared memory pages for IPC I/O");
8ed94021
DK
719 return;
720 }
721
722 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
723 const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
e4f1fdae 724 ++statCounter.syscalls.disk.reads;
2d338731
AR
725 fd_bytes(TheFile, read, FD_READ);
726
254912f3 727 if (read >= 0) {
9a51593d
DK
728 ipcIo.xerrno = 0;
729 const size_t len = static_cast<size_t>(read); // safe because read > 0
254912f3 730 debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
9199139f 731 (len == ipcIo.len ? "all " : "just ") << read);
9a51593d
DK
732 ipcIo.len = len;
733 } else {
734 ipcIo.xerrno = errno;
735 ipcIo.len = 0;
254912f3 736 debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
9199139f 737 ipcIo.xerrno);
9a51593d 738 }
254912f3
AR
739}
740
9d4e9cfb 741/// Tries to write buffer to disk (a few times if needed);
4e4f7fd9 742/// sets ipcIo results, but does no cleanup. The caller must cleanup.
9a51593d 743static void
4e4f7fd9
AR
744diskerWriteAttempts(IpcIoMsg &ipcIo)
745{
746 const char *buf = Ipc::Mem::PagePointer(ipcIo.page);
747 size_t toWrite = min(ipcIo.len, Ipc::Mem::PageSize());
748 size_t wroteSoFar = 0;
749 off_t offset = ipcIo.offset;
750 // Partial writes to disk do happen. It is unlikely that the caller can
751 // handle partial writes by doing something other than writing leftovers
752 // again, so we try to write them ourselves to minimize overheads.
753 const int attemptLimit = 10;
754 for (int attempts = 1; attempts <= attemptLimit; ++attempts) {
755 const ssize_t result = pwrite(TheFile, buf, toWrite, offset);
756 ++statCounter.syscalls.disk.writes;
757 fd_bytes(TheFile, result, FD_WRITE);
758
759 if (result < 0) {
760 ipcIo.xerrno = errno;
761 assert(ipcIo.xerrno);
82ed74dc
AR
762 debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " failure" <<
763 " writing " << toWrite << '/' << ipcIo.len <<
4e4f7fd9
AR
764 " at " << ipcIo.offset << '+' << wroteSoFar <<
765 " on " << attempts << " try: " << xstrerr(ipcIo.xerrno));
766 ipcIo.len = wroteSoFar;
767 return; // bail on error
768 }
2d338731 769
4e4f7fd9 770 const size_t wroteNow = static_cast<size_t>(result); // result >= 0
9a51593d 771 ipcIo.xerrno = 0;
4e4f7fd9
AR
772
773 debugs(47,3, "disker" << KidIdentifier << " wrote " <<
774 (wroteNow >= toWrite ? "all " : "just ") << wroteNow <<
775 " out of " << toWrite << '/' << ipcIo.len << " at " <<
776 ipcIo.offset << '+' << wroteSoFar << " on " << attempts <<
777 " try");
778
779 wroteSoFar += wroteNow;
780
781 if (wroteNow >= toWrite) {
782 ipcIo.xerrno = 0;
783 ipcIo.len = wroteSoFar;
784 return; // wrote everything there was to write
785 }
786
787 buf += wroteNow;
788 offset += wroteNow;
789 toWrite -= wroteNow;
9a51593d 790 }
8ed94021 791
82ed74dc
AR
792 debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " exhausted all " <<
793 attemptLimit << " attempts while writing " <<
4e4f7fd9
AR
794 toWrite << '/' << ipcIo.len << " at " << ipcIo.offset << '+' <<
795 wroteSoFar);
796 return; // not a fatal I/O error, unless the caller treats it as such
797}
798
799static void
800diskerWrite(IpcIoMsg &ipcIo)
801{
802 diskerWriteAttempts(ipcIo); // may fail
8ed94021 803 Ipc::Mem::PutPage(ipcIo.page);
254912f3
AR
804}
805
c792f7fc 806void
df881a0f 807IpcIoFile::DiskerHandleMoreRequests(void *source)
c792f7fc 808{
df881a0f
AR
809 debugs(47, 7, HERE << "resuming handling requests after " <<
810 static_cast<const char *>(source));
c792f7fc
AR
811 DiskerHandleMoreRequestsScheduled = false;
812 IpcIoFile::DiskerHandleRequests();
813}
814
df881a0f
AR
815bool
816IpcIoFile::WaitBeforePop()
817{
6c6a656a 818 const int ioRate = queue->localRateLimit().load();
df881a0f
AR
819 const double maxRate = ioRate/1e3; // req/ms
820
821 // do we need to enforce configured I/O rate?
822 if (maxRate <= 0)
823 return false;
824
825 // is there an I/O request we could potentially delay?
30fe12bc 826 int kidId;
1e614370 827 IpcIoMsg ipcIo;
30fe12bc 828 if (!queue->peek(kidId, ipcIo)) {
1e614370 829 // unlike pop(), peek() is not reliable and does not block reader
df881a0f
AR
830 // so we must proceed with pop() even if it is likely to fail
831 return false;
832 }
833
9a585a30 834 static timeval LastIo = current_time;
df881a0f
AR
835
836 const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
837 // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
838 const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
839
840 const double credit = ioDuration; // what the last I/O should have cost us
841 const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
842 LastIo = current_time;
843
844 Ipc::QueueReader::Balance &balance = queue->localBalance();
845 balance += static_cast<int64_t>(credit - debit);
846
847 debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
848
1e614370
DK
849 if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
850 // if the next request is (likely) write and we accumulated
851 // too much time for future slow I/Os, then shed accumulated
852 // time to keep just half of the excess
df881a0f 853 const int64_t toSpend = balance - maxImbalance/2;
69321ae9
AR
854
855 if (toSpend/1e3 > Timeout)
82ed74dc
AR
856 debugs(47, DBG_IMPORTANT, "WARNING: " << DbName << " delays " <<
857 "I/O requests for " << (toSpend/1e3) << " seconds " <<
858 "to obey " << ioRate << "/sec rate limit");
69321ae9 859
df881a0f
AR
860 debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" <<
861 (1e3*maxRate) << "/sec rate");
862 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
863 &IpcIoFile::DiskerHandleMoreRequests,
864 const_cast<char*>("rate limiting"),
865 toSpend/1e3, 0, false);
866 DiskerHandleMoreRequestsScheduled = true;
867 return true;
e29ccb57 868 } else if (balance < -maxImbalance) {
df881a0f
AR
869 // do not owe "too much" to avoid "too large" bursts of I/O
870 balance = -maxImbalance;
871 }
872
873 return false;
874}
875
254912f3 876void
f5591061 877IpcIoFile::DiskerHandleRequests()
254912f3 878{
c792f7fc 879 // Balance our desire to maximize the number of concurrent I/O requests
fdb3059b 880 // (reordred by OS to minimize seek time) with a requirement to
c792f7fc 881 // send 1st-I/O notification messages, process Coordinator events, etc.
fdb3059b
AR
882 const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
883 const timeval loopStart = current_time;
884
c792f7fc 885 int popped = 0;
fa61cefe
AR
886 int workerId = 0;
887 IpcIoMsg ipcIo;
df881a0f 888 while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
fdb3059b
AR
889 ++popped;
890
891 // at least one I/O per call is guaranteed if the queue is not empty
fa61cefe
AR
892 DiskerHandleRequest(workerId, ipcIo);
893
fdb3059b
AR
894 getCurrentTime();
895 const double elapsedMsec = tvSubMsec(loopStart, current_time);
896 if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
897 if (!DiskerHandleMoreRequestsScheduled) {
898 // the gap must be positive for select(2) to be given a chance
899 const double minBreakSecs = 0.001;
900 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
9199139f 901 &IpcIoFile::DiskerHandleMoreRequests,
df881a0f
AR
902 const_cast<char*>("long I/O loop"),
903 minBreakSecs, 0, false);
fdb3059b
AR
904 DiskerHandleMoreRequestsScheduled = true;
905 }
906 debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
9199139f 907 elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
fdb3059b
AR
908 break;
909 }
c792f7fc
AR
910 }
911
fdb3059b 912 // TODO: consider using O_DIRECT with "elevator" optimization where we pop
fa61cefe
AR
913 // requests first, then reorder the popped requests to optimize seek time,
914 // then do I/O, then take a break, and come back for the next set of I/O
915 // requests.
9a51593d 916}
254912f3 917
9a51593d
DK
918/// called when disker receives an I/O request
919void
920IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
921{
9a51593d 922 if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
82ed74dc 923 debugs(0, DBG_CRITICAL, "ERROR: " << DbName <<
9a51593d
DK
924 " should not receive " << ipcIo.command <<
925 " ipcIo" << workerId << '.' << ipcIo.requestId);
926 return;
927 }
928
929 debugs(47,5, HERE << "disker" << KidIdentifier <<
930 (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
931 ipcIo.len << " at " << ipcIo.offset <<
932 " ipcIo" << workerId << '.' << ipcIo.requestId);
933
30fe12bc
EB
934 const auto workerPid = ipcIo.workerPid;
935 assert(workerPid >= 0);
936
9a51593d
DK
937 if (ipcIo.command == IpcIo::cmdRead)
938 diskerRead(ipcIo);
939 else // ipcIo.command == IpcIo::cmdWrite
940 diskerWrite(ipcIo);
941
30fe12bc
EB
942 assert(ipcIo.workerPid == workerPid);
943
f5591061 944 debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
fa61cefe 945
7a907247 946 try {
f5591061 947 if (queue->push(workerId, ipcIo))
5e44782e 948 Notify(workerId); // must notify worker
f5591061 949 } catch (const Queue::Full &) {
387e1d45
EB
950 // The worker pop queue should not overflow because the worker can
951 // push only if pendingRequests() is less than QueueCapacity.
82ed74dc
AR
952 debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue for " <<
953 DbName << " overflow: " <<
5e44782e 954 SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
fa61cefe
AR
955
956 // the I/O request we could not push will timeout
7a907247 957 }
254912f3
AR
958}
959
960static bool
ced8def3 961DiskerOpen(const SBuf &path, int flags, mode_t)
254912f3
AR
962{
963 assert(TheFile < 0);
964
82ed74dc 965 DbName = path;
7015a149 966 TheFile = file_open(DbName.c_str(), flags);
254912f3
AR
967
968 if (TheFile < 0) {
969 const int xerrno = errno;
82ed74dc 970 debugs(47, DBG_CRITICAL, "ERROR: cannot open " << DbName << ": " <<
254912f3
AR
971 xstrerr(xerrno));
972 return false;
9a51593d 973 }
254912f3 974
cb4185f1 975 ++store_open_disk_fd;
7015a149 976 debugs(79,3, "rock db opened " << DbName << ": FD " << TheFile);
9a51593d 977 return true;
254912f3
AR
978}
979
980static void
7015a149 981DiskerClose(const SBuf &path)
254912f3
AR
982{
983 if (TheFile >= 0) {
984 file_close(TheFile);
985 debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
986 TheFile = -1;
5e263176 987 --store_open_disk_fd;
a1c98830 988 }
7015a149 989 DbName.clear();
254912f3 990}
f5591061 991
ea2cdeb6 992/// reports our needs for shared memory pages to Ipc::Mem::Pages
21b7990f
AR
993/// and initializes shared memory segments used by IpcIoFile
994class IpcIoRr: public Ipc::Mem::RegisteredRunner
ea2cdeb6
DK
995{
996public:
997 /* RegisteredRunner API */
21b7990f
AR
998 IpcIoRr(): owner(NULL) {}
999 virtual ~IpcIoRr();
1000 virtual void claimMemoryNeeds();
1001
1002protected:
1003 /* Ipc::Mem::RegisteredRunner API */
1004 virtual void create();
1005
1006private:
1007 Ipc::FewToFewBiQueue::Owner *owner;
ea2cdeb6
DK
1008};
1009
21b7990f 1010RunnerRegistrationEntry(IpcIoRr);
ea2cdeb6 1011
ea2cdeb6 1012void
21b7990f 1013IpcIoRr::claimMemoryNeeds()
ea2cdeb6
DK
1014{
1015 const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
c11211d9 1016 ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity);
ea2cdeb6
DK
1017 // the maximum number of shared I/O pages is approximately the
1018 // number of queue slots, we add a fudge factor to that to account
1019 // for corner cases where I/O pages are created before queue
1020 // limits are checked or destroyed long after the I/O is dequeued
29a9ff4e
DK
1021 Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage,
1022 static_cast<int>(itemsCount * 1.1));
ea2cdeb6
DK
1023}
1024
21b7990f
AR
1025void
1026IpcIoRr::create()
f5591061 1027{
e9f68413 1028 if (Config.cacheSwap.n_strands <= 0)
f5591061
DK
1029 return;
1030
4404f1c5 1031 Must(!owner);
4404f1c5 1032 owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1,
3b581957 1033 Config.cacheSwap.n_strands,
4404f1c5 1034 1 + Config.workers, sizeof(IpcIoMsg),
ea2cdeb6 1035 QueueCapacity);
f5591061
DK
1036}
1037
1038IpcIoRr::~IpcIoRr()
1039{
1040 delete owner;
1041}
f53969cc 1042