]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/IpcIo/IpcIoFile.cc
Do not send keep-alive in 101 (Switching Protocols) responses (#709)
[thirdparty/squid.git] / src / DiskIO / IpcIo / IpcIoFile.cc
CommitLineData
254912f3 1/*
77b1029d 2 * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
bbc27441
AJ
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
254912f3
AR
7 */
8
bbc27441
AJ
9/* DEBUG: section 47 Store Directory Routines */
10
f7f3304a 11#include "squid.h"
ccfbe8f4 12#include "base/CodeContext.h"
f5591061 13#include "base/RunnersRegistry.h"
254912f3
AR
14#include "base/TextException.h"
15#include "DiskIO/IORequestor.h"
16#include "DiskIO/IpcIo/IpcIoFile.h"
17#include "DiskIO/ReadRequest.h"
18#include "DiskIO/WriteRequest.h"
c4ad1349 19#include "fd.h"
b3f7fd88 20#include "fs_io.h"
582c2af2 21#include "globals.h"
21d845b1 22#include "ipc/mem/Pages.h"
254912f3
AR
23#include "ipc/Messages.h"
24#include "ipc/Port.h"
f5591061 25#include "ipc/Queue.h"
9a51593d 26#include "ipc/StrandSearch.h"
254912f3 27#include "ipc/UdsOp.h"
65e41a45 28#include "sbuf/SBuf.h"
4d5904f7 29#include "SquidConfig.h"
21d845b1
FC
30#include "SquidTime.h"
31#include "StatCounters.h"
5bed43d6 32#include "tools.h"
254912f3 33
1a30fdf5
AJ
34#include <cerrno>
35
254912f3
AR
36CBDATA_CLASS_INIT(IpcIoFile);
37
f5591061
DK
38/// shared memory segment path to use for IpcIoFile maps
39static const char *const ShmLabel = "io_file";
ea2cdeb6
DK
40/// a single worker-to-disker or disker-to-worker queue capacity; up
41/// to 2*QueueCapacity I/O requests queued between a single worker and
42/// a single disker
43// TODO: make configurable or compute from squid.conf settings if possible
44static const int QueueCapacity = 1024;
f5591061 45
fa61cefe 46const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
b2aa0934
DK
47IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
48IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
829030b5 49std::unique_ptr<IpcIoFile::Queue> IpcIoFile::queue;
254912f3 50
c792f7fc
AR
51bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false;
52
7015a149
AR
53static bool DiskerOpen(const SBuf &path, int flags, mode_t mode);
54static void DiskerClose(const SBuf &path);
254912f3 55
fa61cefe
AR
56/// IpcIo wrapper for debugs() streams; XXX: find a better class name
57struct SipcIo {
58 SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
f53969cc 59 worker(aWorker), msg(aMsg), disker(aDisker) {}
fa61cefe
AR
60
61 int worker;
62 const IpcIoMsg &msg;
63 int disker;
64};
65
66std::ostream &
67operator <<(std::ostream &os, const SipcIo &sio)
68{
69 return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
d8ee9e8d 70 sio.msg.command << sio.disker;
fa61cefe
AR
71}
72
d8ee9e8d
EB
73/* IpcIo::Command */
74
75std::ostream &
76operator <<(std::ostream &os, const IpcIo::Command command)
77{
78 switch (command) {
79 case IpcIo::cmdNone:
80 return os << '-';
81 case IpcIo::cmdOpen:
82 return os << 'o';
83 case IpcIo::cmdRead:
84 return os << 'r';
85 case IpcIo::cmdWrite:
86 return os << 'w';
87 }
88 // unreachable code
89 return os << static_cast<int>(command);
90}
91
92/* IpcIoFile */
93
254912f3 94IpcIoFile::IpcIoFile(char const *aDb):
f53969cc
SM
95 dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
96 olderRequests(&requestMap1), newerRequests(&requestMap2),
97 timeoutCheckScheduled(false)
254912f3
AR
98{
99}
100
101IpcIoFile::~IpcIoFile()
102{
ebaabe74
AR
103 SWALLOW_EXCEPTIONS({
104 if (diskId >= 0) {
105 const auto i = IpcIoFiles.find(diskId);
106 Must(i != IpcIoFiles.end());
107 Must(i->second == this);
108 IpcIoFiles.erase(i);
109 }
110 });
254912f3
AR
111}
112
43ebbac3
AR
113void
114IpcIoFile::configure(const Config &cfg)
115{
116 DiskFile::configure(cfg);
117 config = cfg;
118}
119
254912f3
AR
120void
121IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
122{
123 ioRequestor = callback;
124 Must(diskId < 0); // we do not know our disker yet
f5591061
DK
125
126 if (!queue.get())
127 queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
254912f3
AR
128
129 if (IamDiskProcess()) {
7015a149 130 error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
f7091279
DK
131 if (error_)
132 return;
133
b2aa0934
DK
134 diskId = KidIdentifier;
135 const bool inserted =
136 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
137 Must(inserted);
9a51593d 138
6c6a656a 139 queue->localRateLimit().store(config.ioRate);
df881a0f 140
f7091279
DK
141 Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
142 ann.strand.tag = dbName;
143 Ipc::TypedMsgHdr message;
144 ann.pack(message);
1ee292b7 145 SendMessage(Ipc::Port::CoordinatorAddr(), message);
f7091279 146
e528e570 147 ioRequestor->ioCompletedNotification();
254912f3 148 return;
9a51593d 149 }
254912f3 150
9a51593d
DK
151 Ipc::StrandSearchRequest request;
152 request.requestorId = KidIdentifier;
153 request.tag = dbName;
254912f3 154
9a51593d
DK
155 Ipc::TypedMsgHdr msg;
156 request.pack(msg);
1ee292b7 157 Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg);
9a51593d 158
b2aa0934
DK
159 WaitingForOpen.push_back(this);
160
161 eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
162 this, Timeout, 0, false); // "this" pointer is used as id
254912f3
AR
163}
164
165void
9199139f
AR
166IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response)
167{
9a51593d 168 Must(diskId < 0); // we do not know our disker yet
9a51593d
DK
169
170 if (!response) {
82ed74dc
AR
171 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " communication " <<
172 "channel establishment timeout");
caca86d7 173 error_ = true;
9a51593d
DK
174 } else {
175 diskId = response->strand.kidId;
176 if (diskId >= 0) {
b2aa0934
DK
177 const bool inserted =
178 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
179 Must(inserted);
9a51593d 180 } else {
254912f3 181 error_ = true;
82ed74dc
AR
182 debugs(79, DBG_IMPORTANT, "ERROR: no disker claimed " <<
183 "responsibility for " << dbName);
9a51593d
DK
184 }
185 }
254912f3
AR
186
187 ioRequestor->ioCompletedNotification();
188}
189
190/**
191 * Alias for IpcIoFile::open(...)
192 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
193 */
194void
195IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
196{
197 assert(false); // check
198 /* We use the same logic path for open */
199 open(flags, mode, callback);
200}
201
202void
203IpcIoFile::close()
204{
205 assert(ioRequestor != NULL);
206
207 if (IamDiskProcess())
7015a149 208 DiskerClose(SBuf(dbName.termedBuf()));
9a51593d 209 // XXX: else nothing to do?
254912f3
AR
210
211 ioRequestor->closeCompleted();
212}
213
214bool
215IpcIoFile::canRead() const
216{
7e2d5ef6 217 return diskId >= 0 && !error_ && canWait();
254912f3
AR
218}
219
220bool
221IpcIoFile::canWrite() const
222{
7e2d5ef6 223 return diskId >= 0 && !error_ && canWait();
254912f3
AR
224}
225
226bool
227IpcIoFile::error() const
228{
229 return error_;
230}
231
232void
233IpcIoFile::read(ReadRequest *readRequest)
234{
235 debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
9199139f 236 readRequest->offset << ")");
254912f3
AR
237
238 assert(ioRequestor != NULL);
254912f3
AR
239 assert(readRequest->offset >= 0);
240 Must(!error_);
241
242 //assert(minOffset < 0 || minOffset <= readRequest->offset);
243 //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
244
9a51593d 245 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 246 pending->readRequest = readRequest;
7a907247 247 push(pending);
254912f3
AR
248}
249
250void
251IpcIoFile::readCompleted(ReadRequest *readRequest,
8ed94021 252 IpcIoMsg *const response)
254912f3 253{
caca86d7 254 bool ioError = false;
9a51593d 255 if (!response) {
0ef0509e 256 debugs(79, 3, HERE << "error: timeout");
caca86d7 257 ioError = true; // I/O timeout does not warrant setting error_?
9a51593d 258 } else {
70e3f706 259 if (response->xerrno) {
82ed74dc
AR
260 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read: " <<
261 xstrerr(response->xerrno));
70e3f706 262 ioError = error_ = true;
9199139f 263 } else if (!response->page) {
82ed74dc
AR
264 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read ran " <<
265 "out of shared memory pages");
70e3f706
DK
266 ioError = true;
267 } else {
268 const char *const buf = Ipc::Mem::PagePointer(response->page);
269 memcpy(readRequest->buf, buf, response->len);
270 }
254912f3 271
70e3f706
DK
272 Ipc::Mem::PutPage(response->page);
273 }
8ed94021 274
caca86d7 275 const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
9199139f 276 const int errflag = ioError ? DISK_ERROR :DISK_OK;
20b0e1fe 277 ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
254912f3
AR
278}
279
280void
281IpcIoFile::write(WriteRequest *writeRequest)
282{
283 debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
9199139f 284 writeRequest->offset << ")");
254912f3
AR
285
286 assert(ioRequestor != NULL);
254912f3
AR
287 assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
288 assert(writeRequest->offset >= 0);
289 Must(!error_);
290
291 //assert(minOffset < 0 || minOffset <= writeRequest->offset);
292 //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
293
9a51593d 294 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 295 pending->writeRequest = writeRequest;
7a907247 296 push(pending);
254912f3
AR
297}
298
299void
300IpcIoFile::writeCompleted(WriteRequest *writeRequest,
9a51593d 301 const IpcIoMsg *const response)
254912f3 302{
caca86d7 303 bool ioError = false;
9a51593d 304 if (!response) {
4e4f7fd9 305 debugs(79, 3, "disker " << diskId << " timeout");
caca86d7 306 ioError = true; // I/O timeout does not warrant setting error_?
9199139f 307 } else if (response->xerrno) {
4e4f7fd9
AR
308 debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId <<
309 " error writing " << writeRequest->len << " bytes at " <<
310 writeRequest->offset << ": " << xstrerr(response->xerrno) <<
311 "; this worker will stop using " << dbName);
9a51593d 312 ioError = error_ = true;
9199139f 313 } else if (response->len != writeRequest->len) {
4e4f7fd9
AR
314 debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId << " wrote " <<
315 response->len << " instead of " << writeRequest->len <<
316 " bytes (offset " << writeRequest->offset << "); " <<
317 "this worker will stop using " << dbName);
254912f3
AR
318 error_ = true;
319 }
320
321 if (writeRequest->free_func)
322 (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
323
caca86d7 324 if (!ioError) {
254912f3 325 debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
9199139f
AR
326 diskId << " at " << writeRequest->offset);
327 }
254912f3 328
caca86d7 329 const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
9199139f 330 const int errflag = ioError ? DISK_ERROR :DISK_OK;
254912f3
AR
331 ioRequestor->writeCompleted(errflag, rlen, writeRequest);
332}
333
334bool
335IpcIoFile::ioInProgress() const
336{
9a51593d 337 return !olderRequests->empty() || !newerRequests->empty();
254912f3
AR
338}
339
9a51593d 340/// track a new pending request
254912f3 341void
28bd45ba 342IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
254912f3 343{
28bd45ba
AR
344 const std::pair<RequestMap::iterator,bool> result =
345 newerRequests->insert(std::make_pair(id, pending));
346 Must(result.second); // failures means that id was not unique
9a51593d
DK
347 if (!timeoutCheckScheduled)
348 scheduleTimeoutCheck();
349}
254912f3 350
9a51593d
DK
351/// push an I/O request to disker
352void
7a907247 353IpcIoFile::push(IpcIoPendingRequest *const pending)
9a51593d 354{
fa61cefe 355 // prevent queue overflows: check for responses to earlier requests
28bd45ba 356 // warning: this call may result in indirect push() recursion
ccfbe8f4
AR
357 CallService(nullptr, [] {
358 HandleResponses("before push");
359 });
fa61cefe 360
a1c98830 361 debugs(47, 7, HERE);
9a51593d 362 Must(diskId >= 0);
7a907247
DK
363 Must(pending);
364 Must(pending->readRequest || pending->writeRequest);
9a51593d
DK
365
366 IpcIoMsg ipcIo;
8ed94021 367 try {
28bd45ba
AR
368 if (++lastRequestId == 0) // don't use zero value as requestId
369 ++lastRequestId;
8ed94021 370 ipcIo.requestId = lastRequestId;
0a11e039 371 ipcIo.start = current_time;
8ed94021
DK
372 if (pending->readRequest) {
373 ipcIo.command = IpcIo::cmdRead;
374 ipcIo.offset = pending->readRequest->offset;
375 ipcIo.len = pending->readRequest->len;
376 } else { // pending->writeRequest
377 Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
551f8a18 378 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 379 ipcIo.len = 0;
551f8a18 380 throw TexcHere("run out of shared memory pages for IPC I/O");
8ed94021
DK
381 }
382 ipcIo.command = IpcIo::cmdWrite;
383 ipcIo.offset = pending->writeRequest->offset;
384 ipcIo.len = pending->writeRequest->len;
385 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
386 memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
387 }
254912f3 388
f5591061 389 debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
fa61cefe 390
387e1d45
EB
391 // protect DiskerHandleRequest() from pop queue overflow
392 if (pendingRequests() >= QueueCapacity)
393 throw Ipc::OneToOneUniQueue::Full();
394
f5591061 395 if (queue->push(diskId, ipcIo))
fa61cefe 396 Notify(diskId); // must notify disker
28bd45ba 397 trackPendingRequest(ipcIo.requestId, pending);
f5591061 398 } catch (const Queue::Full &) {
82ed74dc
AR
399 debugs(47, DBG_IMPORTANT, "ERROR: worker I/O push queue for " <<
400 dbName << " overflow: " <<
fa61cefe
AR
401 SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
402 // TODO: grow queue size
d0c5cf9e
CG
403 if (ipcIo.page)
404 Ipc::Mem::PutPage(ipcIo.page);
9199139f 405
0ef0509e 406 pending->completeIo(NULL);
8ed94021
DK
407 delete pending;
408 } catch (const TextException &e) {
82ed74dc 409 debugs(47, DBG_IMPORTANT, "ERROR: " << dbName << " exception: " << e.what());
0ef0509e 410 pending->completeIo(NULL);
7a907247 411 delete pending;
9a51593d 412 }
254912f3
AR
413}
414
0a11e039
AR
415/// whether we think there is enough time to complete the I/O
416bool
9199139f
AR
417IpcIoFile::canWait() const
418{
43ebbac3 419 if (!config.ioTimeout)
0a11e039
AR
420 return true; // no timeout specified
421
422 IpcIoMsg oldestIo;
5aac671b 423 if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
0a11e039
AR
424 return true; // we cannot estimate expected wait time; assume it is OK
425
55939a01
AR
426 const int oldestWait = tvSubMsec(oldestIo.start, current_time);
427
428 int rateWait = -1; // time in millisecons
6c6a656a 429 const int ioRate = queue->rateLimit(diskId).load();
55939a01
AR
430 if (ioRate > 0) {
431 // if there are N requests pending, the new one will wait at
432 // least N/max-swap-rate seconds
75017bc9 433 rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
55939a01
AR
434 // adjust N/max-swap-rate value based on the queue "balance"
435 // member, in case we have been borrowing time against future
436 // I/O already
437 rateWait += queue->balance(diskId);
438 }
439
440 const int expectedWait = max(oldestWait, rateWait);
0a11e039 441 if (expectedWait < 0 ||
43ebbac3 442 static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
2f8abb64 443 return true; // expected wait time is acceptable
0a11e039 444
c792f7fc
AR
445 debugs(47,2, HERE << "cannot wait: " << expectedWait <<
446 " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
0a11e039
AR
447 return false; // do not want to wait that long
448}
449
9a51593d 450/// called when coordinator responds to worker open request
254912f3 451void
9a51593d 452IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
254912f3 453{
9a51593d 454 debugs(47, 7, HERE << "coordinator response to open request");
b2aa0934 455 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
9199139f 456 i != WaitingForOpen.end(); ++i) {
b2aa0934
DK
457 if (response.strand.tag == (*i)->dbName) {
458 (*i)->openCompleted(&response);
459 WaitingForOpen.erase(i);
460 return;
461 }
462 }
463
464 debugs(47, 4, HERE << "LATE disker response to open for " <<
465 response.strand.tag);
466 // nothing we can do about it; completeIo() has been called already
9a51593d 467}
254912f3 468
9a51593d 469void
f5591061 470IpcIoFile::HandleResponses(const char *const when)
fa61cefe
AR
471{
472 debugs(47, 4, HERE << "popping all " << when);
fa61cefe
AR
473 IpcIoMsg ipcIo;
474 // get all responses we can: since we are not pushing, this will stop
f5591061
DK
475 int diskId;
476 while (queue->pop(diskId, ipcIo)) {
477 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
478 Must(i != IpcIoFiles.end()); // TODO: warn but continue
479 i->second->handleResponse(ipcIo);
480 }
9a51593d 481}
254912f3 482
9a51593d 483void
8ed94021 484IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
9a51593d
DK
485{
486 const int requestId = ipcIo.requestId;
fa61cefe 487
9a51593d
DK
488 Must(requestId);
489 if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
ccfbe8f4
AR
490 CallBack(pending->codeContext, [&] {
491 debugs(47, 7, "popped disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
492 pending->completeIo(&ipcIo);
493 delete pending; // XXX: leaking if throwing
494 });
caca86d7 495 } else {
ccfbe8f4 496 debugs(47, 4, "LATE disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
caca86d7
AR
497 // nothing we can do about it; completeIo() has been called already
498 }
254912f3
AR
499}
500
254912f3 501void
9a51593d 502IpcIoFile::Notify(const int peerId)
254912f3 503{
fa61cefe 504 // TODO: Count and report the total number of notifications, pops, pushes.
a1c98830 505 debugs(47, 7, HERE << "kid" << peerId);
9a51593d
DK
506 Ipc::TypedMsgHdr msg;
507 msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
508 msg.putInt(KidIdentifier);
1ee292b7 509 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, peerId);
9a51593d
DK
510 Ipc::SendMessage(addr, msg);
511}
512
513void
514IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
515{
fa61cefe
AR
516 const int from = msg.getInt();
517 debugs(47, 7, HERE << "from " << from);
f5591061
DK
518 queue->clearReaderSignal(from);
519 if (IamDiskProcess())
520 DiskerHandleRequests();
521 else
522 HandleResponses("after notification");
9a51593d
DK
523}
524
d8ee9e8d
EB
525void
526IpcIoFile::StatQueue(std::ostream &os)
527{
528 if (queue.get()) {
529 os << "SMP disk I/O queues:\n";
530 queue->stat<IpcIoMsg>(os);
531 }
532}
533
b2aa0934
DK
534/// handles open request timeout
535void
536IpcIoFile::OpenTimeout(void *const param)
537{
538 Must(param);
539 // the pointer is used for comparison only and not dereferenced
540 const IpcIoFile *const ipcIoFile =
541 reinterpret_cast<const IpcIoFile *>(param);
542 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
9199139f 543 i != WaitingForOpen.end(); ++i) {
b2aa0934
DK
544 if (*i == ipcIoFile) {
545 (*i)->openCompleted(NULL);
546 WaitingForOpen.erase(i);
547 break;
548 }
549 }
550}
551
9a51593d
DK
552/// IpcIoFile::checkTimeouts wrapper
553void
554IpcIoFile::CheckTimeouts(void *const param)
555{
9a51593d 556 Must(param);
b2aa0934
DK
557 const int diskId = reinterpret_cast<uintptr_t>(param);
558 debugs(47, 7, HERE << "diskId=" << diskId);
559 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
560 if (i != IpcIoFiles.end())
561 i->second->checkTimeouts();
9a51593d
DK
562}
563
564void
565IpcIoFile::checkTimeouts()
566{
9a51593d 567 timeoutCheckScheduled = false;
254912f3 568
0ef0509e
AR
569 // last chance to recover in case a notification message was lost, etc.
570 const RequestMap::size_type timeoutsBefore = olderRequests->size();
571 HandleResponses("before timeout");
572 const RequestMap::size_type timeoutsNow = olderRequests->size();
573
574 if (timeoutsBefore > timeoutsNow) { // some requests were rescued
575 // notification message lost or significantly delayed?
82ed74dc
AR
576 debugs(47, DBG_IMPORTANT, "WARNING: communication with " << dbName <<
577 " may be too slow or disrupted for about " <<
0ef0509e
AR
578 Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
579 " out of " << timeoutsBefore << " I/Os");
580 }
581
582 if (timeoutsNow) {
583 debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
82ed74dc 584 timeoutsNow << ' ' << dbName << " I/Os after at least " <<
0ef0509e
AR
585 Timeout << "s timeout");
586 }
587
caca86d7
AR
588 // any old request would have timed out by now
589 typedef RequestMap::const_iterator RMCI;
9a51593d
DK
590 for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
591 IpcIoPendingRequest *const pending = i->second;
ccfbe8f4
AR
592 CallBack(pending->codeContext, [&] {
593 const auto requestId = i->first;
594 debugs(47, 7, "disker timeout; ipcIo" << KidIdentifier << '.' << requestId);
595 pending->completeIo(nullptr); // no response
596 delete pending; // XXX: leaking if throwing
597 });
9a51593d
DK
598 }
599 olderRequests->clear();
caca86d7 600
9a51593d 601 swap(olderRequests, newerRequests); // switches pointers around
0ef0509e 602 if (!olderRequests->empty() && !timeoutCheckScheduled)
9a51593d 603 scheduleTimeoutCheck();
254912f3
AR
604}
605
caca86d7 606/// prepare to check for timeouts in a little while
254912f3 607void
9a51593d 608IpcIoFile::scheduleTimeoutCheck()
254912f3 609{
ccfbe8f4
AR
610 // We may be running in an I/O requestor CodeContext, but are scheduling
611 // one-for-all CheckTimeouts() that is not specific to any request.
612 CallService(nullptr, [&] {
613 // we check all older requests at once so some may be wait for 2*Timeout
614 eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
615 reinterpret_cast<void *>(diskId), Timeout, 0, false);
616 timeoutCheckScheduled = true;
617 });
254912f3
AR
618}
619
620/// returns and forgets the right IpcIoFile pending request
621IpcIoPendingRequest *
9a51593d 622IpcIoFile::dequeueRequest(const unsigned int requestId)
254912f3 623{
254912f3 624 Must(requestId != 0);
caca86d7 625
9a51593d
DK
626 RequestMap *map = NULL;
627 RequestMap::iterator i = requestMap1.find(requestId);
caca86d7 628
9a51593d
DK
629 if (i != requestMap1.end())
630 map = &requestMap1;
caca86d7 631 else {
9a51593d
DK
632 i = requestMap2.find(requestId);
633 if (i != requestMap2.end())
634 map = &requestMap2;
caca86d7
AR
635 }
636
637 if (!map) // not found in both maps
638 return NULL;
639
640 IpcIoPendingRequest *pending = i->second;
641 map->erase(i);
642 return pending;
254912f3
AR
643}
644
645int
9a51593d 646IpcIoFile::getFD() const
254912f3
AR
647{
648 assert(false); // not supported; TODO: remove this method from API
649 return -1;
650}
651
9a51593d 652/* IpcIoMsg */
254912f3 653
9a51593d 654IpcIoMsg::IpcIoMsg():
f53969cc
SM
655 requestId(0),
656 offset(0),
657 len(0),
658 command(IpcIo::cmdNone),
659 xerrno(0)
254912f3 660{
0a11e039 661 start.tv_sec = 0;
e19994df 662 start.tv_usec = 0;
254912f3
AR
663}
664
d8ee9e8d
EB
665void
666IpcIoMsg::stat(std::ostream &os)
667{
668 timeval elapsedTime;
669 tvSub(elapsedTime, start, current_time);
670 os << "id: " << requestId <<
671 ", offset: " << offset <<
672 ", size: " << len <<
673 ", page: " << page <<
674 ", command: " << command <<
675 ", start: " << start <<
676 ", elapsed: " << elapsedTime <<
677 ", errno: " << xerrno;
678}
679
caca86d7 680/* IpcIoPendingRequest */
254912f3
AR
681
682IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
ccfbe8f4
AR
683 file(aFile),
684 readRequest(nullptr),
685 writeRequest(nullptr),
686 codeContext(CodeContext::Current())
254912f3
AR
687{
688}
689
caca86d7 690void
8ed94021 691IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
caca86d7 692{
caca86d7
AR
693 if (readRequest)
694 file->readCompleted(readRequest, response);
9199139f 695 else if (writeRequest)
caca86d7 696 file->writeCompleted(writeRequest, response);
9a51593d
DK
697 else {
698 Must(!response); // only timeouts are handled here
699 file->openCompleted(NULL);
700 }
caca86d7
AR
701}
702
254912f3
AR
703/* XXX: disker code that should probably be moved elsewhere */
704
7015a149 705static SBuf DbName; ///< full db file name
254912f3
AR
706static int TheFile = -1; ///< db file descriptor
707
9a51593d
DK
708static void
709diskerRead(IpcIoMsg &ipcIo)
254912f3 710{
551f8a18 711 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 712 ipcIo.len = 0;
c792f7fc 713 debugs(47,2, HERE << "run out of shared memory pages for IPC I/O");
8ed94021
DK
714 return;
715 }
716
717 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
718 const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
e4f1fdae 719 ++statCounter.syscalls.disk.reads;
2d338731
AR
720 fd_bytes(TheFile, read, FD_READ);
721
254912f3 722 if (read >= 0) {
9a51593d
DK
723 ipcIo.xerrno = 0;
724 const size_t len = static_cast<size_t>(read); // safe because read > 0
254912f3 725 debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
9199139f 726 (len == ipcIo.len ? "all " : "just ") << read);
9a51593d
DK
727 ipcIo.len = len;
728 } else {
729 ipcIo.xerrno = errno;
730 ipcIo.len = 0;
254912f3 731 debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
9199139f 732 ipcIo.xerrno);
9a51593d 733 }
254912f3
AR
734}
735
9d4e9cfb 736/// Tries to write buffer to disk (a few times if needed);
4e4f7fd9 737/// sets ipcIo results, but does no cleanup. The caller must cleanup.
9a51593d 738static void
4e4f7fd9
AR
739diskerWriteAttempts(IpcIoMsg &ipcIo)
740{
741 const char *buf = Ipc::Mem::PagePointer(ipcIo.page);
742 size_t toWrite = min(ipcIo.len, Ipc::Mem::PageSize());
743 size_t wroteSoFar = 0;
744 off_t offset = ipcIo.offset;
745 // Partial writes to disk do happen. It is unlikely that the caller can
746 // handle partial writes by doing something other than writing leftovers
747 // again, so we try to write them ourselves to minimize overheads.
748 const int attemptLimit = 10;
749 for (int attempts = 1; attempts <= attemptLimit; ++attempts) {
750 const ssize_t result = pwrite(TheFile, buf, toWrite, offset);
751 ++statCounter.syscalls.disk.writes;
752 fd_bytes(TheFile, result, FD_WRITE);
753
754 if (result < 0) {
755 ipcIo.xerrno = errno;
756 assert(ipcIo.xerrno);
82ed74dc
AR
757 debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " failure" <<
758 " writing " << toWrite << '/' << ipcIo.len <<
4e4f7fd9
AR
759 " at " << ipcIo.offset << '+' << wroteSoFar <<
760 " on " << attempts << " try: " << xstrerr(ipcIo.xerrno));
761 ipcIo.len = wroteSoFar;
762 return; // bail on error
763 }
2d338731 764
4e4f7fd9 765 const size_t wroteNow = static_cast<size_t>(result); // result >= 0
9a51593d 766 ipcIo.xerrno = 0;
4e4f7fd9
AR
767
768 debugs(47,3, "disker" << KidIdentifier << " wrote " <<
769 (wroteNow >= toWrite ? "all " : "just ") << wroteNow <<
770 " out of " << toWrite << '/' << ipcIo.len << " at " <<
771 ipcIo.offset << '+' << wroteSoFar << " on " << attempts <<
772 " try");
773
774 wroteSoFar += wroteNow;
775
776 if (wroteNow >= toWrite) {
777 ipcIo.xerrno = 0;
778 ipcIo.len = wroteSoFar;
779 return; // wrote everything there was to write
780 }
781
782 buf += wroteNow;
783 offset += wroteNow;
784 toWrite -= wroteNow;
9a51593d 785 }
8ed94021 786
82ed74dc
AR
787 debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " exhausted all " <<
788 attemptLimit << " attempts while writing " <<
4e4f7fd9
AR
789 toWrite << '/' << ipcIo.len << " at " << ipcIo.offset << '+' <<
790 wroteSoFar);
791 return; // not a fatal I/O error, unless the caller treats it as such
792}
793
794static void
795diskerWrite(IpcIoMsg &ipcIo)
796{
797 diskerWriteAttempts(ipcIo); // may fail
8ed94021 798 Ipc::Mem::PutPage(ipcIo.page);
254912f3
AR
799}
800
c792f7fc 801void
df881a0f 802IpcIoFile::DiskerHandleMoreRequests(void *source)
c792f7fc 803{
df881a0f
AR
804 debugs(47, 7, HERE << "resuming handling requests after " <<
805 static_cast<const char *>(source));
c792f7fc
AR
806 DiskerHandleMoreRequestsScheduled = false;
807 IpcIoFile::DiskerHandleRequests();
808}
809
df881a0f
AR
810bool
811IpcIoFile::WaitBeforePop()
812{
6c6a656a 813 const int ioRate = queue->localRateLimit().load();
df881a0f
AR
814 const double maxRate = ioRate/1e3; // req/ms
815
816 // do we need to enforce configured I/O rate?
817 if (maxRate <= 0)
818 return false;
819
820 // is there an I/O request we could potentially delay?
1e614370
DK
821 int processId;
822 IpcIoMsg ipcIo;
823 if (!queue->peek(processId, ipcIo)) {
824 // unlike pop(), peek() is not reliable and does not block reader
df881a0f
AR
825 // so we must proceed with pop() even if it is likely to fail
826 return false;
827 }
828
9a585a30 829 static timeval LastIo = current_time;
df881a0f
AR
830
831 const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
832 // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
833 const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
834
835 const double credit = ioDuration; // what the last I/O should have cost us
836 const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
837 LastIo = current_time;
838
839 Ipc::QueueReader::Balance &balance = queue->localBalance();
840 balance += static_cast<int64_t>(credit - debit);
841
842 debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
843
1e614370
DK
844 if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
845 // if the next request is (likely) write and we accumulated
846 // too much time for future slow I/Os, then shed accumulated
847 // time to keep just half of the excess
df881a0f 848 const int64_t toSpend = balance - maxImbalance/2;
69321ae9
AR
849
850 if (toSpend/1e3 > Timeout)
82ed74dc
AR
851 debugs(47, DBG_IMPORTANT, "WARNING: " << DbName << " delays " <<
852 "I/O requests for " << (toSpend/1e3) << " seconds " <<
853 "to obey " << ioRate << "/sec rate limit");
69321ae9 854
df881a0f
AR
855 debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" <<
856 (1e3*maxRate) << "/sec rate");
857 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
858 &IpcIoFile::DiskerHandleMoreRequests,
859 const_cast<char*>("rate limiting"),
860 toSpend/1e3, 0, false);
861 DiskerHandleMoreRequestsScheduled = true;
862 return true;
e29ccb57 863 } else if (balance < -maxImbalance) {
df881a0f
AR
864 // do not owe "too much" to avoid "too large" bursts of I/O
865 balance = -maxImbalance;
866 }
867
868 return false;
869}
870
254912f3 871void
f5591061 872IpcIoFile::DiskerHandleRequests()
254912f3 873{
c792f7fc 874 // Balance our desire to maximize the number of concurrent I/O requests
fdb3059b 875 // (reordred by OS to minimize seek time) with a requirement to
c792f7fc 876 // send 1st-I/O notification messages, process Coordinator events, etc.
fdb3059b
AR
877 const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
878 const timeval loopStart = current_time;
879
c792f7fc 880 int popped = 0;
fa61cefe
AR
881 int workerId = 0;
882 IpcIoMsg ipcIo;
df881a0f 883 while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
fdb3059b
AR
884 ++popped;
885
886 // at least one I/O per call is guaranteed if the queue is not empty
fa61cefe
AR
887 DiskerHandleRequest(workerId, ipcIo);
888
fdb3059b
AR
889 getCurrentTime();
890 const double elapsedMsec = tvSubMsec(loopStart, current_time);
891 if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
892 if (!DiskerHandleMoreRequestsScheduled) {
893 // the gap must be positive for select(2) to be given a chance
894 const double minBreakSecs = 0.001;
895 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
9199139f 896 &IpcIoFile::DiskerHandleMoreRequests,
df881a0f
AR
897 const_cast<char*>("long I/O loop"),
898 minBreakSecs, 0, false);
fdb3059b
AR
899 DiskerHandleMoreRequestsScheduled = true;
900 }
901 debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
9199139f 902 elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
fdb3059b
AR
903 break;
904 }
c792f7fc
AR
905 }
906
fdb3059b 907 // TODO: consider using O_DIRECT with "elevator" optimization where we pop
fa61cefe
AR
908 // requests first, then reorder the popped requests to optimize seek time,
909 // then do I/O, then take a break, and come back for the next set of I/O
910 // requests.
9a51593d 911}
254912f3 912
9a51593d
DK
913/// called when disker receives an I/O request
914void
915IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
916{
9a51593d 917 if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
82ed74dc 918 debugs(0, DBG_CRITICAL, "ERROR: " << DbName <<
9a51593d
DK
919 " should not receive " << ipcIo.command <<
920 " ipcIo" << workerId << '.' << ipcIo.requestId);
921 return;
922 }
923
924 debugs(47,5, HERE << "disker" << KidIdentifier <<
925 (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
926 ipcIo.len << " at " << ipcIo.offset <<
927 " ipcIo" << workerId << '.' << ipcIo.requestId);
928
929 if (ipcIo.command == IpcIo::cmdRead)
930 diskerRead(ipcIo);
931 else // ipcIo.command == IpcIo::cmdWrite
932 diskerWrite(ipcIo);
933
f5591061 934 debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
fa61cefe 935
7a907247 936 try {
f5591061 937 if (queue->push(workerId, ipcIo))
5e44782e 938 Notify(workerId); // must notify worker
f5591061 939 } catch (const Queue::Full &) {
387e1d45
EB
940 // The worker pop queue should not overflow because the worker can
941 // push only if pendingRequests() is less than QueueCapacity.
82ed74dc
AR
942 debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue for " <<
943 DbName << " overflow: " <<
5e44782e 944 SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
fa61cefe
AR
945
946 // the I/O request we could not push will timeout
7a907247 947 }
254912f3
AR
948}
949
950static bool
ced8def3 951DiskerOpen(const SBuf &path, int flags, mode_t)
254912f3
AR
952{
953 assert(TheFile < 0);
954
82ed74dc 955 DbName = path;
7015a149 956 TheFile = file_open(DbName.c_str(), flags);
254912f3
AR
957
958 if (TheFile < 0) {
959 const int xerrno = errno;
82ed74dc 960 debugs(47, DBG_CRITICAL, "ERROR: cannot open " << DbName << ": " <<
254912f3
AR
961 xstrerr(xerrno));
962 return false;
9a51593d 963 }
254912f3 964
cb4185f1 965 ++store_open_disk_fd;
7015a149 966 debugs(79,3, "rock db opened " << DbName << ": FD " << TheFile);
9a51593d 967 return true;
254912f3
AR
968}
969
970static void
7015a149 971DiskerClose(const SBuf &path)
254912f3
AR
972{
973 if (TheFile >= 0) {
974 file_close(TheFile);
975 debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
976 TheFile = -1;
5e263176 977 --store_open_disk_fd;
a1c98830 978 }
7015a149 979 DbName.clear();
254912f3 980}
f5591061 981
ea2cdeb6 982/// reports our needs for shared memory pages to Ipc::Mem::Pages
21b7990f
AR
983/// and initializes shared memory segments used by IpcIoFile
984class IpcIoRr: public Ipc::Mem::RegisteredRunner
ea2cdeb6
DK
985{
986public:
987 /* RegisteredRunner API */
21b7990f
AR
988 IpcIoRr(): owner(NULL) {}
989 virtual ~IpcIoRr();
990 virtual void claimMemoryNeeds();
991
992protected:
993 /* Ipc::Mem::RegisteredRunner API */
994 virtual void create();
995
996private:
997 Ipc::FewToFewBiQueue::Owner *owner;
ea2cdeb6
DK
998};
999
21b7990f 1000RunnerRegistrationEntry(IpcIoRr);
ea2cdeb6 1001
ea2cdeb6 1002void
21b7990f 1003IpcIoRr::claimMemoryNeeds()
ea2cdeb6
DK
1004{
1005 const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
c11211d9 1006 ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity);
ea2cdeb6
DK
1007 // the maximum number of shared I/O pages is approximately the
1008 // number of queue slots, we add a fudge factor to that to account
1009 // for corner cases where I/O pages are created before queue
1010 // limits are checked or destroyed long after the I/O is dequeued
29a9ff4e
DK
1011 Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage,
1012 static_cast<int>(itemsCount * 1.1));
ea2cdeb6
DK
1013}
1014
21b7990f
AR
1015void
1016IpcIoRr::create()
f5591061 1017{
e9f68413 1018 if (Config.cacheSwap.n_strands <= 0)
f5591061
DK
1019 return;
1020
4404f1c5 1021 Must(!owner);
4404f1c5 1022 owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1,
3b581957 1023 Config.cacheSwap.n_strands,
4404f1c5 1024 1 + Config.workers, sizeof(IpcIoMsg),
ea2cdeb6 1025 QueueCapacity);
f5591061
DK
1026}
1027
1028IpcIoRr::~IpcIoRr()
1029{
1030 delete owner;
1031}
f53969cc 1032