]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/IpcIo/IpcIoFile.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / DiskIO / IpcIo / IpcIoFile.cc
CommitLineData
254912f3
AR
1/*
2 * $Id$
3 *
4 * DEBUG: section 47 Store Directory Routines
5 */
6
7#include "config.h"
f5591061 8#include "base/RunnersRegistry.h"
254912f3
AR
9#include "base/TextException.h"
10#include "DiskIO/IORequestor.h"
11#include "DiskIO/IpcIo/IpcIoFile.h"
12#include "DiskIO/ReadRequest.h"
13#include "DiskIO/WriteRequest.h"
14#include "ipc/Messages.h"
15#include "ipc/Port.h"
f5591061 16#include "ipc/Queue.h"
9a51593d 17#include "ipc/StrandSearch.h"
254912f3 18#include "ipc/UdsOp.h"
8ed94021 19#include "ipc/mem/Pages.h"
0a11e039 20#include "SquidTime.h"
254912f3
AR
21
22CBDATA_CLASS_INIT(IpcIoFile);
23
f5591061
DK
24/// shared memory segment path to use for IpcIoFile maps
25static const char *const ShmLabel = "io_file";
26
fa61cefe 27const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
b2aa0934
DK
28IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
29IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
f5591061 30std::auto_ptr<IpcIoFile::Queue> IpcIoFile::queue;
254912f3 31
c792f7fc
AR
32bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false;
33
254912f3
AR
34static bool DiskerOpen(const String &path, int flags, mode_t mode);
35static void DiskerClose(const String &path);
36
fa61cefe
AR
37/// IpcIo wrapper for debugs() streams; XXX: find a better class name
38struct SipcIo {
39 SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
9199139f 40 worker(aWorker), msg(aMsg), disker(aDisker) {}
fa61cefe
AR
41
42 int worker;
43 const IpcIoMsg &msg;
44 int disker;
45};
46
47std::ostream &
48operator <<(std::ostream &os, const SipcIo &sio)
49{
50 return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
9199139f 51 (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
fa61cefe
AR
52}
53
254912f3
AR
54
55IpcIoFile::IpcIoFile(char const *aDb):
9199139f
AR
56 dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
57 olderRequests(&requestMap1), newerRequests(&requestMap2),
58 timeoutCheckScheduled(false)
254912f3
AR
59{
60}
61
62IpcIoFile::~IpcIoFile()
63{
b2aa0934
DK
64 if (diskId >= 0) {
65 const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId);
66 // XXX: warn and continue?
67 Must(i != IpcIoFiles.end());
68 Must(i->second == this);
69 IpcIoFiles.erase(i);
70 }
254912f3
AR
71}
72
43ebbac3
AR
73void
74IpcIoFile::configure(const Config &cfg)
75{
76 DiskFile::configure(cfg);
77 config = cfg;
78}
79
254912f3
AR
80void
81IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
82{
83 ioRequestor = callback;
84 Must(diskId < 0); // we do not know our disker yet
f5591061
DK
85
86 if (!queue.get())
87 queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
254912f3
AR
88
89 if (IamDiskProcess()) {
90 error_ = !DiskerOpen(dbName, flags, mode);
f7091279
DK
91 if (error_)
92 return;
93
b2aa0934
DK
94 diskId = KidIdentifier;
95 const bool inserted =
96 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
97 Must(inserted);
9a51593d 98
df881a0f
AR
99 queue->localRateLimit() =
100 static_cast<Ipc::QueueReader::Rate::Value>(config.ioRate);
101
f7091279
DK
102 Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
103 ann.strand.tag = dbName;
104 Ipc::TypedMsgHdr message;
105 ann.pack(message);
106 SendMessage(Ipc::coordinatorAddr, message);
107
e528e570 108 ioRequestor->ioCompletedNotification();
254912f3 109 return;
9a51593d 110 }
254912f3 111
9a51593d
DK
112 Ipc::StrandSearchRequest request;
113 request.requestorId = KidIdentifier;
114 request.tag = dbName;
254912f3 115
9a51593d
DK
116 Ipc::TypedMsgHdr msg;
117 request.pack(msg);
118 Ipc::SendMessage(Ipc::coordinatorAddr, msg);
119
b2aa0934
DK
120 WaitingForOpen.push_back(this);
121
122 eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
123 this, Timeout, 0, false); // "this" pointer is used as id
254912f3
AR
124}
125
126void
9199139f
AR
127IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response)
128{
9a51593d 129 Must(diskId < 0); // we do not know our disker yet
9a51593d
DK
130
131 if (!response) {
caca86d7
AR
132 debugs(79,1, HERE << "error: timeout");
133 error_ = true;
9a51593d
DK
134 } else {
135 diskId = response->strand.kidId;
136 if (diskId >= 0) {
b2aa0934
DK
137 const bool inserted =
138 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
139 Must(inserted);
9a51593d 140 } else {
254912f3
AR
141 error_ = true;
142 debugs(79,1, HERE << "error: no disker claimed " << dbName);
9a51593d
DK
143 }
144 }
254912f3
AR
145
146 ioRequestor->ioCompletedNotification();
147}
148
149/**
150 * Alias for IpcIoFile::open(...)
151 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
152 */
153void
154IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
155{
156 assert(false); // check
157 /* We use the same logic path for open */
158 open(flags, mode, callback);
159}
160
161void
162IpcIoFile::close()
163{
164 assert(ioRequestor != NULL);
165
166 if (IamDiskProcess())
167 DiskerClose(dbName);
9a51593d 168 // XXX: else nothing to do?
254912f3
AR
169
170 ioRequestor->closeCompleted();
171}
172
173bool
174IpcIoFile::canRead() const
175{
0a11e039 176 return diskId >= 0 && canWait();
254912f3
AR
177}
178
179bool
180IpcIoFile::canWrite() const
181{
0a11e039 182 return diskId >= 0 && canWait();
254912f3
AR
183}
184
185bool
186IpcIoFile::error() const
187{
188 return error_;
189}
190
191void
192IpcIoFile::read(ReadRequest *readRequest)
193{
194 debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
9199139f 195 readRequest->offset << ")");
254912f3
AR
196
197 assert(ioRequestor != NULL);
254912f3
AR
198 assert(readRequest->offset >= 0);
199 Must(!error_);
200
201 //assert(minOffset < 0 || minOffset <= readRequest->offset);
202 //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
203
9a51593d 204 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 205 pending->readRequest = readRequest;
7a907247 206 push(pending);
254912f3
AR
207}
208
209void
210IpcIoFile::readCompleted(ReadRequest *readRequest,
8ed94021 211 IpcIoMsg *const response)
254912f3 212{
caca86d7 213 bool ioError = false;
9a51593d 214 if (!response) {
0ef0509e 215 debugs(79, 3, HERE << "error: timeout");
caca86d7 216 ioError = true; // I/O timeout does not warrant setting error_?
9a51593d 217 } else {
70e3f706
DK
218 if (response->xerrno) {
219 debugs(79,1, HERE << "error: " << xstrerr(response->xerrno));
220 ioError = error_ = true;
9199139f 221 } else if (!response->page) {
70e3f706
DK
222 debugs(79,1, HERE << "error: run out of shared memory pages");
223 ioError = true;
224 } else {
225 const char *const buf = Ipc::Mem::PagePointer(response->page);
226 memcpy(readRequest->buf, buf, response->len);
227 }
254912f3 228
70e3f706
DK
229 Ipc::Mem::PutPage(response->page);
230 }
8ed94021 231
caca86d7 232 const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
9199139f 233 const int errflag = ioError ? DISK_ERROR :DISK_OK;
20b0e1fe 234 ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
254912f3
AR
235}
236
237void
238IpcIoFile::write(WriteRequest *writeRequest)
239{
240 debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
9199139f 241 writeRequest->offset << ")");
254912f3
AR
242
243 assert(ioRequestor != NULL);
254912f3
AR
244 assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
245 assert(writeRequest->offset >= 0);
246 Must(!error_);
247
248 //assert(minOffset < 0 || minOffset <= writeRequest->offset);
249 //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
250
9a51593d 251 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 252 pending->writeRequest = writeRequest;
7a907247 253 push(pending);
254912f3
AR
254}
255
256void
257IpcIoFile::writeCompleted(WriteRequest *writeRequest,
9a51593d 258 const IpcIoMsg *const response)
254912f3 259{
caca86d7 260 bool ioError = false;
9a51593d 261 if (!response) {
0ef0509e 262 debugs(79, 3, HERE << "error: timeout");
caca86d7 263 ioError = true; // I/O timeout does not warrant setting error_?
9199139f 264 } else if (response->xerrno) {
9a51593d
DK
265 debugs(79,1, HERE << "error: " << xstrerr(response->xerrno));
266 ioError = error_ = true;
9199139f 267 } else if (response->len != writeRequest->len) {
9a51593d 268 debugs(79,1, HERE << "problem: " << response->len << " < " << writeRequest->len);
254912f3
AR
269 error_ = true;
270 }
271
272 if (writeRequest->free_func)
273 (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
274
caca86d7 275 if (!ioError) {
254912f3 276 debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
9199139f
AR
277 diskId << " at " << writeRequest->offset);
278 }
254912f3 279
caca86d7 280 const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
9199139f 281 const int errflag = ioError ? DISK_ERROR :DISK_OK;
254912f3
AR
282 ioRequestor->writeCompleted(errflag, rlen, writeRequest);
283}
284
285bool
286IpcIoFile::ioInProgress() const
287{
9a51593d 288 return !olderRequests->empty() || !newerRequests->empty();
254912f3
AR
289}
290
9a51593d 291/// track a new pending request
254912f3 292void
7a907247 293IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending)
254912f3 294{
7a907247 295 newerRequests->insert(std::make_pair(lastRequestId, pending));
9a51593d
DK
296 if (!timeoutCheckScheduled)
297 scheduleTimeoutCheck();
298}
254912f3 299
9a51593d
DK
300/// push an I/O request to disker
301void
7a907247 302IpcIoFile::push(IpcIoPendingRequest *const pending)
9a51593d 303{
fa61cefe 304 // prevent queue overflows: check for responses to earlier requests
f5591061 305 HandleResponses("before push");
fa61cefe 306
a1c98830 307 debugs(47, 7, HERE);
9a51593d 308 Must(diskId >= 0);
7a907247
DK
309 Must(pending);
310 Must(pending->readRequest || pending->writeRequest);
9a51593d
DK
311
312 IpcIoMsg ipcIo;
8ed94021
DK
313 try {
314 ipcIo.requestId = lastRequestId;
0a11e039 315 ipcIo.start = current_time;
8ed94021
DK
316 if (pending->readRequest) {
317 ipcIo.command = IpcIo::cmdRead;
318 ipcIo.offset = pending->readRequest->offset;
319 ipcIo.len = pending->readRequest->len;
320 } else { // pending->writeRequest
321 Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
551f8a18 322 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 323 ipcIo.len = 0;
551f8a18 324 throw TexcHere("run out of shared memory pages for IPC I/O");
8ed94021
DK
325 }
326 ipcIo.command = IpcIo::cmdWrite;
327 ipcIo.offset = pending->writeRequest->offset;
328 ipcIo.len = pending->writeRequest->len;
329 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
330 memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
331 }
254912f3 332
f5591061 333 debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
fa61cefe 334
f5591061 335 if (queue->push(diskId, ipcIo))
fa61cefe 336 Notify(diskId); // must notify disker
7a907247 337 trackPendingRequest(pending);
f5591061 338 } catch (const Queue::Full &) {
fa61cefe
AR
339 debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " <<
340 SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
341 // TODO: grow queue size
9199139f 342
0ef0509e 343 pending->completeIo(NULL);
8ed94021
DK
344 delete pending;
345 } catch (const TextException &e) {
346 debugs(47, DBG_IMPORTANT, HERE << e.what());
0ef0509e 347 pending->completeIo(NULL);
7a907247 348 delete pending;
9a51593d 349 }
254912f3
AR
350}
351
0a11e039
AR
352/// whether we think there is enough time to complete the I/O
353bool
9199139f
AR
354IpcIoFile::canWait() const
355{
43ebbac3 356 if (!config.ioTimeout)
0a11e039
AR
357 return true; // no timeout specified
358
359 IpcIoMsg oldestIo;
360 if (!queue->peek(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
361 return true; // we cannot estimate expected wait time; assume it is OK
362
55939a01
AR
363 const int oldestWait = tvSubMsec(oldestIo.start, current_time);
364
365 int rateWait = -1; // time in millisecons
366 const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId);
367 if (ioRate > 0) {
368 // if there are N requests pending, the new one will wait at
369 // least N/max-swap-rate seconds
75017bc9 370 rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
55939a01
AR
371 // adjust N/max-swap-rate value based on the queue "balance"
372 // member, in case we have been borrowing time against future
373 // I/O already
374 rateWait += queue->balance(diskId);
375 }
376
377 const int expectedWait = max(oldestWait, rateWait);
0a11e039 378 if (expectedWait < 0 ||
43ebbac3 379 static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
0a11e039
AR
380 return true; // expected wait time is acceptible
381
c792f7fc
AR
382 debugs(47,2, HERE << "cannot wait: " << expectedWait <<
383 " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
0a11e039
AR
384 return false; // do not want to wait that long
385}
386
9a51593d 387/// called when coordinator responds to worker open request
254912f3 388void
9a51593d 389IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
254912f3 390{
9a51593d 391 debugs(47, 7, HERE << "coordinator response to open request");
b2aa0934 392 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
9199139f 393 i != WaitingForOpen.end(); ++i) {
b2aa0934
DK
394 if (response.strand.tag == (*i)->dbName) {
395 (*i)->openCompleted(&response);
396 WaitingForOpen.erase(i);
397 return;
398 }
399 }
400
401 debugs(47, 4, HERE << "LATE disker response to open for " <<
402 response.strand.tag);
403 // nothing we can do about it; completeIo() has been called already
9a51593d 404}
254912f3 405
9a51593d 406void
f5591061 407IpcIoFile::HandleResponses(const char *const when)
fa61cefe
AR
408{
409 debugs(47, 4, HERE << "popping all " << when);
fa61cefe
AR
410 IpcIoMsg ipcIo;
411 // get all responses we can: since we are not pushing, this will stop
f5591061
DK
412 int diskId;
413 while (queue->pop(diskId, ipcIo)) {
414 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
415 Must(i != IpcIoFiles.end()); // TODO: warn but continue
416 i->second->handleResponse(ipcIo);
417 }
9a51593d 418}
254912f3 419
9a51593d 420void
8ed94021 421IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
9a51593d
DK
422{
423 const int requestId = ipcIo.requestId;
fa61cefe 424 debugs(47, 7, HERE << "popped disker response: " <<
9199139f 425 SipcIo(KidIdentifier, ipcIo, diskId));
fa61cefe 426
9a51593d
DK
427 Must(requestId);
428 if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
429 pending->completeIo(&ipcIo);
caca86d7
AR
430 delete pending; // XXX: leaking if throwing
431 } else {
9a51593d
DK
432 debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command <<
433 "; ipcIo" << KidIdentifier << '.' << requestId);
caca86d7
AR
434 // nothing we can do about it; completeIo() has been called already
435 }
254912f3
AR
436}
437
254912f3 438void
9a51593d 439IpcIoFile::Notify(const int peerId)
254912f3 440{
fa61cefe 441 // TODO: Count and report the total number of notifications, pops, pushes.
a1c98830 442 debugs(47, 7, HERE << "kid" << peerId);
9a51593d
DK
443 Ipc::TypedMsgHdr msg;
444 msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
445 msg.putInt(KidIdentifier);
446 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, peerId);
447 Ipc::SendMessage(addr, msg);
448}
449
450void
451IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
452{
fa61cefe
AR
453 const int from = msg.getInt();
454 debugs(47, 7, HERE << "from " << from);
f5591061
DK
455 queue->clearReaderSignal(from);
456 if (IamDiskProcess())
457 DiskerHandleRequests();
458 else
459 HandleResponses("after notification");
9a51593d
DK
460}
461
b2aa0934
DK
462/// handles open request timeout
463void
464IpcIoFile::OpenTimeout(void *const param)
465{
466 Must(param);
467 // the pointer is used for comparison only and not dereferenced
468 const IpcIoFile *const ipcIoFile =
469 reinterpret_cast<const IpcIoFile *>(param);
470 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
9199139f 471 i != WaitingForOpen.end(); ++i) {
b2aa0934
DK
472 if (*i == ipcIoFile) {
473 (*i)->openCompleted(NULL);
474 WaitingForOpen.erase(i);
475 break;
476 }
477 }
478}
479
9a51593d
DK
480/// IpcIoFile::checkTimeouts wrapper
481void
482IpcIoFile::CheckTimeouts(void *const param)
483{
9a51593d 484 Must(param);
b2aa0934
DK
485 const int diskId = reinterpret_cast<uintptr_t>(param);
486 debugs(47, 7, HERE << "diskId=" << diskId);
487 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
488 if (i != IpcIoFiles.end())
489 i->second->checkTimeouts();
9a51593d
DK
490}
491
492void
493IpcIoFile::checkTimeouts()
494{
9a51593d 495 timeoutCheckScheduled = false;
254912f3 496
0ef0509e
AR
497 // last chance to recover in case a notification message was lost, etc.
498 const RequestMap::size_type timeoutsBefore = olderRequests->size();
499 HandleResponses("before timeout");
500 const RequestMap::size_type timeoutsNow = olderRequests->size();
501
502 if (timeoutsBefore > timeoutsNow) { // some requests were rescued
503 // notification message lost or significantly delayed?
504 debugs(47, DBG_IMPORTANT, "WARNING: communication with disker " <<
505 "may be too slow or disrupted for about " <<
506 Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
507 " out of " << timeoutsBefore << " I/Os");
508 }
509
510 if (timeoutsNow) {
511 debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
512 timeoutsNow << " I/Os after at least " <<
513 Timeout << "s timeout");
514 }
515
caca86d7
AR
516 // any old request would have timed out by now
517 typedef RequestMap::const_iterator RMCI;
9a51593d
DK
518 for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
519 IpcIoPendingRequest *const pending = i->second;
254912f3 520
b2aa0934 521 const unsigned int requestId = i->first;
caca86d7 522 debugs(47, 7, HERE << "disker timeout; ipcIo" <<
b2aa0934 523 KidIdentifier << '.' << requestId);
254912f3 524
caca86d7
AR
525 pending->completeIo(NULL); // no response
526 delete pending; // XXX: leaking if throwing
9a51593d
DK
527 }
528 olderRequests->clear();
caca86d7 529
9a51593d 530 swap(olderRequests, newerRequests); // switches pointers around
0ef0509e 531 if (!olderRequests->empty() && !timeoutCheckScheduled)
9a51593d 532 scheduleTimeoutCheck();
254912f3
AR
533}
534
caca86d7 535/// prepare to check for timeouts in a little while
254912f3 536void
9a51593d 537IpcIoFile::scheduleTimeoutCheck()
254912f3 538{
b2aa0934 539 // we check all older requests at once so some may be wait for 2*Timeout
caca86d7 540 eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
b2aa0934 541 reinterpret_cast<void *>(diskId), Timeout, 0, false);
9a51593d 542 timeoutCheckScheduled = true;
254912f3
AR
543}
544
545/// returns and forgets the right IpcIoFile pending request
546IpcIoPendingRequest *
9a51593d 547IpcIoFile::dequeueRequest(const unsigned int requestId)
254912f3 548{
254912f3 549 Must(requestId != 0);
caca86d7 550
9a51593d
DK
551 RequestMap *map = NULL;
552 RequestMap::iterator i = requestMap1.find(requestId);
caca86d7 553
9a51593d
DK
554 if (i != requestMap1.end())
555 map = &requestMap1;
caca86d7 556 else {
9a51593d
DK
557 i = requestMap2.find(requestId);
558 if (i != requestMap2.end())
559 map = &requestMap2;
caca86d7
AR
560 }
561
562 if (!map) // not found in both maps
563 return NULL;
564
565 IpcIoPendingRequest *pending = i->second;
566 map->erase(i);
567 return pending;
254912f3
AR
568}
569
570int
9a51593d 571IpcIoFile::getFD() const
254912f3
AR
572{
573 assert(false); // not supported; TODO: remove this method from API
574 return -1;
575}
576
577
9a51593d 578/* IpcIoMsg */
254912f3 579
9a51593d 580IpcIoMsg::IpcIoMsg():
9199139f 581 requestId(0), offset(0), len(0), command(IpcIo::cmdNone), xerrno(0)
254912f3 582{
0a11e039 583 start.tv_sec = 0;
254912f3
AR
584}
585
caca86d7 586/* IpcIoPendingRequest */
254912f3
AR
587
588IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
9199139f 589 file(aFile), readRequest(NULL), writeRequest(NULL)
254912f3 590{
b2aa0934 591 Must(file != NULL);
9a51593d
DK
592 if (++file->lastRequestId == 0) // don't use zero value as requestId
593 ++file->lastRequestId;
254912f3
AR
594}
595
caca86d7 596void
8ed94021 597IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
caca86d7 598{
caca86d7
AR
599 if (readRequest)
600 file->readCompleted(readRequest, response);
9199139f 601 else if (writeRequest)
caca86d7 602 file->writeCompleted(writeRequest, response);
9a51593d
DK
603 else {
604 Must(!response); // only timeouts are handled here
605 file->openCompleted(NULL);
606 }
caca86d7
AR
607}
608
609
254912f3
AR
610
/* XXX: disker code that should probably be moved elsewhere */

/// db file descriptor opened by DiskerOpen(); -1 while no file is open
static int TheFile = -1;
614
9a51593d
DK
615static void
616diskerRead(IpcIoMsg &ipcIo)
254912f3 617{
551f8a18 618 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 619 ipcIo.len = 0;
c792f7fc 620 debugs(47,2, HERE << "run out of shared memory pages for IPC I/O");
8ed94021
DK
621 return;
622 }
623
624 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
625 const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
2d338731
AR
626 statCounter.syscalls.disk.reads++;
627 fd_bytes(TheFile, read, FD_READ);
628
254912f3 629 if (read >= 0) {
9a51593d
DK
630 ipcIo.xerrno = 0;
631 const size_t len = static_cast<size_t>(read); // safe because read > 0
254912f3 632 debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
9199139f 633 (len == ipcIo.len ? "all " : "just ") << read);
9a51593d
DK
634 ipcIo.len = len;
635 } else {
636 ipcIo.xerrno = errno;
637 ipcIo.len = 0;
254912f3 638 debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
9199139f 639 ipcIo.xerrno);
9a51593d 640 }
254912f3
AR
641}
642
9a51593d
DK
643static void
644diskerWrite(IpcIoMsg &ipcIo)
254912f3 645{
8ed94021
DK
646 const char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
647 const ssize_t wrote = pwrite(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
2d338731
AR
648 statCounter.syscalls.disk.writes++;
649 fd_bytes(TheFile, wrote, FD_WRITE);
650
254912f3 651 if (wrote >= 0) {
9a51593d
DK
652 ipcIo.xerrno = 0;
653 const size_t len = static_cast<size_t>(wrote); // safe because wrote > 0
254912f3 654 debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " <<
9199139f 655 (len == ipcIo.len ? "all " : "just ") << wrote);
9a51593d
DK
656 ipcIo.len = len;
657 } else {
658 ipcIo.xerrno = errno;
659 ipcIo.len = 0;
254912f3 660 debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " <<
9a51593d
DK
661 ipcIo.xerrno);
662 }
8ed94021
DK
663
664 Ipc::Mem::PutPage(ipcIo.page);
254912f3
AR
665}
666
c792f7fc
AR
667
668void
df881a0f 669IpcIoFile::DiskerHandleMoreRequests(void *source)
c792f7fc 670{
df881a0f
AR
671 debugs(47, 7, HERE << "resuming handling requests after " <<
672 static_cast<const char *>(source));
c792f7fc
AR
673 DiskerHandleMoreRequestsScheduled = false;
674 IpcIoFile::DiskerHandleRequests();
675}
676
df881a0f
AR
677bool
678IpcIoFile::WaitBeforePop()
679{
680 const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit();
681 const double maxRate = ioRate/1e3; // req/ms
682
683 // do we need to enforce configured I/O rate?
684 if (maxRate <= 0)
685 return false;
686
687 // is there an I/O request we could potentially delay?
688 if (!queue->popReady()) {
689 // unlike pop(), popReady() is not reliable and does not block reader
690 // so we must proceed with pop() even if it is likely to fail
691 return false;
692 }
693
9a585a30 694 static timeval LastIo = current_time;
df881a0f
AR
695
696 const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
697 // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
698 const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
699
700 const double credit = ioDuration; // what the last I/O should have cost us
701 const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
702 LastIo = current_time;
703
704 Ipc::QueueReader::Balance &balance = queue->localBalance();
705 balance += static_cast<int64_t>(credit - debit);
706
707 debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
708
709 if (balance > maxImbalance) {
710 // if we accumulated too much time for future slow I/Os,
711 // then shed accumulated time to keep just half of the excess
712 const int64_t toSpend = balance - maxImbalance/2;
69321ae9
AR
713
714 if (toSpend/1e3 > Timeout)
715 debugs(47, DBG_IMPORTANT, "WARNING: Rock disker delays I/O " <<
716 "requests for " << (toSpend/1e3) << " seconds to obey " <<
717 ioRate << "/sec rate limit");
718
df881a0f
AR
719 debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" <<
720 (1e3*maxRate) << "/sec rate");
721 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
722 &IpcIoFile::DiskerHandleMoreRequests,
723 const_cast<char*>("rate limiting"),
724 toSpend/1e3, 0, false);
725 DiskerHandleMoreRequestsScheduled = true;
726 return true;
e29ccb57 727 } else if (balance < -maxImbalance) {
df881a0f
AR
728 // do not owe "too much" to avoid "too large" bursts of I/O
729 balance = -maxImbalance;
730 }
731
732 return false;
733}
734
254912f3 735void
f5591061 736IpcIoFile::DiskerHandleRequests()
254912f3 737{
c792f7fc 738 // Balance our desire to maximize the number of concurrent I/O requests
fdb3059b 739 // (reordred by OS to minimize seek time) with a requirement to
c792f7fc 740 // send 1st-I/O notification messages, process Coordinator events, etc.
fdb3059b
AR
741 const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
742 const timeval loopStart = current_time;
743
c792f7fc 744 int popped = 0;
fa61cefe
AR
745 int workerId = 0;
746 IpcIoMsg ipcIo;
df881a0f 747 while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
fdb3059b
AR
748 ++popped;
749
750 // at least one I/O per call is guaranteed if the queue is not empty
fa61cefe
AR
751 DiskerHandleRequest(workerId, ipcIo);
752
fdb3059b
AR
753 getCurrentTime();
754 const double elapsedMsec = tvSubMsec(loopStart, current_time);
755 if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
756 if (!DiskerHandleMoreRequestsScheduled) {
757 // the gap must be positive for select(2) to be given a chance
758 const double minBreakSecs = 0.001;
759 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
9199139f 760 &IpcIoFile::DiskerHandleMoreRequests,
df881a0f
AR
761 const_cast<char*>("long I/O loop"),
762 minBreakSecs, 0, false);
fdb3059b
AR
763 DiskerHandleMoreRequestsScheduled = true;
764 }
765 debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
9199139f 766 elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
fdb3059b
AR
767 break;
768 }
c792f7fc
AR
769 }
770
fdb3059b 771 // TODO: consider using O_DIRECT with "elevator" optimization where we pop
fa61cefe
AR
772 // requests first, then reorder the popped requests to optimize seek time,
773 // then do I/O, then take a break, and come back for the next set of I/O
774 // requests.
9a51593d 775}
254912f3 776
9a51593d
DK
777/// called when disker receives an I/O request
778void
779IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
780{
9a51593d 781 if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
254912f3 782 debugs(0,0, HERE << "disker" << KidIdentifier <<
9a51593d
DK
783 " should not receive " << ipcIo.command <<
784 " ipcIo" << workerId << '.' << ipcIo.requestId);
785 return;
786 }
787
788 debugs(47,5, HERE << "disker" << KidIdentifier <<
789 (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
790 ipcIo.len << " at " << ipcIo.offset <<
791 " ipcIo" << workerId << '.' << ipcIo.requestId);
792
793 if (ipcIo.command == IpcIo::cmdRead)
794 diskerRead(ipcIo);
795 else // ipcIo.command == IpcIo::cmdWrite
796 diskerWrite(ipcIo);
797
f5591061 798 debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
fa61cefe 799
7a907247 800 try {
f5591061 801 if (queue->push(workerId, ipcIo))
5e44782e 802 Notify(workerId); // must notify worker
f5591061 803 } catch (const Queue::Full &) {
fa61cefe
AR
804 // The worker queue should not overflow because the worker should pop()
805 // before push()ing and because if disker pops N requests at a time,
806 // we should make sure the worker pop() queue length is the worker
807 // push queue length plus N+1. XXX: implement the N+1 difference.
808 debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue overflow: " <<
5e44782e 809 SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
fa61cefe
AR
810
811 // the I/O request we could not push will timeout
7a907247 812 }
254912f3
AR
813}
814
815static bool
816DiskerOpen(const String &path, int flags, mode_t mode)
817{
818 assert(TheFile < 0);
819
820 TheFile = file_open(path.termedBuf(), flags);
821
822 if (TheFile < 0) {
823 const int xerrno = errno;
824 debugs(47,0, HERE << "rock db error opening " << path << ": " <<
825 xstrerr(xerrno));
826 return false;
9a51593d 827 }
254912f3
AR
828
829 store_open_disk_fd++;
830 debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile);
9a51593d 831 return true;
254912f3
AR
832}
833
834static void
835DiskerClose(const String &path)
836{
837 if (TheFile >= 0) {
838 file_close(TheFile);
839 debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
840 TheFile = -1;
841 store_open_disk_fd--;
a1c98830 842 }
254912f3 843}
f5591061
DK
844
845
846/// initializes shared memory segments used by IpcIoFile
4404f1c5 847class IpcIoRr: public Ipc::Mem::RegisteredRunner
f5591061
DK
848{
849public:
850 /* RegisteredRunner API */
851 IpcIoRr(): owner(NULL) {}
f5591061
DK
852 virtual ~IpcIoRr();
853
4404f1c5
DK
854protected:
855 virtual void create(const RunnerRegistry &);
856
f5591061
DK
857private:
858 Ipc::FewToFewBiQueue::Owner *owner;
859};
860
861RunnerRegistrationEntry(rrAfterConfig, IpcIoRr);
862
863
4404f1c5 864void IpcIoRr::create(const RunnerRegistry &)
f5591061
DK
865{
866 if (!UsingSmp())
867 return;
868
4404f1c5
DK
869 Must(!owner);
870 // XXX: make capacity configurable
871 owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1,
3b581957 872 Config.cacheSwap.n_strands,
4404f1c5
DK
873 1 + Config.workers, sizeof(IpcIoMsg),
874 1024);
f5591061
DK
875}
876
877IpcIoRr::~IpcIoRr()
878{
879 delete owner;
880}