]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/IpcIo/IpcIoFile.cc
Source Format Enforcement (#532)
[thirdparty/squid.git] / src / DiskIO / IpcIo / IpcIoFile.cc
CommitLineData
254912f3 1/*
77b1029d 2 * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
bbc27441
AJ
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
254912f3
AR
7 */
8
bbc27441
AJ
9/* DEBUG: section 47 Store Directory Routines */
10
f7f3304a 11#include "squid.h"
ccfbe8f4 12#include "base/CodeContext.h"
f5591061 13#include "base/RunnersRegistry.h"
254912f3
AR
14#include "base/TextException.h"
15#include "DiskIO/IORequestor.h"
16#include "DiskIO/IpcIo/IpcIoFile.h"
17#include "DiskIO/ReadRequest.h"
18#include "DiskIO/WriteRequest.h"
c4ad1349 19#include "fd.h"
b3f7fd88 20#include "fs_io.h"
582c2af2 21#include "globals.h"
21d845b1 22#include "ipc/mem/Pages.h"
254912f3
AR
23#include "ipc/Messages.h"
24#include "ipc/Port.h"
f5591061 25#include "ipc/Queue.h"
9a51593d 26#include "ipc/StrandSearch.h"
254912f3 27#include "ipc/UdsOp.h"
65e41a45 28#include "sbuf/SBuf.h"
4d5904f7 29#include "SquidConfig.h"
21d845b1
FC
30#include "SquidTime.h"
31#include "StatCounters.h"
5bed43d6 32#include "tools.h"
254912f3 33
1a30fdf5
AJ
34#include <cerrno>
35
254912f3
AR
36CBDATA_CLASS_INIT(IpcIoFile);
37
f5591061
DK
38/// shared memory segment path to use for IpcIoFile maps
39static const char *const ShmLabel = "io_file";
ea2cdeb6
DK
40/// a single worker-to-disker or disker-to-worker queue capacity; up
41/// to 2*QueueCapacity I/O requests queued between a single worker and
42/// a single disker
43// TODO: make configurable or compute from squid.conf settings if possible
44static const int QueueCapacity = 1024;
f5591061 45
fa61cefe 46const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
b2aa0934
DK
47IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
48IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
829030b5 49std::unique_ptr<IpcIoFile::Queue> IpcIoFile::queue;
254912f3 50
c792f7fc
AR
51bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false;
52
7015a149
AR
53static bool DiskerOpen(const SBuf &path, int flags, mode_t mode);
54static void DiskerClose(const SBuf &path);
254912f3 55
fa61cefe
AR
56/// IpcIo wrapper for debugs() streams; XXX: find a better class name
57struct SipcIo {
58 SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
f53969cc 59 worker(aWorker), msg(aMsg), disker(aDisker) {}
fa61cefe
AR
60
61 int worker;
62 const IpcIoMsg &msg;
63 int disker;
64};
65
66std::ostream &
67operator <<(std::ostream &os, const SipcIo &sio)
68{
69 return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
9199139f 70 (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
fa61cefe
AR
71}
72
254912f3 73IpcIoFile::IpcIoFile(char const *aDb):
f53969cc
SM
74 dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
75 olderRequests(&requestMap1), newerRequests(&requestMap2),
76 timeoutCheckScheduled(false)
254912f3
AR
77{
78}
79
80IpcIoFile::~IpcIoFile()
81{
ebaabe74
AR
82 SWALLOW_EXCEPTIONS({
83 if (diskId >= 0) {
84 const auto i = IpcIoFiles.find(diskId);
85 Must(i != IpcIoFiles.end());
86 Must(i->second == this);
87 IpcIoFiles.erase(i);
88 }
89 });
254912f3
AR
90}
91
43ebbac3
AR
92void
93IpcIoFile::configure(const Config &cfg)
94{
95 DiskFile::configure(cfg);
96 config = cfg;
97}
98
254912f3
AR
99void
100IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
101{
102 ioRequestor = callback;
103 Must(diskId < 0); // we do not know our disker yet
f5591061
DK
104
105 if (!queue.get())
106 queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
254912f3
AR
107
108 if (IamDiskProcess()) {
7015a149 109 error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
f7091279
DK
110 if (error_)
111 return;
112
b2aa0934
DK
113 diskId = KidIdentifier;
114 const bool inserted =
115 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
116 Must(inserted);
9a51593d 117
6c6a656a 118 queue->localRateLimit().store(config.ioRate);
df881a0f 119
f7091279
DK
120 Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
121 ann.strand.tag = dbName;
122 Ipc::TypedMsgHdr message;
123 ann.pack(message);
1ee292b7 124 SendMessage(Ipc::Port::CoordinatorAddr(), message);
f7091279 125
e528e570 126 ioRequestor->ioCompletedNotification();
254912f3 127 return;
9a51593d 128 }
254912f3 129
9a51593d
DK
130 Ipc::StrandSearchRequest request;
131 request.requestorId = KidIdentifier;
132 request.tag = dbName;
254912f3 133
9a51593d
DK
134 Ipc::TypedMsgHdr msg;
135 request.pack(msg);
1ee292b7 136 Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg);
9a51593d 137
b2aa0934
DK
138 WaitingForOpen.push_back(this);
139
140 eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
141 this, Timeout, 0, false); // "this" pointer is used as id
254912f3
AR
142}
143
144void
9199139f
AR
145IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response)
146{
9a51593d 147 Must(diskId < 0); // we do not know our disker yet
9a51593d
DK
148
149 if (!response) {
82ed74dc
AR
150 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " communication " <<
151 "channel establishment timeout");
caca86d7 152 error_ = true;
9a51593d
DK
153 } else {
154 diskId = response->strand.kidId;
155 if (diskId >= 0) {
b2aa0934
DK
156 const bool inserted =
157 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
158 Must(inserted);
9a51593d 159 } else {
254912f3 160 error_ = true;
82ed74dc
AR
161 debugs(79, DBG_IMPORTANT, "ERROR: no disker claimed " <<
162 "responsibility for " << dbName);
9a51593d
DK
163 }
164 }
254912f3
AR
165
166 ioRequestor->ioCompletedNotification();
167}
168
169/**
170 * Alias for IpcIoFile::open(...)
171 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
172 */
173void
174IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
175{
176 assert(false); // check
177 /* We use the same logic path for open */
178 open(flags, mode, callback);
179}
180
181void
182IpcIoFile::close()
183{
184 assert(ioRequestor != NULL);
185
186 if (IamDiskProcess())
7015a149 187 DiskerClose(SBuf(dbName.termedBuf()));
9a51593d 188 // XXX: else nothing to do?
254912f3
AR
189
190 ioRequestor->closeCompleted();
191}
192
193bool
194IpcIoFile::canRead() const
195{
7e2d5ef6 196 return diskId >= 0 && !error_ && canWait();
254912f3
AR
197}
198
199bool
200IpcIoFile::canWrite() const
201{
7e2d5ef6 202 return diskId >= 0 && !error_ && canWait();
254912f3
AR
203}
204
205bool
206IpcIoFile::error() const
207{
208 return error_;
209}
210
211void
212IpcIoFile::read(ReadRequest *readRequest)
213{
214 debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
9199139f 215 readRequest->offset << ")");
254912f3
AR
216
217 assert(ioRequestor != NULL);
254912f3
AR
218 assert(readRequest->offset >= 0);
219 Must(!error_);
220
221 //assert(minOffset < 0 || minOffset <= readRequest->offset);
222 //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
223
9a51593d 224 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 225 pending->readRequest = readRequest;
7a907247 226 push(pending);
254912f3
AR
227}
228
229void
230IpcIoFile::readCompleted(ReadRequest *readRequest,
8ed94021 231 IpcIoMsg *const response)
254912f3 232{
caca86d7 233 bool ioError = false;
9a51593d 234 if (!response) {
0ef0509e 235 debugs(79, 3, HERE << "error: timeout");
caca86d7 236 ioError = true; // I/O timeout does not warrant setting error_?
9a51593d 237 } else {
70e3f706 238 if (response->xerrno) {
82ed74dc
AR
239 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read: " <<
240 xstrerr(response->xerrno));
70e3f706 241 ioError = error_ = true;
9199139f 242 } else if (!response->page) {
82ed74dc
AR
243 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read ran " <<
244 "out of shared memory pages");
70e3f706
DK
245 ioError = true;
246 } else {
247 const char *const buf = Ipc::Mem::PagePointer(response->page);
248 memcpy(readRequest->buf, buf, response->len);
249 }
254912f3 250
70e3f706
DK
251 Ipc::Mem::PutPage(response->page);
252 }
8ed94021 253
caca86d7 254 const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
9199139f 255 const int errflag = ioError ? DISK_ERROR :DISK_OK;
20b0e1fe 256 ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
254912f3
AR
257}
258
259void
260IpcIoFile::write(WriteRequest *writeRequest)
261{
262 debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
9199139f 263 writeRequest->offset << ")");
254912f3
AR
264
265 assert(ioRequestor != NULL);
254912f3
AR
266 assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
267 assert(writeRequest->offset >= 0);
268 Must(!error_);
269
270 //assert(minOffset < 0 || minOffset <= writeRequest->offset);
271 //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
272
9a51593d 273 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 274 pending->writeRequest = writeRequest;
7a907247 275 push(pending);
254912f3
AR
276}
277
278void
279IpcIoFile::writeCompleted(WriteRequest *writeRequest,
9a51593d 280 const IpcIoMsg *const response)
254912f3 281{
caca86d7 282 bool ioError = false;
9a51593d 283 if (!response) {
4e4f7fd9 284 debugs(79, 3, "disker " << diskId << " timeout");
caca86d7 285 ioError = true; // I/O timeout does not warrant setting error_?
9199139f 286 } else if (response->xerrno) {
4e4f7fd9
AR
287 debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId <<
288 " error writing " << writeRequest->len << " bytes at " <<
289 writeRequest->offset << ": " << xstrerr(response->xerrno) <<
290 "; this worker will stop using " << dbName);
9a51593d 291 ioError = error_ = true;
9199139f 292 } else if (response->len != writeRequest->len) {
4e4f7fd9
AR
293 debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId << " wrote " <<
294 response->len << " instead of " << writeRequest->len <<
295 " bytes (offset " << writeRequest->offset << "); " <<
296 "this worker will stop using " << dbName);
254912f3
AR
297 error_ = true;
298 }
299
300 if (writeRequest->free_func)
301 (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
302
caca86d7 303 if (!ioError) {
254912f3 304 debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
9199139f
AR
305 diskId << " at " << writeRequest->offset);
306 }
254912f3 307
caca86d7 308 const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
9199139f 309 const int errflag = ioError ? DISK_ERROR :DISK_OK;
254912f3
AR
310 ioRequestor->writeCompleted(errflag, rlen, writeRequest);
311}
312
313bool
314IpcIoFile::ioInProgress() const
315{
9a51593d 316 return !olderRequests->empty() || !newerRequests->empty();
254912f3
AR
317}
318
9a51593d 319/// track a new pending request
254912f3 320void
28bd45ba 321IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
254912f3 322{
28bd45ba
AR
323 const std::pair<RequestMap::iterator,bool> result =
324 newerRequests->insert(std::make_pair(id, pending));
325 Must(result.second); // failures means that id was not unique
9a51593d
DK
326 if (!timeoutCheckScheduled)
327 scheduleTimeoutCheck();
328}
254912f3 329
9a51593d
DK
330/// push an I/O request to disker
331void
7a907247 332IpcIoFile::push(IpcIoPendingRequest *const pending)
9a51593d 333{
fa61cefe 334 // prevent queue overflows: check for responses to earlier requests
28bd45ba 335 // warning: this call may result in indirect push() recursion
ccfbe8f4
AR
336 CallService(nullptr, [] {
337 HandleResponses("before push");
338 });
fa61cefe 339
a1c98830 340 debugs(47, 7, HERE);
9a51593d 341 Must(diskId >= 0);
7a907247
DK
342 Must(pending);
343 Must(pending->readRequest || pending->writeRequest);
9a51593d
DK
344
345 IpcIoMsg ipcIo;
8ed94021 346 try {
28bd45ba
AR
347 if (++lastRequestId == 0) // don't use zero value as requestId
348 ++lastRequestId;
8ed94021 349 ipcIo.requestId = lastRequestId;
0a11e039 350 ipcIo.start = current_time;
8ed94021
DK
351 if (pending->readRequest) {
352 ipcIo.command = IpcIo::cmdRead;
353 ipcIo.offset = pending->readRequest->offset;
354 ipcIo.len = pending->readRequest->len;
355 } else { // pending->writeRequest
356 Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
551f8a18 357 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 358 ipcIo.len = 0;
551f8a18 359 throw TexcHere("run out of shared memory pages for IPC I/O");
8ed94021
DK
360 }
361 ipcIo.command = IpcIo::cmdWrite;
362 ipcIo.offset = pending->writeRequest->offset;
363 ipcIo.len = pending->writeRequest->len;
364 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
365 memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
366 }
254912f3 367
f5591061 368 debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
fa61cefe 369
387e1d45
EB
370 // protect DiskerHandleRequest() from pop queue overflow
371 if (pendingRequests() >= QueueCapacity)
372 throw Ipc::OneToOneUniQueue::Full();
373
f5591061 374 if (queue->push(diskId, ipcIo))
fa61cefe 375 Notify(diskId); // must notify disker
28bd45ba 376 trackPendingRequest(ipcIo.requestId, pending);
f5591061 377 } catch (const Queue::Full &) {
82ed74dc
AR
378 debugs(47, DBG_IMPORTANT, "ERROR: worker I/O push queue for " <<
379 dbName << " overflow: " <<
fa61cefe
AR
380 SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
381 // TODO: grow queue size
d0c5cf9e
CG
382 if (ipcIo.page)
383 Ipc::Mem::PutPage(ipcIo.page);
9199139f 384
0ef0509e 385 pending->completeIo(NULL);
8ed94021
DK
386 delete pending;
387 } catch (const TextException &e) {
82ed74dc 388 debugs(47, DBG_IMPORTANT, "ERROR: " << dbName << " exception: " << e.what());
0ef0509e 389 pending->completeIo(NULL);
7a907247 390 delete pending;
9a51593d 391 }
254912f3
AR
392}
393
0a11e039
AR
394/// whether we think there is enough time to complete the I/O
395bool
9199139f
AR
396IpcIoFile::canWait() const
397{
43ebbac3 398 if (!config.ioTimeout)
0a11e039
AR
399 return true; // no timeout specified
400
401 IpcIoMsg oldestIo;
5aac671b 402 if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
0a11e039
AR
403 return true; // we cannot estimate expected wait time; assume it is OK
404
55939a01
AR
405 const int oldestWait = tvSubMsec(oldestIo.start, current_time);
406
407 int rateWait = -1; // time in millisecons
6c6a656a 408 const int ioRate = queue->rateLimit(diskId).load();
55939a01
AR
409 if (ioRate > 0) {
410 // if there are N requests pending, the new one will wait at
411 // least N/max-swap-rate seconds
75017bc9 412 rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
55939a01
AR
413 // adjust N/max-swap-rate value based on the queue "balance"
414 // member, in case we have been borrowing time against future
415 // I/O already
416 rateWait += queue->balance(diskId);
417 }
418
419 const int expectedWait = max(oldestWait, rateWait);
0a11e039 420 if (expectedWait < 0 ||
43ebbac3 421 static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
0a11e039
AR
422 return true; // expected wait time is acceptible
423
c792f7fc
AR
424 debugs(47,2, HERE << "cannot wait: " << expectedWait <<
425 " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
0a11e039
AR
426 return false; // do not want to wait that long
427}
428
9a51593d 429/// called when coordinator responds to worker open request
254912f3 430void
9a51593d 431IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
254912f3 432{
9a51593d 433 debugs(47, 7, HERE << "coordinator response to open request");
b2aa0934 434 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
9199139f 435 i != WaitingForOpen.end(); ++i) {
b2aa0934
DK
436 if (response.strand.tag == (*i)->dbName) {
437 (*i)->openCompleted(&response);
438 WaitingForOpen.erase(i);
439 return;
440 }
441 }
442
443 debugs(47, 4, HERE << "LATE disker response to open for " <<
444 response.strand.tag);
445 // nothing we can do about it; completeIo() has been called already
9a51593d 446}
254912f3 447
9a51593d 448void
f5591061 449IpcIoFile::HandleResponses(const char *const when)
fa61cefe
AR
450{
451 debugs(47, 4, HERE << "popping all " << when);
fa61cefe
AR
452 IpcIoMsg ipcIo;
453 // get all responses we can: since we are not pushing, this will stop
f5591061
DK
454 int diskId;
455 while (queue->pop(diskId, ipcIo)) {
456 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
457 Must(i != IpcIoFiles.end()); // TODO: warn but continue
458 i->second->handleResponse(ipcIo);
459 }
9a51593d 460}
254912f3 461
9a51593d 462void
8ed94021 463IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
9a51593d
DK
464{
465 const int requestId = ipcIo.requestId;
fa61cefe 466
9a51593d
DK
467 Must(requestId);
468 if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
ccfbe8f4
AR
469 CallBack(pending->codeContext, [&] {
470 debugs(47, 7, "popped disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
471 pending->completeIo(&ipcIo);
472 delete pending; // XXX: leaking if throwing
473 });
caca86d7 474 } else {
ccfbe8f4 475 debugs(47, 4, "LATE disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
caca86d7
AR
476 // nothing we can do about it; completeIo() has been called already
477 }
254912f3
AR
478}
479
254912f3 480void
9a51593d 481IpcIoFile::Notify(const int peerId)
254912f3 482{
fa61cefe 483 // TODO: Count and report the total number of notifications, pops, pushes.
a1c98830 484 debugs(47, 7, HERE << "kid" << peerId);
9a51593d
DK
485 Ipc::TypedMsgHdr msg;
486 msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
487 msg.putInt(KidIdentifier);
1ee292b7 488 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, peerId);
9a51593d
DK
489 Ipc::SendMessage(addr, msg);
490}
491
492void
493IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
494{
fa61cefe
AR
495 const int from = msg.getInt();
496 debugs(47, 7, HERE << "from " << from);
f5591061
DK
497 queue->clearReaderSignal(from);
498 if (IamDiskProcess())
499 DiskerHandleRequests();
500 else
501 HandleResponses("after notification");
9a51593d
DK
502}
503
b2aa0934
DK
504/// handles open request timeout
505void
506IpcIoFile::OpenTimeout(void *const param)
507{
508 Must(param);
509 // the pointer is used for comparison only and not dereferenced
510 const IpcIoFile *const ipcIoFile =
511 reinterpret_cast<const IpcIoFile *>(param);
512 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
9199139f 513 i != WaitingForOpen.end(); ++i) {
b2aa0934
DK
514 if (*i == ipcIoFile) {
515 (*i)->openCompleted(NULL);
516 WaitingForOpen.erase(i);
517 break;
518 }
519 }
520}
521
9a51593d
DK
522/// IpcIoFile::checkTimeouts wrapper
523void
524IpcIoFile::CheckTimeouts(void *const param)
525{
9a51593d 526 Must(param);
b2aa0934
DK
527 const int diskId = reinterpret_cast<uintptr_t>(param);
528 debugs(47, 7, HERE << "diskId=" << diskId);
529 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
530 if (i != IpcIoFiles.end())
531 i->second->checkTimeouts();
9a51593d
DK
532}
533
534void
535IpcIoFile::checkTimeouts()
536{
9a51593d 537 timeoutCheckScheduled = false;
254912f3 538
0ef0509e
AR
539 // last chance to recover in case a notification message was lost, etc.
540 const RequestMap::size_type timeoutsBefore = olderRequests->size();
541 HandleResponses("before timeout");
542 const RequestMap::size_type timeoutsNow = olderRequests->size();
543
544 if (timeoutsBefore > timeoutsNow) { // some requests were rescued
545 // notification message lost or significantly delayed?
82ed74dc
AR
546 debugs(47, DBG_IMPORTANT, "WARNING: communication with " << dbName <<
547 " may be too slow or disrupted for about " <<
0ef0509e
AR
548 Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
549 " out of " << timeoutsBefore << " I/Os");
550 }
551
552 if (timeoutsNow) {
553 debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
82ed74dc 554 timeoutsNow << ' ' << dbName << " I/Os after at least " <<
0ef0509e
AR
555 Timeout << "s timeout");
556 }
557
caca86d7
AR
558 // any old request would have timed out by now
559 typedef RequestMap::const_iterator RMCI;
9a51593d
DK
560 for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
561 IpcIoPendingRequest *const pending = i->second;
ccfbe8f4
AR
562 CallBack(pending->codeContext, [&] {
563 const auto requestId = i->first;
564 debugs(47, 7, "disker timeout; ipcIo" << KidIdentifier << '.' << requestId);
565 pending->completeIo(nullptr); // no response
566 delete pending; // XXX: leaking if throwing
567 });
9a51593d
DK
568 }
569 olderRequests->clear();
caca86d7 570
9a51593d 571 swap(olderRequests, newerRequests); // switches pointers around
0ef0509e 572 if (!olderRequests->empty() && !timeoutCheckScheduled)
9a51593d 573 scheduleTimeoutCheck();
254912f3
AR
574}
575
caca86d7 576/// prepare to check for timeouts in a little while
254912f3 577void
9a51593d 578IpcIoFile::scheduleTimeoutCheck()
254912f3 579{
ccfbe8f4
AR
580 // We may be running in an I/O requestor CodeContext, but are scheduling
581 // one-for-all CheckTimeouts() that is not specific to any request.
582 CallService(nullptr, [&] {
583 // we check all older requests at once so some may be wait for 2*Timeout
584 eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
585 reinterpret_cast<void *>(diskId), Timeout, 0, false);
586 timeoutCheckScheduled = true;
587 });
254912f3
AR
588}
589
590/// returns and forgets the right IpcIoFile pending request
591IpcIoPendingRequest *
9a51593d 592IpcIoFile::dequeueRequest(const unsigned int requestId)
254912f3 593{
254912f3 594 Must(requestId != 0);
caca86d7 595
9a51593d
DK
596 RequestMap *map = NULL;
597 RequestMap::iterator i = requestMap1.find(requestId);
caca86d7 598
9a51593d
DK
599 if (i != requestMap1.end())
600 map = &requestMap1;
caca86d7 601 else {
9a51593d
DK
602 i = requestMap2.find(requestId);
603 if (i != requestMap2.end())
604 map = &requestMap2;
caca86d7
AR
605 }
606
607 if (!map) // not found in both maps
608 return NULL;
609
610 IpcIoPendingRequest *pending = i->second;
611 map->erase(i);
612 return pending;
254912f3
AR
613}
614
615int
9a51593d 616IpcIoFile::getFD() const
254912f3
AR
617{
618 assert(false); // not supported; TODO: remove this method from API
619 return -1;
620}
621
9a51593d 622/* IpcIoMsg */
254912f3 623
9a51593d 624IpcIoMsg::IpcIoMsg():
f53969cc
SM
625 requestId(0),
626 offset(0),
627 len(0),
628 command(IpcIo::cmdNone),
629 xerrno(0)
254912f3 630{
0a11e039 631 start.tv_sec = 0;
e19994df 632 start.tv_usec = 0;
254912f3
AR
633}
634
caca86d7 635/* IpcIoPendingRequest */
254912f3
AR
636
637IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
ccfbe8f4
AR
638 file(aFile),
639 readRequest(nullptr),
640 writeRequest(nullptr),
641 codeContext(CodeContext::Current())
254912f3
AR
642{
643}
644
caca86d7 645void
8ed94021 646IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
caca86d7 647{
caca86d7
AR
648 if (readRequest)
649 file->readCompleted(readRequest, response);
9199139f 650 else if (writeRequest)
caca86d7 651 file->writeCompleted(writeRequest, response);
9a51593d
DK
652 else {
653 Must(!response); // only timeouts are handled here
654 file->openCompleted(NULL);
655 }
caca86d7
AR
656}
657
254912f3
AR
658/* XXX: disker code that should probably be moved elsewhere */
659
7015a149 660static SBuf DbName; ///< full db file name
254912f3
AR
661static int TheFile = -1; ///< db file descriptor
662
9a51593d
DK
663static void
664diskerRead(IpcIoMsg &ipcIo)
254912f3 665{
551f8a18 666 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 667 ipcIo.len = 0;
c792f7fc 668 debugs(47,2, HERE << "run out of shared memory pages for IPC I/O");
8ed94021
DK
669 return;
670 }
671
672 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
673 const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
e4f1fdae 674 ++statCounter.syscalls.disk.reads;
2d338731
AR
675 fd_bytes(TheFile, read, FD_READ);
676
254912f3 677 if (read >= 0) {
9a51593d
DK
678 ipcIo.xerrno = 0;
679 const size_t len = static_cast<size_t>(read); // safe because read > 0
254912f3 680 debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
9199139f 681 (len == ipcIo.len ? "all " : "just ") << read);
9a51593d
DK
682 ipcIo.len = len;
683 } else {
684 ipcIo.xerrno = errno;
685 ipcIo.len = 0;
254912f3 686 debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
9199139f 687 ipcIo.xerrno);
9a51593d 688 }
254912f3
AR
689}
690
9d4e9cfb 691/// Tries to write buffer to disk (a few times if needed);
4e4f7fd9 692/// sets ipcIo results, but does no cleanup. The caller must cleanup.
9a51593d 693static void
4e4f7fd9
AR
694diskerWriteAttempts(IpcIoMsg &ipcIo)
695{
696 const char *buf = Ipc::Mem::PagePointer(ipcIo.page);
697 size_t toWrite = min(ipcIo.len, Ipc::Mem::PageSize());
698 size_t wroteSoFar = 0;
699 off_t offset = ipcIo.offset;
700 // Partial writes to disk do happen. It is unlikely that the caller can
701 // handle partial writes by doing something other than writing leftovers
702 // again, so we try to write them ourselves to minimize overheads.
703 const int attemptLimit = 10;
704 for (int attempts = 1; attempts <= attemptLimit; ++attempts) {
705 const ssize_t result = pwrite(TheFile, buf, toWrite, offset);
706 ++statCounter.syscalls.disk.writes;
707 fd_bytes(TheFile, result, FD_WRITE);
708
709 if (result < 0) {
710 ipcIo.xerrno = errno;
711 assert(ipcIo.xerrno);
82ed74dc
AR
712 debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " failure" <<
713 " writing " << toWrite << '/' << ipcIo.len <<
4e4f7fd9
AR
714 " at " << ipcIo.offset << '+' << wroteSoFar <<
715 " on " << attempts << " try: " << xstrerr(ipcIo.xerrno));
716 ipcIo.len = wroteSoFar;
717 return; // bail on error
718 }
2d338731 719
4e4f7fd9 720 const size_t wroteNow = static_cast<size_t>(result); // result >= 0
9a51593d 721 ipcIo.xerrno = 0;
4e4f7fd9
AR
722
723 debugs(47,3, "disker" << KidIdentifier << " wrote " <<
724 (wroteNow >= toWrite ? "all " : "just ") << wroteNow <<
725 " out of " << toWrite << '/' << ipcIo.len << " at " <<
726 ipcIo.offset << '+' << wroteSoFar << " on " << attempts <<
727 " try");
728
729 wroteSoFar += wroteNow;
730
731 if (wroteNow >= toWrite) {
732 ipcIo.xerrno = 0;
733 ipcIo.len = wroteSoFar;
734 return; // wrote everything there was to write
735 }
736
737 buf += wroteNow;
738 offset += wroteNow;
739 toWrite -= wroteNow;
9a51593d 740 }
8ed94021 741
82ed74dc
AR
742 debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " exhausted all " <<
743 attemptLimit << " attempts while writing " <<
4e4f7fd9
AR
744 toWrite << '/' << ipcIo.len << " at " << ipcIo.offset << '+' <<
745 wroteSoFar);
746 return; // not a fatal I/O error, unless the caller treats it as such
747}
748
749static void
750diskerWrite(IpcIoMsg &ipcIo)
751{
752 diskerWriteAttempts(ipcIo); // may fail
8ed94021 753 Ipc::Mem::PutPage(ipcIo.page);
254912f3
AR
754}
755
c792f7fc 756void
df881a0f 757IpcIoFile::DiskerHandleMoreRequests(void *source)
c792f7fc 758{
df881a0f
AR
759 debugs(47, 7, HERE << "resuming handling requests after " <<
760 static_cast<const char *>(source));
c792f7fc
AR
761 DiskerHandleMoreRequestsScheduled = false;
762 IpcIoFile::DiskerHandleRequests();
763}
764
df881a0f
AR
765bool
766IpcIoFile::WaitBeforePop()
767{
6c6a656a 768 const int ioRate = queue->localRateLimit().load();
df881a0f
AR
769 const double maxRate = ioRate/1e3; // req/ms
770
771 // do we need to enforce configured I/O rate?
772 if (maxRate <= 0)
773 return false;
774
775 // is there an I/O request we could potentially delay?
1e614370
DK
776 int processId;
777 IpcIoMsg ipcIo;
778 if (!queue->peek(processId, ipcIo)) {
779 // unlike pop(), peek() is not reliable and does not block reader
df881a0f
AR
780 // so we must proceed with pop() even if it is likely to fail
781 return false;
782 }
783
9a585a30 784 static timeval LastIo = current_time;
df881a0f
AR
785
786 const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
787 // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
788 const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
789
790 const double credit = ioDuration; // what the last I/O should have cost us
791 const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
792 LastIo = current_time;
793
794 Ipc::QueueReader::Balance &balance = queue->localBalance();
795 balance += static_cast<int64_t>(credit - debit);
796
797 debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
798
1e614370
DK
799 if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
800 // if the next request is (likely) write and we accumulated
801 // too much time for future slow I/Os, then shed accumulated
802 // time to keep just half of the excess
df881a0f 803 const int64_t toSpend = balance - maxImbalance/2;
69321ae9
AR
804
805 if (toSpend/1e3 > Timeout)
82ed74dc
AR
806 debugs(47, DBG_IMPORTANT, "WARNING: " << DbName << " delays " <<
807 "I/O requests for " << (toSpend/1e3) << " seconds " <<
808 "to obey " << ioRate << "/sec rate limit");
69321ae9 809
df881a0f
AR
810 debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" <<
811 (1e3*maxRate) << "/sec rate");
812 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
813 &IpcIoFile::DiskerHandleMoreRequests,
814 const_cast<char*>("rate limiting"),
815 toSpend/1e3, 0, false);
816 DiskerHandleMoreRequestsScheduled = true;
817 return true;
e29ccb57 818 } else if (balance < -maxImbalance) {
df881a0f
AR
819 // do not owe "too much" to avoid "too large" bursts of I/O
820 balance = -maxImbalance;
821 }
822
823 return false;
824}
825
254912f3 826void
f5591061 827IpcIoFile::DiskerHandleRequests()
254912f3 828{
c792f7fc 829 // Balance our desire to maximize the number of concurrent I/O requests
fdb3059b 830 // (reordred by OS to minimize seek time) with a requirement to
c792f7fc 831 // send 1st-I/O notification messages, process Coordinator events, etc.
fdb3059b
AR
832 const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
833 const timeval loopStart = current_time;
834
c792f7fc 835 int popped = 0;
fa61cefe
AR
836 int workerId = 0;
837 IpcIoMsg ipcIo;
df881a0f 838 while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
fdb3059b
AR
839 ++popped;
840
841 // at least one I/O per call is guaranteed if the queue is not empty
fa61cefe
AR
842 DiskerHandleRequest(workerId, ipcIo);
843
fdb3059b
AR
844 getCurrentTime();
845 const double elapsedMsec = tvSubMsec(loopStart, current_time);
846 if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
847 if (!DiskerHandleMoreRequestsScheduled) {
848 // the gap must be positive for select(2) to be given a chance
849 const double minBreakSecs = 0.001;
850 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
9199139f 851 &IpcIoFile::DiskerHandleMoreRequests,
df881a0f
AR
852 const_cast<char*>("long I/O loop"),
853 minBreakSecs, 0, false);
fdb3059b
AR
854 DiskerHandleMoreRequestsScheduled = true;
855 }
856 debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
9199139f 857 elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
fdb3059b
AR
858 break;
859 }
c792f7fc
AR
860 }
861
fdb3059b 862 // TODO: consider using O_DIRECT with "elevator" optimization where we pop
fa61cefe
AR
863 // requests first, then reorder the popped requests to optimize seek time,
864 // then do I/O, then take a break, and come back for the next set of I/O
865 // requests.
9a51593d 866}
254912f3 867
9a51593d
DK
868/// called when disker receives an I/O request
869void
870IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
871{
9a51593d 872 if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
82ed74dc 873 debugs(0, DBG_CRITICAL, "ERROR: " << DbName <<
9a51593d
DK
874 " should not receive " << ipcIo.command <<
875 " ipcIo" << workerId << '.' << ipcIo.requestId);
876 return;
877 }
878
879 debugs(47,5, HERE << "disker" << KidIdentifier <<
880 (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
881 ipcIo.len << " at " << ipcIo.offset <<
882 " ipcIo" << workerId << '.' << ipcIo.requestId);
883
884 if (ipcIo.command == IpcIo::cmdRead)
885 diskerRead(ipcIo);
886 else // ipcIo.command == IpcIo::cmdWrite
887 diskerWrite(ipcIo);
888
f5591061 889 debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
fa61cefe 890
7a907247 891 try {
f5591061 892 if (queue->push(workerId, ipcIo))
5e44782e 893 Notify(workerId); // must notify worker
f5591061 894 } catch (const Queue::Full &) {
387e1d45
EB
895 // The worker pop queue should not overflow because the worker can
896 // push only if pendingRequests() is less than QueueCapacity.
82ed74dc
AR
897 debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue for " <<
898 DbName << " overflow: " <<
5e44782e 899 SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
fa61cefe
AR
900
901 // the I/O request we could not push will timeout
7a907247 902 }
254912f3
AR
903}
904
905static bool
ced8def3 906DiskerOpen(const SBuf &path, int flags, mode_t)
254912f3
AR
907{
908 assert(TheFile < 0);
909
82ed74dc 910 DbName = path;
7015a149 911 TheFile = file_open(DbName.c_str(), flags);
254912f3
AR
912
913 if (TheFile < 0) {
914 const int xerrno = errno;
82ed74dc 915 debugs(47, DBG_CRITICAL, "ERROR: cannot open " << DbName << ": " <<
254912f3
AR
916 xstrerr(xerrno));
917 return false;
9a51593d 918 }
254912f3 919
cb4185f1 920 ++store_open_disk_fd;
7015a149 921 debugs(79,3, "rock db opened " << DbName << ": FD " << TheFile);
9a51593d 922 return true;
254912f3
AR
923}
924
925static void
7015a149 926DiskerClose(const SBuf &path)
254912f3
AR
927{
928 if (TheFile >= 0) {
929 file_close(TheFile);
930 debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
931 TheFile = -1;
5e263176 932 --store_open_disk_fd;
a1c98830 933 }
7015a149 934 DbName.clear();
254912f3 935}
f5591061 936
ea2cdeb6 937/// reports our needs for shared memory pages to Ipc::Mem::Pages
21b7990f
AR
938/// and initializes shared memory segments used by IpcIoFile
939class IpcIoRr: public Ipc::Mem::RegisteredRunner
ea2cdeb6
DK
940{
941public:
942 /* RegisteredRunner API */
21b7990f
AR
943 IpcIoRr(): owner(NULL) {}
944 virtual ~IpcIoRr();
945 virtual void claimMemoryNeeds();
946
947protected:
948 /* Ipc::Mem::RegisteredRunner API */
949 virtual void create();
950
951private:
952 Ipc::FewToFewBiQueue::Owner *owner;
ea2cdeb6
DK
953};
954
21b7990f 955RunnerRegistrationEntry(IpcIoRr);
ea2cdeb6 956
ea2cdeb6 957void
21b7990f 958IpcIoRr::claimMemoryNeeds()
ea2cdeb6
DK
959{
960 const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
c11211d9 961 ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity);
ea2cdeb6
DK
962 // the maximum number of shared I/O pages is approximately the
963 // number of queue slots, we add a fudge factor to that to account
964 // for corner cases where I/O pages are created before queue
965 // limits are checked or destroyed long after the I/O is dequeued
29a9ff4e
DK
966 Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage,
967 static_cast<int>(itemsCount * 1.1));
ea2cdeb6
DK
968}
969
21b7990f
AR
970void
971IpcIoRr::create()
f5591061 972{
e9f68413 973 if (Config.cacheSwap.n_strands <= 0)
f5591061
DK
974 return;
975
4404f1c5 976 Must(!owner);
4404f1c5 977 owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1,
3b581957 978 Config.cacheSwap.n_strands,
4404f1c5 979 1 + Config.workers, sizeof(IpcIoMsg),
ea2cdeb6 980 QueueCapacity);
f5591061
DK
981}
982
983IpcIoRr::~IpcIoRr()
984{
985 delete owner;
986}
f53969cc 987