]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/IpcIo/IpcIoFile.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / DiskIO / IpcIo / IpcIoFile.cc
CommitLineData
254912f3
AR
1/*
2 * $Id$
3 *
4 * DEBUG: section 47 Store Directory Routines
5 */
6
f7f3304a 7#include "squid.h"
f5591061 8#include "base/RunnersRegistry.h"
254912f3
AR
9#include "base/TextException.h"
10#include "DiskIO/IORequestor.h"
11#include "DiskIO/IpcIo/IpcIoFile.h"
12#include "DiskIO/ReadRequest.h"
13#include "DiskIO/WriteRequest.h"
582c2af2 14#include "globals.h"
21d845b1 15#include "ipc/mem/Pages.h"
254912f3
AR
16#include "ipc/Messages.h"
17#include "ipc/Port.h"
f5591061 18#include "ipc/Queue.h"
9a51593d 19#include "ipc/StrandSearch.h"
254912f3 20#include "ipc/UdsOp.h"
582c2af2 21#include "protos.h"
21d845b1
FC
22#include "SquidTime.h"
23#include "StatCounters.h"
254912f3 24
21d845b1
FC
25#if HAVE_ERRNO_H
26#include <errno.h>
27#endif
254912f3
AR
28CBDATA_CLASS_INIT(IpcIoFile);
29
f5591061
DK
30/// shared memory segment path to use for IpcIoFile maps
31static const char *const ShmLabel = "io_file";
ea2cdeb6
DK
32/// a single worker-to-disker or disker-to-worker queue capacity; up
33/// to 2*QueueCapacity I/O requests queued between a single worker and
34/// a single disker
35// TODO: make configurable or compute from squid.conf settings if possible
36static const int QueueCapacity = 1024;
f5591061 37
fa61cefe 38const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
b2aa0934
DK
39IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
40IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
f5591061 41std::auto_ptr<IpcIoFile::Queue> IpcIoFile::queue;
254912f3 42
c792f7fc
AR
43bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false;
44
254912f3
AR
45static bool DiskerOpen(const String &path, int flags, mode_t mode);
46static void DiskerClose(const String &path);
47
fa61cefe
AR
48/// IpcIo wrapper for debugs() streams; XXX: find a better class name
49struct SipcIo {
50 SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
9199139f 51 worker(aWorker), msg(aMsg), disker(aDisker) {}
fa61cefe
AR
52
53 int worker;
54 const IpcIoMsg &msg;
55 int disker;
56};
57
58std::ostream &
59operator <<(std::ostream &os, const SipcIo &sio)
60{
61 return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
9199139f 62 (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
fa61cefe
AR
63}
64
254912f3 65IpcIoFile::IpcIoFile(char const *aDb):
9199139f
AR
66 dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
67 olderRequests(&requestMap1), newerRequests(&requestMap2),
68 timeoutCheckScheduled(false)
254912f3
AR
69{
70}
71
72IpcIoFile::~IpcIoFile()
73{
b2aa0934
DK
74 if (diskId >= 0) {
75 const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId);
76 // XXX: warn and continue?
77 Must(i != IpcIoFiles.end());
78 Must(i->second == this);
79 IpcIoFiles.erase(i);
80 }
254912f3
AR
81}
82
43ebbac3
AR
83void
84IpcIoFile::configure(const Config &cfg)
85{
86 DiskFile::configure(cfg);
87 config = cfg;
88}
89
254912f3
AR
90void
91IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
92{
93 ioRequestor = callback;
94 Must(diskId < 0); // we do not know our disker yet
f5591061
DK
95
96 if (!queue.get())
97 queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
254912f3
AR
98
99 if (IamDiskProcess()) {
100 error_ = !DiskerOpen(dbName, flags, mode);
f7091279
DK
101 if (error_)
102 return;
103
b2aa0934
DK
104 diskId = KidIdentifier;
105 const bool inserted =
106 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
107 Must(inserted);
9a51593d 108
df881a0f
AR
109 queue->localRateLimit() =
110 static_cast<Ipc::QueueReader::Rate::Value>(config.ioRate);
111
f7091279
DK
112 Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
113 ann.strand.tag = dbName;
114 Ipc::TypedMsgHdr message;
115 ann.pack(message);
116 SendMessage(Ipc::coordinatorAddr, message);
117
e528e570 118 ioRequestor->ioCompletedNotification();
254912f3 119 return;
9a51593d 120 }
254912f3 121
9a51593d
DK
122 Ipc::StrandSearchRequest request;
123 request.requestorId = KidIdentifier;
124 request.tag = dbName;
254912f3 125
9a51593d
DK
126 Ipc::TypedMsgHdr msg;
127 request.pack(msg);
128 Ipc::SendMessage(Ipc::coordinatorAddr, msg);
129
b2aa0934
DK
130 WaitingForOpen.push_back(this);
131
132 eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
133 this, Timeout, 0, false); // "this" pointer is used as id
254912f3
AR
134}
135
136void
9199139f
AR
137IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response)
138{
9a51593d 139 Must(diskId < 0); // we do not know our disker yet
9a51593d
DK
140
141 if (!response) {
e0236918 142 debugs(79, DBG_IMPORTANT, HERE << "error: timeout");
caca86d7 143 error_ = true;
9a51593d
DK
144 } else {
145 diskId = response->strand.kidId;
146 if (diskId >= 0) {
b2aa0934
DK
147 const bool inserted =
148 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
149 Must(inserted);
9a51593d 150 } else {
254912f3 151 error_ = true;
e0236918 152 debugs(79, DBG_IMPORTANT, HERE << "error: no disker claimed " << dbName);
9a51593d
DK
153 }
154 }
254912f3
AR
155
156 ioRequestor->ioCompletedNotification();
157}
158
159/**
160 * Alias for IpcIoFile::open(...)
161 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
162 */
163void
164IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
165{
166 assert(false); // check
167 /* We use the same logic path for open */
168 open(flags, mode, callback);
169}
170
171void
172IpcIoFile::close()
173{
174 assert(ioRequestor != NULL);
175
176 if (IamDiskProcess())
177 DiskerClose(dbName);
9a51593d 178 // XXX: else nothing to do?
254912f3
AR
179
180 ioRequestor->closeCompleted();
181}
182
183bool
184IpcIoFile::canRead() const
185{
0a11e039 186 return diskId >= 0 && canWait();
254912f3
AR
187}
188
189bool
190IpcIoFile::canWrite() const
191{
0a11e039 192 return diskId >= 0 && canWait();
254912f3
AR
193}
194
195bool
196IpcIoFile::error() const
197{
198 return error_;
199}
200
201void
202IpcIoFile::read(ReadRequest *readRequest)
203{
204 debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
9199139f 205 readRequest->offset << ")");
254912f3
AR
206
207 assert(ioRequestor != NULL);
254912f3
AR
208 assert(readRequest->offset >= 0);
209 Must(!error_);
210
211 //assert(minOffset < 0 || minOffset <= readRequest->offset);
212 //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
213
9a51593d 214 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 215 pending->readRequest = readRequest;
7a907247 216 push(pending);
254912f3
AR
217}
218
219void
220IpcIoFile::readCompleted(ReadRequest *readRequest,
8ed94021 221 IpcIoMsg *const response)
254912f3 222{
caca86d7 223 bool ioError = false;
9a51593d 224 if (!response) {
0ef0509e 225 debugs(79, 3, HERE << "error: timeout");
caca86d7 226 ioError = true; // I/O timeout does not warrant setting error_?
9a51593d 227 } else {
70e3f706 228 if (response->xerrno) {
e0236918 229 debugs(79, DBG_IMPORTANT, HERE << "error: " << xstrerr(response->xerrno));
70e3f706 230 ioError = error_ = true;
9199139f 231 } else if (!response->page) {
e0236918 232 debugs(79, DBG_IMPORTANT, HERE << "error: run out of shared memory pages");
70e3f706
DK
233 ioError = true;
234 } else {
235 const char *const buf = Ipc::Mem::PagePointer(response->page);
236 memcpy(readRequest->buf, buf, response->len);
237 }
254912f3 238
70e3f706
DK
239 Ipc::Mem::PutPage(response->page);
240 }
8ed94021 241
caca86d7 242 const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
9199139f 243 const int errflag = ioError ? DISK_ERROR :DISK_OK;
20b0e1fe 244 ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
254912f3
AR
245}
246
247void
248IpcIoFile::write(WriteRequest *writeRequest)
249{
250 debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
9199139f 251 writeRequest->offset << ")");
254912f3
AR
252
253 assert(ioRequestor != NULL);
254912f3
AR
254 assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
255 assert(writeRequest->offset >= 0);
256 Must(!error_);
257
258 //assert(minOffset < 0 || minOffset <= writeRequest->offset);
259 //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
260
9a51593d 261 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 262 pending->writeRequest = writeRequest;
7a907247 263 push(pending);
254912f3
AR
264}
265
266void
267IpcIoFile::writeCompleted(WriteRequest *writeRequest,
9a51593d 268 const IpcIoMsg *const response)
254912f3 269{
caca86d7 270 bool ioError = false;
9a51593d 271 if (!response) {
0ef0509e 272 debugs(79, 3, HERE << "error: timeout");
caca86d7 273 ioError = true; // I/O timeout does not warrant setting error_?
9199139f 274 } else if (response->xerrno) {
e0236918 275 debugs(79, DBG_IMPORTANT, HERE << "error: " << xstrerr(response->xerrno));
9a51593d 276 ioError = error_ = true;
9199139f 277 } else if (response->len != writeRequest->len) {
e0236918 278 debugs(79, DBG_IMPORTANT, HERE << "problem: " << response->len << " < " << writeRequest->len);
254912f3
AR
279 error_ = true;
280 }
281
282 if (writeRequest->free_func)
283 (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
284
caca86d7 285 if (!ioError) {
254912f3 286 debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
9199139f
AR
287 diskId << " at " << writeRequest->offset);
288 }
254912f3 289
caca86d7 290 const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
9199139f 291 const int errflag = ioError ? DISK_ERROR :DISK_OK;
254912f3
AR
292 ioRequestor->writeCompleted(errflag, rlen, writeRequest);
293}
294
295bool
296IpcIoFile::ioInProgress() const
297{
9a51593d 298 return !olderRequests->empty() || !newerRequests->empty();
254912f3
AR
299}
300
9a51593d 301/// track a new pending request
254912f3 302void
7a907247 303IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending)
254912f3 304{
7a907247 305 newerRequests->insert(std::make_pair(lastRequestId, pending));
9a51593d
DK
306 if (!timeoutCheckScheduled)
307 scheduleTimeoutCheck();
308}
254912f3 309
9a51593d
DK
310/// push an I/O request to disker
311void
7a907247 312IpcIoFile::push(IpcIoPendingRequest *const pending)
9a51593d 313{
fa61cefe 314 // prevent queue overflows: check for responses to earlier requests
f5591061 315 HandleResponses("before push");
fa61cefe 316
a1c98830 317 debugs(47, 7, HERE);
9a51593d 318 Must(diskId >= 0);
7a907247
DK
319 Must(pending);
320 Must(pending->readRequest || pending->writeRequest);
9a51593d
DK
321
322 IpcIoMsg ipcIo;
8ed94021
DK
323 try {
324 ipcIo.requestId = lastRequestId;
0a11e039 325 ipcIo.start = current_time;
8ed94021
DK
326 if (pending->readRequest) {
327 ipcIo.command = IpcIo::cmdRead;
328 ipcIo.offset = pending->readRequest->offset;
329 ipcIo.len = pending->readRequest->len;
330 } else { // pending->writeRequest
331 Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
551f8a18 332 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 333 ipcIo.len = 0;
551f8a18 334 throw TexcHere("run out of shared memory pages for IPC I/O");
8ed94021
DK
335 }
336 ipcIo.command = IpcIo::cmdWrite;
337 ipcIo.offset = pending->writeRequest->offset;
338 ipcIo.len = pending->writeRequest->len;
339 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
340 memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
341 }
254912f3 342
f5591061 343 debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
fa61cefe 344
f5591061 345 if (queue->push(diskId, ipcIo))
fa61cefe 346 Notify(diskId); // must notify disker
7a907247 347 trackPendingRequest(pending);
f5591061 348 } catch (const Queue::Full &) {
fa61cefe
AR
349 debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " <<
350 SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
351 // TODO: grow queue size
9199139f 352
0ef0509e 353 pending->completeIo(NULL);
8ed94021
DK
354 delete pending;
355 } catch (const TextException &e) {
356 debugs(47, DBG_IMPORTANT, HERE << e.what());
0ef0509e 357 pending->completeIo(NULL);
7a907247 358 delete pending;
9a51593d 359 }
254912f3
AR
360}
361
0a11e039
AR
362/// whether we think there is enough time to complete the I/O
363bool
9199139f
AR
364IpcIoFile::canWait() const
365{
43ebbac3 366 if (!config.ioTimeout)
0a11e039
AR
367 return true; // no timeout specified
368
369 IpcIoMsg oldestIo;
5aac671b 370 if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
0a11e039
AR
371 return true; // we cannot estimate expected wait time; assume it is OK
372
55939a01
AR
373 const int oldestWait = tvSubMsec(oldestIo.start, current_time);
374
375 int rateWait = -1; // time in millisecons
376 const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId);
377 if (ioRate > 0) {
378 // if there are N requests pending, the new one will wait at
379 // least N/max-swap-rate seconds
75017bc9 380 rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
55939a01
AR
381 // adjust N/max-swap-rate value based on the queue "balance"
382 // member, in case we have been borrowing time against future
383 // I/O already
384 rateWait += queue->balance(diskId);
385 }
386
387 const int expectedWait = max(oldestWait, rateWait);
0a11e039 388 if (expectedWait < 0 ||
43ebbac3 389 static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
0a11e039
AR
390 return true; // expected wait time is acceptible
391
c792f7fc
AR
392 debugs(47,2, HERE << "cannot wait: " << expectedWait <<
393 " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
0a11e039
AR
394 return false; // do not want to wait that long
395}
396
9a51593d 397/// called when coordinator responds to worker open request
254912f3 398void
9a51593d 399IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
254912f3 400{
9a51593d 401 debugs(47, 7, HERE << "coordinator response to open request");
b2aa0934 402 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
9199139f 403 i != WaitingForOpen.end(); ++i) {
b2aa0934
DK
404 if (response.strand.tag == (*i)->dbName) {
405 (*i)->openCompleted(&response);
406 WaitingForOpen.erase(i);
407 return;
408 }
409 }
410
411 debugs(47, 4, HERE << "LATE disker response to open for " <<
412 response.strand.tag);
413 // nothing we can do about it; completeIo() has been called already
9a51593d 414}
254912f3 415
9a51593d 416void
f5591061 417IpcIoFile::HandleResponses(const char *const when)
fa61cefe
AR
418{
419 debugs(47, 4, HERE << "popping all " << when);
fa61cefe
AR
420 IpcIoMsg ipcIo;
421 // get all responses we can: since we are not pushing, this will stop
f5591061
DK
422 int diskId;
423 while (queue->pop(diskId, ipcIo)) {
424 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
425 Must(i != IpcIoFiles.end()); // TODO: warn but continue
426 i->second->handleResponse(ipcIo);
427 }
9a51593d 428}
254912f3 429
9a51593d 430void
8ed94021 431IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
9a51593d
DK
432{
433 const int requestId = ipcIo.requestId;
fa61cefe 434 debugs(47, 7, HERE << "popped disker response: " <<
9199139f 435 SipcIo(KidIdentifier, ipcIo, diskId));
fa61cefe 436
9a51593d
DK
437 Must(requestId);
438 if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
439 pending->completeIo(&ipcIo);
caca86d7
AR
440 delete pending; // XXX: leaking if throwing
441 } else {
9a51593d
DK
442 debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command <<
443 "; ipcIo" << KidIdentifier << '.' << requestId);
caca86d7
AR
444 // nothing we can do about it; completeIo() has been called already
445 }
254912f3
AR
446}
447
254912f3 448void
9a51593d 449IpcIoFile::Notify(const int peerId)
254912f3 450{
fa61cefe 451 // TODO: Count and report the total number of notifications, pops, pushes.
a1c98830 452 debugs(47, 7, HERE << "kid" << peerId);
9a51593d
DK
453 Ipc::TypedMsgHdr msg;
454 msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
455 msg.putInt(KidIdentifier);
456 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, peerId);
457 Ipc::SendMessage(addr, msg);
458}
459
460void
461IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
462{
fa61cefe
AR
463 const int from = msg.getInt();
464 debugs(47, 7, HERE << "from " << from);
f5591061
DK
465 queue->clearReaderSignal(from);
466 if (IamDiskProcess())
467 DiskerHandleRequests();
468 else
469 HandleResponses("after notification");
9a51593d
DK
470}
471
b2aa0934
DK
472/// handles open request timeout
473void
474IpcIoFile::OpenTimeout(void *const param)
475{
476 Must(param);
477 // the pointer is used for comparison only and not dereferenced
478 const IpcIoFile *const ipcIoFile =
479 reinterpret_cast<const IpcIoFile *>(param);
480 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
9199139f 481 i != WaitingForOpen.end(); ++i) {
b2aa0934
DK
482 if (*i == ipcIoFile) {
483 (*i)->openCompleted(NULL);
484 WaitingForOpen.erase(i);
485 break;
486 }
487 }
488}
489
9a51593d
DK
490/// IpcIoFile::checkTimeouts wrapper
491void
492IpcIoFile::CheckTimeouts(void *const param)
493{
9a51593d 494 Must(param);
b2aa0934
DK
495 const int diskId = reinterpret_cast<uintptr_t>(param);
496 debugs(47, 7, HERE << "diskId=" << diskId);
497 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
498 if (i != IpcIoFiles.end())
499 i->second->checkTimeouts();
9a51593d
DK
500}
501
502void
503IpcIoFile::checkTimeouts()
504{
9a51593d 505 timeoutCheckScheduled = false;
254912f3 506
0ef0509e
AR
507 // last chance to recover in case a notification message was lost, etc.
508 const RequestMap::size_type timeoutsBefore = olderRequests->size();
509 HandleResponses("before timeout");
510 const RequestMap::size_type timeoutsNow = olderRequests->size();
511
512 if (timeoutsBefore > timeoutsNow) { // some requests were rescued
513 // notification message lost or significantly delayed?
514 debugs(47, DBG_IMPORTANT, "WARNING: communication with disker " <<
515 "may be too slow or disrupted for about " <<
516 Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
517 " out of " << timeoutsBefore << " I/Os");
518 }
519
520 if (timeoutsNow) {
521 debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
522 timeoutsNow << " I/Os after at least " <<
523 Timeout << "s timeout");
524 }
525
caca86d7
AR
526 // any old request would have timed out by now
527 typedef RequestMap::const_iterator RMCI;
9a51593d
DK
528 for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
529 IpcIoPendingRequest *const pending = i->second;
254912f3 530
b2aa0934 531 const unsigned int requestId = i->first;
caca86d7 532 debugs(47, 7, HERE << "disker timeout; ipcIo" <<
b2aa0934 533 KidIdentifier << '.' << requestId);
254912f3 534
caca86d7
AR
535 pending->completeIo(NULL); // no response
536 delete pending; // XXX: leaking if throwing
9a51593d
DK
537 }
538 olderRequests->clear();
caca86d7 539
9a51593d 540 swap(olderRequests, newerRequests); // switches pointers around
0ef0509e 541 if (!olderRequests->empty() && !timeoutCheckScheduled)
9a51593d 542 scheduleTimeoutCheck();
254912f3
AR
543}
544
caca86d7 545/// prepare to check for timeouts in a little while
254912f3 546void
9a51593d 547IpcIoFile::scheduleTimeoutCheck()
254912f3 548{
b2aa0934 549 // we check all older requests at once so some may be wait for 2*Timeout
caca86d7 550 eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
b2aa0934 551 reinterpret_cast<void *>(diskId), Timeout, 0, false);
9a51593d 552 timeoutCheckScheduled = true;
254912f3
AR
553}
554
555/// returns and forgets the right IpcIoFile pending request
556IpcIoPendingRequest *
9a51593d 557IpcIoFile::dequeueRequest(const unsigned int requestId)
254912f3 558{
254912f3 559 Must(requestId != 0);
caca86d7 560
9a51593d
DK
561 RequestMap *map = NULL;
562 RequestMap::iterator i = requestMap1.find(requestId);
caca86d7 563
9a51593d
DK
564 if (i != requestMap1.end())
565 map = &requestMap1;
caca86d7 566 else {
9a51593d
DK
567 i = requestMap2.find(requestId);
568 if (i != requestMap2.end())
569 map = &requestMap2;
caca86d7
AR
570 }
571
572 if (!map) // not found in both maps
573 return NULL;
574
575 IpcIoPendingRequest *pending = i->second;
576 map->erase(i);
577 return pending;
254912f3
AR
578}
579
580int
9a51593d 581IpcIoFile::getFD() const
254912f3
AR
582{
583 assert(false); // not supported; TODO: remove this method from API
584 return -1;
585}
586
9a51593d 587/* IpcIoMsg */
254912f3 588
9a51593d 589IpcIoMsg::IpcIoMsg():
9199139f 590 requestId(0), offset(0), len(0), command(IpcIo::cmdNone), xerrno(0)
254912f3 591{
0a11e039 592 start.tv_sec = 0;
254912f3
AR
593}
594
caca86d7 595/* IpcIoPendingRequest */
254912f3
AR
596
597IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
9199139f 598 file(aFile), readRequest(NULL), writeRequest(NULL)
254912f3 599{
b2aa0934 600 Must(file != NULL);
9a51593d
DK
601 if (++file->lastRequestId == 0) // don't use zero value as requestId
602 ++file->lastRequestId;
254912f3
AR
603}
604
caca86d7 605void
8ed94021 606IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
caca86d7 607{
caca86d7
AR
608 if (readRequest)
609 file->readCompleted(readRequest, response);
9199139f 610 else if (writeRequest)
caca86d7 611 file->writeCompleted(writeRequest, response);
9a51593d
DK
612 else {
613 Must(!response); // only timeouts are handled here
614 file->openCompleted(NULL);
615 }
caca86d7
AR
616}
617
254912f3
AR
618/* XXX: disker code that should probably be moved elsewhere */
619
/// the disker's db file descriptor; negative while the db is not open
static int TheFile = -1;
621
9a51593d
DK
622static void
623diskerRead(IpcIoMsg &ipcIo)
254912f3 624{
551f8a18 625 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 626 ipcIo.len = 0;
c792f7fc 627 debugs(47,2, HERE << "run out of shared memory pages for IPC I/O");
8ed94021
DK
628 return;
629 }
630
631 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
632 const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
e4f1fdae 633 ++statCounter.syscalls.disk.reads;
2d338731
AR
634 fd_bytes(TheFile, read, FD_READ);
635
254912f3 636 if (read >= 0) {
9a51593d
DK
637 ipcIo.xerrno = 0;
638 const size_t len = static_cast<size_t>(read); // safe because read > 0
254912f3 639 debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
9199139f 640 (len == ipcIo.len ? "all " : "just ") << read);
9a51593d
DK
641 ipcIo.len = len;
642 } else {
643 ipcIo.xerrno = errno;
644 ipcIo.len = 0;
254912f3 645 debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
9199139f 646 ipcIo.xerrno);
9a51593d 647 }
254912f3
AR
648}
649
9a51593d
DK
650static void
651diskerWrite(IpcIoMsg &ipcIo)
254912f3 652{
8ed94021
DK
653 const char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
654 const ssize_t wrote = pwrite(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
e4f1fdae 655 ++statCounter.syscalls.disk.writes;
2d338731
AR
656 fd_bytes(TheFile, wrote, FD_WRITE);
657
254912f3 658 if (wrote >= 0) {
9a51593d
DK
659 ipcIo.xerrno = 0;
660 const size_t len = static_cast<size_t>(wrote); // safe because wrote > 0
254912f3 661 debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " <<
9199139f 662 (len == ipcIo.len ? "all " : "just ") << wrote);
9a51593d
DK
663 ipcIo.len = len;
664 } else {
665 ipcIo.xerrno = errno;
666 ipcIo.len = 0;
254912f3 667 debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " <<
9a51593d
DK
668 ipcIo.xerrno);
669 }
8ed94021
DK
670
671 Ipc::Mem::PutPage(ipcIo.page);
254912f3
AR
672}
673
c792f7fc 674void
df881a0f 675IpcIoFile::DiskerHandleMoreRequests(void *source)
c792f7fc 676{
df881a0f
AR
677 debugs(47, 7, HERE << "resuming handling requests after " <<
678 static_cast<const char *>(source));
c792f7fc
AR
679 DiskerHandleMoreRequestsScheduled = false;
680 IpcIoFile::DiskerHandleRequests();
681}
682
df881a0f
AR
683bool
684IpcIoFile::WaitBeforePop()
685{
686 const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit();
687 const double maxRate = ioRate/1e3; // req/ms
688
689 // do we need to enforce configured I/O rate?
690 if (maxRate <= 0)
691 return false;
692
693 // is there an I/O request we could potentially delay?
1e614370
DK
694 int processId;
695 IpcIoMsg ipcIo;
696 if (!queue->peek(processId, ipcIo)) {
697 // unlike pop(), peek() is not reliable and does not block reader
df881a0f
AR
698 // so we must proceed with pop() even if it is likely to fail
699 return false;
700 }
701
9a585a30 702 static timeval LastIo = current_time;
df881a0f
AR
703
704 const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
705 // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
706 const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
707
708 const double credit = ioDuration; // what the last I/O should have cost us
709 const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
710 LastIo = current_time;
711
712 Ipc::QueueReader::Balance &balance = queue->localBalance();
713 balance += static_cast<int64_t>(credit - debit);
714
715 debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
716
1e614370
DK
717 if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
718 // if the next request is (likely) write and we accumulated
719 // too much time for future slow I/Os, then shed accumulated
720 // time to keep just half of the excess
df881a0f 721 const int64_t toSpend = balance - maxImbalance/2;
69321ae9
AR
722
723 if (toSpend/1e3 > Timeout)
724 debugs(47, DBG_IMPORTANT, "WARNING: Rock disker delays I/O " <<
725 "requests for " << (toSpend/1e3) << " seconds to obey " <<
726 ioRate << "/sec rate limit");
727
df881a0f
AR
728 debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" <<
729 (1e3*maxRate) << "/sec rate");
730 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
731 &IpcIoFile::DiskerHandleMoreRequests,
732 const_cast<char*>("rate limiting"),
733 toSpend/1e3, 0, false);
734 DiskerHandleMoreRequestsScheduled = true;
735 return true;
e29ccb57 736 } else if (balance < -maxImbalance) {
df881a0f
AR
737 // do not owe "too much" to avoid "too large" bursts of I/O
738 balance = -maxImbalance;
739 }
740
741 return false;
742}
743
254912f3 744void
f5591061 745IpcIoFile::DiskerHandleRequests()
254912f3 746{
c792f7fc 747 // Balance our desire to maximize the number of concurrent I/O requests
fdb3059b 748 // (reordred by OS to minimize seek time) with a requirement to
c792f7fc 749 // send 1st-I/O notification messages, process Coordinator events, etc.
fdb3059b
AR
750 const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
751 const timeval loopStart = current_time;
752
c792f7fc 753 int popped = 0;
fa61cefe
AR
754 int workerId = 0;
755 IpcIoMsg ipcIo;
df881a0f 756 while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
fdb3059b
AR
757 ++popped;
758
759 // at least one I/O per call is guaranteed if the queue is not empty
fa61cefe
AR
760 DiskerHandleRequest(workerId, ipcIo);
761
fdb3059b
AR
762 getCurrentTime();
763 const double elapsedMsec = tvSubMsec(loopStart, current_time);
764 if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
765 if (!DiskerHandleMoreRequestsScheduled) {
766 // the gap must be positive for select(2) to be given a chance
767 const double minBreakSecs = 0.001;
768 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
9199139f 769 &IpcIoFile::DiskerHandleMoreRequests,
df881a0f
AR
770 const_cast<char*>("long I/O loop"),
771 minBreakSecs, 0, false);
fdb3059b
AR
772 DiskerHandleMoreRequestsScheduled = true;
773 }
774 debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
9199139f 775 elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
fdb3059b
AR
776 break;
777 }
c792f7fc
AR
778 }
779
fdb3059b 780 // TODO: consider using O_DIRECT with "elevator" optimization where we pop
fa61cefe
AR
781 // requests first, then reorder the popped requests to optimize seek time,
782 // then do I/O, then take a break, and come back for the next set of I/O
783 // requests.
9a51593d 784}
254912f3 785
9a51593d
DK
786/// called when disker receives an I/O request
787void
788IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
789{
9a51593d 790 if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
fa84c01d 791 debugs(0, DBG_CRITICAL, HERE << "disker" << KidIdentifier <<
9a51593d
DK
792 " should not receive " << ipcIo.command <<
793 " ipcIo" << workerId << '.' << ipcIo.requestId);
794 return;
795 }
796
797 debugs(47,5, HERE << "disker" << KidIdentifier <<
798 (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
799 ipcIo.len << " at " << ipcIo.offset <<
800 " ipcIo" << workerId << '.' << ipcIo.requestId);
801
802 if (ipcIo.command == IpcIo::cmdRead)
803 diskerRead(ipcIo);
804 else // ipcIo.command == IpcIo::cmdWrite
805 diskerWrite(ipcIo);
806
f5591061 807 debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
fa61cefe 808
7a907247 809 try {
f5591061 810 if (queue->push(workerId, ipcIo))
5e44782e 811 Notify(workerId); // must notify worker
f5591061 812 } catch (const Queue::Full &) {
fa61cefe
AR
813 // The worker queue should not overflow because the worker should pop()
814 // before push()ing and because if disker pops N requests at a time,
815 // we should make sure the worker pop() queue length is the worker
816 // push queue length plus N+1. XXX: implement the N+1 difference.
817 debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue overflow: " <<
5e44782e 818 SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
fa61cefe
AR
819
820 // the I/O request we could not push will timeout
7a907247 821 }
254912f3
AR
822}
823
824static bool
825DiskerOpen(const String &path, int flags, mode_t mode)
826{
827 assert(TheFile < 0);
828
829 TheFile = file_open(path.termedBuf(), flags);
830
831 if (TheFile < 0) {
832 const int xerrno = errno;
fa84c01d 833 debugs(47, DBG_CRITICAL, HERE << "rock db error opening " << path << ": " <<
254912f3
AR
834 xstrerr(xerrno));
835 return false;
9a51593d 836 }
254912f3 837
cb4185f1 838 ++store_open_disk_fd;
254912f3 839 debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile);
9a51593d 840 return true;
254912f3
AR
841}
842
843static void
844DiskerClose(const String &path)
845{
846 if (TheFile >= 0) {
847 file_close(TheFile);
848 debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
849 TheFile = -1;
5e263176 850 --store_open_disk_fd;
a1c98830 851 }
254912f3 852}
f5591061 853
ea2cdeb6
DK
854/// reports our needs for shared memory pages to Ipc::Mem::Pages
855class IpcIoClaimMemoryNeedsRr: public RegisteredRunner
856{
857public:
858 /* RegisteredRunner API */
859 virtual void run(const RunnerRegistry &r);
860};
861
862RunnerRegistrationEntry(rrClaimMemoryNeeds, IpcIoClaimMemoryNeedsRr);
863
ea2cdeb6
DK
864void
865IpcIoClaimMemoryNeedsRr::run(const RunnerRegistry &)
866{
867 const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
c11211d9 868 ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity);
ea2cdeb6
DK
869 // the maximum number of shared I/O pages is approximately the
870 // number of queue slots, we add a fudge factor to that to account
871 // for corner cases where I/O pages are created before queue
872 // limits are checked or destroyed long after the I/O is dequeued
29a9ff4e
DK
873 Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage,
874 static_cast<int>(itemsCount * 1.1));
ea2cdeb6
DK
875}
876
f5591061 877/// initializes shared memory segments used by IpcIoFile
4404f1c5 878class IpcIoRr: public Ipc::Mem::RegisteredRunner
f5591061
DK
879{
880public:
881 /* RegisteredRunner API */
882 IpcIoRr(): owner(NULL) {}
f5591061
DK
883 virtual ~IpcIoRr();
884
4404f1c5
DK
885protected:
886 virtual void create(const RunnerRegistry &);
887
f5591061
DK
888private:
889 Ipc::FewToFewBiQueue::Owner *owner;
890};
891
892RunnerRegistrationEntry(rrAfterConfig, IpcIoRr);
893
4404f1c5 894void IpcIoRr::create(const RunnerRegistry &)
f5591061 895{
e9f68413 896 if (Config.cacheSwap.n_strands <= 0)
f5591061
DK
897 return;
898
4404f1c5 899 Must(!owner);
4404f1c5 900 owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1,
3b581957 901 Config.cacheSwap.n_strands,
4404f1c5 902 1 + Config.workers, sizeof(IpcIoMsg),
ea2cdeb6 903 QueueCapacity);
f5591061
DK
904}
905
906IpcIoRr::~IpcIoRr()
907{
908 delete owner;
909}