]> git.ipfire.org Git - thirdparty/squid.git/blob - src/DiskIO/IpcIo/IpcIoFile.cc
Merged from trunk r13474.
[thirdparty/squid.git] / src / DiskIO / IpcIo / IpcIoFile.cc
1 /*
2 * DEBUG: section 47 Store Directory Routines
3 */
4
5 #include "squid.h"
6 #include "base/RunnersRegistry.h"
7 #include "base/TextException.h"
8 #include "disk.h"
9 #include "DiskIO/IORequestor.h"
10 #include "DiskIO/IpcIo/IpcIoFile.h"
11 #include "DiskIO/ReadRequest.h"
12 #include "DiskIO/WriteRequest.h"
13 #include "fd.h"
14 #include "globals.h"
15 #include "ipc/mem/Pages.h"
16 #include "ipc/Messages.h"
17 #include "ipc/Port.h"
18 #include "ipc/Queue.h"
19 #include "ipc/StrandSearch.h"
20 #include "ipc/UdsOp.h"
21 #include "SBuf.h"
22 #include "SquidConfig.h"
23 #include "SquidTime.h"
24 #include "StatCounters.h"
25 #include "tools.h"
26
27 #include <cerrno>
28
CBDATA_CLASS_INIT(IpcIoFile);

/// shared memory segment path to use for IpcIoFile maps
static const char *const ShmLabel = "io_file";
/// a single worker-to-disker or disker-to-worker queue capacity; up
/// to 2*QueueCapacity I/O requests queued between a single worker and
/// a single disker
// TODO: make configurable or compute from squid.conf settings if possible
static const int QueueCapacity = 1024;

/// how long (in seconds) to wait for an open confirmation or I/O response
/// before treating the request as timed out
const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
/// IpcIoFile objects that sent an open request and await a coordinator reply
IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
/// maps diskId (i.e., the disker kid identifier) to its IpcIoFile object
IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
/// lazily created per-process queue end shared by all IpcIoFile objects
// NOTE(review): std::auto_ptr is deprecated since C++11; migrating to
// std::unique_ptr also requires changing the declaration in IpcIoFile.h —
// confirm before switching
std::auto_ptr<IpcIoFile::Queue> IpcIoFile::queue;

/// whether an event to resume disker request processing is already scheduled
bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false;

static bool DiskerOpen(const SBuf &path, int flags, mode_t mode);
static void DiskerClose(const SBuf &path);
48
/// IpcIo wrapper for debugs() streams; XXX: find a better class name
struct SipcIo {
    SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
        worker(aWorker), msg(aMsg), disker(aDisker) {}

    int worker; ///< kid ID of the worker end of the exchange
    const IpcIoMsg &msg; ///< the I/O message being reported
    int disker; ///< kid ID of the disker end of the exchange
};

/// prints a compact "ipcIo<worker>.<requestId><r|w><disker>" exchange label
std::ostream &
operator <<(std::ostream &os, const SipcIo &sio)
{
    return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
           (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
}
65
/// \param aDb db file name; also used to tag IPC messages for this file
IpcIoFile::IpcIoFile(char const *aDb):
    dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
    olderRequests(&requestMap1), newerRequests(&requestMap2),
    timeoutCheckScheduled(false)
{
}
72
IpcIoFile::~IpcIoFile()
{
    // deregister from the global diskId-to-file map, if we ever registered
    if (diskId >= 0) {
        const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId);
        // XXX: warn and continue?
        Must(i != IpcIoFiles.end());
        Must(i->second == this);
        IpcIoFiles.erase(i);
    }
}
83
/// remembers the given configuration after letting DiskFile process it
void
IpcIoFile::configure(const Config &cfg)
{
    DiskFile::configure(cfg);
    config = cfg;
}
90
/// opens the file: diskers open the db file directly and announce themselves;
/// workers ask Coordinator to locate the disker responsible for dbName
void
IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
{
    ioRequestor = callback;
    Must(diskId < 0); // we do not know our disker yet

    // lazily create our end of the shared queue; workers and diskers
    // attach to opposite queue groups
    if (!queue.get())
        queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));

    if (IamDiskProcess()) {
        // disker path: open the db file and register as its server
        error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
        if (error_)
            return;

        diskId = KidIdentifier;
        const bool inserted =
            IpcIoFiles.insert(std::make_pair(diskId, this)).second;
        Must(inserted);

        // enforce the configured I/O rate on our end of the queue
        queue->localRateLimit() =
            static_cast<Ipc::QueueReader::Rate::Value>(config.ioRate);

        // announce ourselves to Coordinator, tagged with our db name
        Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
        ann.strand.tag = dbName;
        Ipc::TypedMsgHdr message;
        ann.pack(message);
        SendMessage(Ipc::Port::CoordinatorAddr(), message);

        ioRequestor->ioCompletedNotification();
        return;
    }

    // worker path: ask Coordinator which disker serves our db
    Ipc::StrandSearchRequest request;
    request.requestorId = KidIdentifier;
    request.tag = dbName;

    Ipc::TypedMsgHdr msg;
    request.pack(msg);
    Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg);

    // wait for HandleOpenResponse() or OpenTimeout(), whichever comes first
    WaitingForOpen.push_back(this);

    eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
             this, Timeout, 0, false); // "this" pointer is used as id
}
136
/// handles [a lack of] a coordinator response to our open request
/// \param response the strand search answer, or NULL on open timeout
void
IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response)
{
    Must(diskId < 0); // we do not know our disker yet

    if (!response) {
        debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " communication " <<
               "channel establishment timeout");
        error_ = true;
    } else {
        diskId = response->strand.kidId;
        if (diskId >= 0) {
            // remember the disker-to-file link for response dispatching
            const bool inserted =
                IpcIoFiles.insert(std::make_pair(diskId, this)).second;
            Must(inserted);
        } else {
            error_ = true;
            debugs(79, DBG_IMPORTANT, "ERROR: no disker claimed " <<
                   "responsibility for " << dbName);
        }
    }

    ioRequestor->ioCompletedNotification();
}
161
/**
 * Alias for IpcIoFile::open(...)
 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 */
void
IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
{
    // NOTE(review): this assert aborts every create() call; it reads like a
    // leftover "is this path ever taken?" probe — confirm before removing
    assert(false); // check
    /* We use the same logic path for open */
    open(flags, mode, callback);
}
173
void
IpcIoFile::close()
{
    assert(ioRequestor != NULL);

    // only the disker process owns the underlying file descriptor
    if (IamDiskProcess())
        DiskerClose(SBuf(dbName.termedBuf()));
    // XXX: else nothing to do?

    ioRequestor->closeCompleted();
}
185
/// whether a read may be queued now: we know our disker, have seen no fatal
/// errors, and expect the I/O to complete within the configured timeout
bool
IpcIoFile::canRead() const
{
    return diskId >= 0 && !error_ && canWait();
}

/// whether a write may be queued now (same criteria as canRead())
bool
IpcIoFile::canWrite() const
{
    return diskId >= 0 && !error_ && canWait();
}

/// whether this worker stopped using the file due to a fatal error
bool
IpcIoFile::error() const
{
    return error_;
}
203
/// queues a read request to be pushed to our disker
void
IpcIoFile::read(ReadRequest *readRequest)
{
    debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
           readRequest->offset << ")");

    assert(ioRequestor != NULL);
    assert(readRequest->offset >= 0);
    Must(!error_);

    //assert(minOffset < 0 || minOffset <= readRequest->offset);
    //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);

    // the pending wrapper pairs the request with us until a response arrives
    IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
    pending->readRequest = readRequest;
    push(pending);
}
221
/// handles the disker response (or a timeout) for a previously pushed read
/// \param response the disker reply, or NULL if the request timed out
void
IpcIoFile::readCompleted(ReadRequest *readRequest,
                         IpcIoMsg *const response)
{
    bool ioError = false;
    if (!response) {
        debugs(79, 3, HERE << "error: timeout");
        ioError = true; // I/O timeout does not warrant setting error_?
    } else {
        if (response->xerrno) {
            debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read: " <<
                   xstrerr(response->xerrno));
            ioError = error_ = true;
        } else if (!response->page) {
            debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read ran " <<
                   "out of shared memory pages");
            ioError = true;
        } else {
            // copy the data out of the shared memory page
            const char *const buf = Ipc::Mem::PagePointer(response->page);
            memcpy(readRequest->buf, buf, response->len);
        }

        // return the page to the shared pool
        Ipc::Mem::PutPage(response->page);
    }

    const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
    const int errflag = ioError ? DISK_ERROR :DISK_OK;
    ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
}
251
/// queues a write request to be pushed to our disker
void
IpcIoFile::write(WriteRequest *writeRequest)
{
    debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
           writeRequest->offset << ")");

    assert(ioRequestor != NULL);
    assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
    assert(writeRequest->offset >= 0);
    Must(!error_);

    //assert(minOffset < 0 || minOffset <= writeRequest->offset);
    //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);

    // the pending wrapper pairs the request with us until a response arrives
    IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
    pending->writeRequest = writeRequest;
    push(pending);
}
270
/// handles the disker response (or a timeout) for a previously pushed write
/// \param response the disker reply, or NULL if the request timed out
void
IpcIoFile::writeCompleted(WriteRequest *writeRequest,
                          const IpcIoMsg *const response)
{
    bool ioError = false;
    if (!response) {
        debugs(79, 3, "disker " << diskId << " timeout");
        ioError = true; // I/O timeout does not warrant setting error_?
    } else if (response->xerrno) {
        debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId <<
               " error writing " << writeRequest->len << " bytes at " <<
               writeRequest->offset << ": " << xstrerr(response->xerrno) <<
               "; this worker will stop using " << dbName);
        ioError = error_ = true;
    } else if (response->len != writeRequest->len) {
        // NOTE(review): a partial write sets error_ but leaves ioError false,
        // so this call still reports DISK_OK below — confirm this is intended
        debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId << " wrote " <<
               response->len << " instead of " << writeRequest->len <<
               " bytes (offset " << writeRequest->offset << "); " <<
               "this worker will stop using " << dbName);
        error_ = true;
    }

    // the request owns its buffer; release it now that the I/O is over
    if (writeRequest->free_func)
        (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?

    if (!ioError) {
        debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
               diskId << " at " << writeRequest->offset);
    }

    const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
    const int errflag = ioError ? DISK_ERROR :DISK_OK;
    ioRequestor->writeCompleted(errflag, rlen, writeRequest);
}
305
306 bool
307 IpcIoFile::ioInProgress() const
308 {
309 return !olderRequests->empty() || !newerRequests->empty();
310 }
311
/// track a new pending request
void
IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
{
    // new requests always join the "newer" generation; checkTimeouts()
    // ages them into the "older" generation later
    const std::pair<RequestMap::iterator,bool> result =
        newerRequests->insert(std::make_pair(id, pending));
    Must(result.second); // failures means that id was not unique
    // make sure somebody will eventually reap this request on timeout
    if (!timeoutCheckScheduled)
        scheduleTimeoutCheck();
}
322
/// push an I/O request to disker
void
IpcIoFile::push(IpcIoPendingRequest *const pending)
{
    // prevent queue overflows: check for responses to earlier requests
    // warning: this call may result in indirect push() recursion
    HandleResponses("before push");

    debugs(47, 7, HERE);
    Must(diskId >= 0);
    Must(pending);
    Must(pending->readRequest || pending->writeRequest);

    IpcIoMsg ipcIo;
    try {
        if (++lastRequestId == 0) // don't use zero value as requestId
            ++lastRequestId;
        ipcIo.requestId = lastRequestId;
        ipcIo.start = current_time;
        if (pending->readRequest) {
            ipcIo.command = IpcIo::cmdRead;
            ipcIo.offset = pending->readRequest->offset;
            ipcIo.len = pending->readRequest->len;
        } else { // pending->writeRequest
            Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
            if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
                ipcIo.len = 0;
                throw TexcHere("run out of shared memory pages for IPC I/O");
            }
            ipcIo.command = IpcIo::cmdWrite;
            ipcIo.offset = pending->writeRequest->offset;
            ipcIo.len = pending->writeRequest->len;
            // copy the caller's data into the shared page for the disker
            char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
            memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
        }

        debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));

        if (queue->push(diskId, ipcIo))
            Notify(diskId); // must notify disker
        trackPendingRequest(ipcIo.requestId, pending);
    } catch (const Queue::Full &) {
        debugs(47, DBG_IMPORTANT, "ERROR: worker I/O push queue for " <<
               dbName << " overflow: " <<
               SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
        // TODO: grow queue size

        pending->completeIo(NULL);
        delete pending;
    } catch (const TextException &e) {
        debugs(47, DBG_IMPORTANT, "ERROR: " << dbName << " exception: " << e.what());
        pending->completeIo(NULL);
        delete pending;
    }
}
378
/// whether we think there is enough time to complete the I/O
bool
IpcIoFile::canWait() const
{
    if (!config.ioTimeout)
        return true; // no timeout specified

    IpcIoMsg oldestIo;
    if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
        return true; // we cannot estimate expected wait time; assume it is OK

    // how long the oldest queued request has been waiting already
    const int oldestWait = tvSubMsec(oldestIo.start, current_time);

    int rateWait = -1; // time in milliseconds
    const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId);
    if (ioRate > 0) {
        // if there are N requests pending, the new one will wait at
        // least N/max-swap-rate seconds
        rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
        // adjust N/max-swap-rate value based on the queue "balance"
        // member, in case we have been borrowing time against future
        // I/O already
        rateWait += queue->balance(diskId);
    }

    const int expectedWait = max(oldestWait, rateWait);
    if (expectedWait < 0 ||
            static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
        return true; // expected wait time is acceptable

    debugs(47,2, HERE << "cannot wait: " << expectedWait <<
           " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
    return false; // do not want to wait that long
}
413
/// called when coordinator responds to worker open request
void
IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
{
    debugs(47, 7, HERE << "coordinator response to open request");
    for (IpcIoFileList::iterator i = WaitingForOpen.begin();
            i != WaitingForOpen.end(); ++i) {
        if (response.strand.tag == (*i)->dbName) {
            (*i)->openCompleted(&response);
            WaitingForOpen.erase(i); // safe: we return without reusing i
            return;
        }
    }

    debugs(47, 4, HERE << "LATE disker response to open for " <<
           response.strand.tag);
    // nothing we can do about it; completeIo() has been called already
}
432
/// pops and dispatches all disker responses currently in our queue
/// \param when textual context for debugging only
void
IpcIoFile::HandleResponses(const char *const when)
{
    debugs(47, 4, HERE << "popping all " << when);
    IpcIoMsg ipcIo;
    // get all responses we can: since we are not pushing, this will stop
    int diskId;
    while (queue->pop(diskId, ipcIo)) {
        // route the response to the IpcIoFile served by this disker
        const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
        Must(i != IpcIoFiles.end()); // TODO: warn but continue
        i->second->handleResponse(ipcIo);
    }
}
446
/// matches a popped disker response with its pending request and completes it
void
IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
{
    const int requestId = ipcIo.requestId;
    debugs(47, 7, HERE << "popped disker response: " <<
           SipcIo(KidIdentifier, ipcIo, diskId));

    Must(requestId);
    if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
        pending->completeIo(&ipcIo);
        delete pending; // XXX: leaking if throwing
    } else {
        // a timed-out request was already reaped by checkTimeouts()
        debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command <<
               "; ipcIo" << KidIdentifier << '.' << requestId);
        // nothing we can do about it; completeIo() has been called already
    }
}
464
/// sends a queue-activity notification message to the given kid process
void
IpcIoFile::Notify(const int peerId)
{
    // TODO: Count and report the total number of notifications, pops, pushes.
    debugs(47, 7, HERE << "kid" << peerId);
    Ipc::TypedMsgHdr msg;
    msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
    msg.putInt(KidIdentifier);
    const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, peerId);
    Ipc::SendMessage(addr, msg);
}
476
/// handles a queue-activity notification from another kid process
void
IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
{
    const int from = msg.getInt();
    debugs(47, 7, HERE << "from " << from);
    // clear the signal first so that newer notifications are not lost
    queue->clearReaderSignal(from);
    if (IamDiskProcess())
        DiskerHandleRequests();
    else
        HandleResponses("after notification");
}
488
/// handles open request timeout
void
IpcIoFile::OpenTimeout(void *const param)
{
    Must(param);
    // the pointer is used for comparison only and not dereferenced
    const IpcIoFile *const ipcIoFile =
        reinterpret_cast<const IpcIoFile *>(param);
    for (IpcIoFileList::iterator i = WaitingForOpen.begin();
            i != WaitingForOpen.end(); ++i) {
        if (*i == ipcIoFile) {
            (*i)->openCompleted(NULL); // NULL response signals the timeout
            WaitingForOpen.erase(i);
            break;
        }
    }
}
506
/// IpcIoFile::checkTimeouts wrapper
void
IpcIoFile::CheckTimeouts(void *const param)
{
    Must(param);
    // the event parameter carries diskId encoded in the pointer value;
    // it is never dereferenced
    const int diskId = reinterpret_cast<uintptr_t>(param);
    debugs(47, 7, HERE << "diskId=" << diskId);
    const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
    if (i != IpcIoFiles.end())
        i->second->checkTimeouts();
}
518
/// reaps all requests in the "older" generation as timed out and ages the
/// "newer" generation for the next check
void
IpcIoFile::checkTimeouts()
{
    timeoutCheckScheduled = false;

    // last chance to recover in case a notification message was lost, etc.
    const RequestMap::size_type timeoutsBefore = olderRequests->size();
    HandleResponses("before timeout");
    const RequestMap::size_type timeoutsNow = olderRequests->size();

    if (timeoutsBefore > timeoutsNow) { // some requests were rescued
        // notification message lost or significantly delayed?
        debugs(47, DBG_IMPORTANT, "WARNING: communication with " << dbName <<
               " may be too slow or disrupted for about " <<
               Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
               " out of " << timeoutsBefore << " I/Os");
    }

    if (timeoutsNow) {
        debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
               timeoutsNow << ' ' << dbName << " I/Os after at least " <<
               Timeout << "s timeout");
    }

    // any old request would have timed out by now
    typedef RequestMap::const_iterator RMCI;
    for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
        IpcIoPendingRequest *const pending = i->second;

        const unsigned int requestId = i->first;
        debugs(47, 7, HERE << "disker timeout; ipcIo" <<
               KidIdentifier << '.' << requestId);

        pending->completeIo(NULL); // no response
        delete pending; // XXX: leaking if throwing
    }
    olderRequests->clear();

    // age the survivors: newer requests become the older generation
    swap(olderRequests, newerRequests); // switches pointers around
    if (!olderRequests->empty() && !timeoutCheckScheduled)
        scheduleTimeoutCheck();
}
561
/// prepare to check for timeouts in a little while
void
IpcIoFile::scheduleTimeoutCheck()
{
    // we check all older requests at once, so some may wait for up to 2*Timeout
    eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
             reinterpret_cast<void *>(diskId), Timeout, 0, false);
    timeoutCheckScheduled = true;
}
571
572 /// returns and forgets the right IpcIoFile pending request
573 IpcIoPendingRequest *
574 IpcIoFile::dequeueRequest(const unsigned int requestId)
575 {
576 Must(requestId != 0);
577
578 RequestMap *map = NULL;
579 RequestMap::iterator i = requestMap1.find(requestId);
580
581 if (i != requestMap1.end())
582 map = &requestMap1;
583 else {
584 i = requestMap2.find(requestId);
585 if (i != requestMap2.end())
586 map = &requestMap2;
587 }
588
589 if (!map) // not found in both maps
590 return NULL;
591
592 IpcIoPendingRequest *pending = i->second;
593 map->erase(i);
594 return pending;
595 }
596
/// not supported by the IPC I/O model; always fails with an assertion
int
IpcIoFile::getFD() const
{
    assert(false); // not supported; TODO: remove this method from API
    return -1;
}
603
/* IpcIoMsg */

/// constructs an empty no-op message with a zeroed start time
IpcIoMsg::IpcIoMsg():
    requestId(0),
    offset(0),
    len(0),
    command(IpcIo::cmdNone),
    xerrno(0)
{
    start.tv_sec = 0;
    start.tv_usec = 0;
}
616
/* IpcIoPendingRequest */

IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
    file(aFile), readRequest(NULL), writeRequest(NULL)
{
}

/// routes the disker response (or a NULL timeout) to the right completion
/// handler, based on which kind of request this wrapper carries
void
IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
{
    if (readRequest)
        file->readCompleted(readRequest, response);
    else if (writeRequest)
        file->writeCompleted(writeRequest, response);
    else {
        Must(!response); // only timeouts are handled here
        file->openCompleted(NULL);
    }
}
636
/* XXX: disker code that should probably be moved elsewhere */

// one set of globals is enough because a disker serves a single db file
static SBuf DbName; ///< full db file name
static int TheFile = -1; ///< db file descriptor
641
642 static void
643 diskerRead(IpcIoMsg &ipcIo)
644 {
645 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
646 ipcIo.len = 0;
647 debugs(47,2, HERE << "run out of shared memory pages for IPC I/O");
648 return;
649 }
650
651 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
652 const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
653 ++statCounter.syscalls.disk.reads;
654 fd_bytes(TheFile, read, FD_READ);
655
656 if (read >= 0) {
657 ipcIo.xerrno = 0;
658 const size_t len = static_cast<size_t>(read); // safe because read > 0
659 debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
660 (len == ipcIo.len ? "all " : "just ") << read);
661 ipcIo.len = len;
662 } else {
663 ipcIo.xerrno = errno;
664 ipcIo.len = 0;
665 debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
666 ipcIo.xerrno);
667 }
668 }
669
670 /// Tries to write buffer to disk (a few times if needed);
671 /// sets ipcIo results, but does no cleanup. The caller must cleanup.
672 static void
673 diskerWriteAttempts(IpcIoMsg &ipcIo)
674 {
675 const char *buf = Ipc::Mem::PagePointer(ipcIo.page);
676 size_t toWrite = min(ipcIo.len, Ipc::Mem::PageSize());
677 size_t wroteSoFar = 0;
678 off_t offset = ipcIo.offset;
679 // Partial writes to disk do happen. It is unlikely that the caller can
680 // handle partial writes by doing something other than writing leftovers
681 // again, so we try to write them ourselves to minimize overheads.
682 const int attemptLimit = 10;
683 for (int attempts = 1; attempts <= attemptLimit; ++attempts) {
684 const ssize_t result = pwrite(TheFile, buf, toWrite, offset);
685 ++statCounter.syscalls.disk.writes;
686 fd_bytes(TheFile, result, FD_WRITE);
687
688 if (result < 0) {
689 ipcIo.xerrno = errno;
690 assert(ipcIo.xerrno);
691 debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " failure" <<
692 " writing " << toWrite << '/' << ipcIo.len <<
693 " at " << ipcIo.offset << '+' << wroteSoFar <<
694 " on " << attempts << " try: " << xstrerr(ipcIo.xerrno));
695 ipcIo.len = wroteSoFar;
696 return; // bail on error
697 }
698
699 const size_t wroteNow = static_cast<size_t>(result); // result >= 0
700 ipcIo.xerrno = 0;
701
702 debugs(47,3, "disker" << KidIdentifier << " wrote " <<
703 (wroteNow >= toWrite ? "all " : "just ") << wroteNow <<
704 " out of " << toWrite << '/' << ipcIo.len << " at " <<
705 ipcIo.offset << '+' << wroteSoFar << " on " << attempts <<
706 " try");
707
708 wroteSoFar += wroteNow;
709
710 if (wroteNow >= toWrite) {
711 ipcIo.xerrno = 0;
712 ipcIo.len = wroteSoFar;
713 return; // wrote everything there was to write
714 }
715
716 buf += wroteNow;
717 offset += wroteNow;
718 toWrite -= wroteNow;
719 }
720
721 debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " exhausted all " <<
722 attemptLimit << " attempts while writing " <<
723 toWrite << '/' << ipcIo.len << " at " << ipcIo.offset << '+' <<
724 wroteSoFar);
725 return; // not a fatal I/O error, unless the caller treats it as such
726 }
727
/// performs a write request and then releases its shared memory page
static void
diskerWrite(IpcIoMsg &ipcIo)
{
    diskerWriteAttempts(ipcIo); // may fail
    Ipc::Mem::PutPage(ipcIo.page);
}
734
/// eventAdd() callback: resumes processing of queued worker requests
void
IpcIoFile::DiskerHandleMoreRequests(void *source)
{
    debugs(47, 7, HERE << "resuming handling requests after " <<
           static_cast<const char *>(source));
    DiskerHandleMoreRequestsScheduled = false;
    IpcIoFile::DiskerHandleRequests();
}
743
/// whether the disker should delay popping the next I/O request to honor
/// the configured I/O rate limit; schedules a resume event when returning true
bool
IpcIoFile::WaitBeforePop()
{
    const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit();
    const double maxRate = ioRate/1e3; // req/ms

    // do we need to enforce configured I/O rate?
    if (maxRate <= 0)
        return false;

    // is there an I/O request we could potentially delay?
    int processId;
    IpcIoMsg ipcIo;
    if (!queue->peek(processId, ipcIo)) {
        // unlike pop(), peek() is not reliable and does not block reader
        // so we must proceed with pop() even if it is likely to fail
        return false;
    }

    // when the previous I/O was popped; persists across calls
    static timeval LastIo = current_time;

    const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
    // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
    const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));

    const double credit = ioDuration; // what the last I/O should have cost us
    const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
    LastIo = current_time;

    Ipc::QueueReader::Balance &balance = queue->localBalance();
    balance += static_cast<int64_t>(credit - debit);

    debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);

    if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
        // if the next request is (likely) write and we accumulated
        // too much time for future slow I/Os, then shed accumulated
        // time to keep just half of the excess
        const int64_t toSpend = balance - maxImbalance/2;

        if (toSpend/1e3 > Timeout)
            debugs(47, DBG_IMPORTANT, "WARNING: " << DbName << " delays " <<
                   "I/O requests for " << (toSpend/1e3) << " seconds " <<
                   "to obey " << ioRate << "/sec rate limit");

        debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" <<
               (1e3*maxRate) << "/sec rate");
        eventAdd("IpcIoFile::DiskerHandleMoreRequests",
                 &IpcIoFile::DiskerHandleMoreRequests,
                 const_cast<char*>("rate limiting"),
                 toSpend/1e3, 0, false);
        DiskerHandleMoreRequestsScheduled = true;
        return true;
    } else if (balance < -maxImbalance) {
        // do not owe "too much" to avoid "too large" bursts of I/O
        balance = -maxImbalance;
    }

    return false;
}
804
/// pops and processes a batch of worker I/O requests, pausing periodically
/// so that notifications and Coordinator events are not starved
void
IpcIoFile::DiskerHandleRequests()
{
    // Balance our desire to maximize the number of concurrent I/O requests
    // (reordred by OS to minimize seek time) with a requirement to
    // send 1st-I/O notification messages, process Coordinator events, etc.
    const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
    const timeval loopStart = current_time;

    int popped = 0;
    int workerId = 0;
    IpcIoMsg ipcIo;
    while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
        ++popped;

        // at least one I/O per call is guaranteed if the queue is not empty
        DiskerHandleRequest(workerId, ipcIo);

        getCurrentTime();
        const double elapsedMsec = tvSubMsec(loopStart, current_time);
        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
            // we exceeded our time budget; resume via an event instead
            if (!DiskerHandleMoreRequestsScheduled) {
                // the gap must be positive for select(2) to be given a chance
                const double minBreakSecs = 0.001;
                eventAdd("IpcIoFile::DiskerHandleMoreRequests",
                         &IpcIoFile::DiskerHandleMoreRequests,
                         const_cast<char*>("long I/O loop"),
                         minBreakSecs, 0, false);
                DiskerHandleMoreRequestsScheduled = true;
            }
            debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
                   elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
            break;
        }
    }

    // TODO: consider using O_DIRECT with "elevator" optimization where we pop
    // requests first, then reorder the popped requests to optimize seek time,
    // then do I/O, then take a break, and come back for the next set of I/O
    // requests.
}
846
/// called when disker receives an I/O request
void
IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
{
    if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
        debugs(0, DBG_CRITICAL, "ERROR: " << DbName <<
               " should not receive " << ipcIo.command <<
               " ipcIo" << workerId << '.' << ipcIo.requestId);
        return;
    }

    debugs(47,5, HERE << "disker" << KidIdentifier <<
           (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
           ipcIo.len << " at " << ipcIo.offset <<
           " ipcIo" << workerId << '.' << ipcIo.requestId);

    if (ipcIo.command == IpcIo::cmdRead)
        diskerRead(ipcIo);
    else // ipcIo.command == IpcIo::cmdWrite
        diskerWrite(ipcIo);

    debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));

    // push the result (same message, now carrying the outcome) back
    try {
        if (queue->push(workerId, ipcIo))
            Notify(workerId); // must notify worker
    } catch (const Queue::Full &) {
        // The worker queue should not overflow because the worker should pop()
        // before push()ing and because if disker pops N requests at a time,
        // we should make sure the worker pop() queue length is the worker
        // push queue length plus N+1. XXX: implement the N+1 difference.
        debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue for " <<
               DbName << " overflow: " <<
               SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len

        // the I/O request we could not push will timeout
    }
}
885
/// opens the shared db file for this disker process; returns true on success
// NOTE(review): the mode parameter is unused here because file_open() takes
// only a path and flags — confirm whether create-mode handling is needed
static bool
DiskerOpen(const SBuf &path, int flags, mode_t mode)
{
    assert(TheFile < 0);

    DbName = path;
    TheFile = file_open(DbName.c_str(), flags);

    if (TheFile < 0) {
        const int xerrno = errno;
        debugs(47, DBG_CRITICAL, "ERROR: cannot open " << DbName << ": " <<
               xstrerr(xerrno));
        return false;
    }

    ++store_open_disk_fd;
    debugs(79,3, "rock db opened " << DbName << ": FD " << TheFile);
    return true;
}
905
/// closes the shared db file (if open) and forgets its name
static void
DiskerClose(const SBuf &path)
{
    if (TheFile >= 0) {
        file_close(TheFile);
        debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
        TheFile = -1;
        --store_open_disk_fd;
    }
    DbName.clear();
}
917
/// reports our needs for shared memory pages to Ipc::Mem::Pages
/// and initializes shared memory segments used by IpcIoFile
class IpcIoRr: public Ipc::Mem::RegisteredRunner
{
public:
    /* RegisteredRunner API */
    IpcIoRr(): owner(NULL) {}
    virtual ~IpcIoRr();
    virtual void claimMemoryNeeds();

protected:
    /* Ipc::Mem::RegisteredRunner API */
    virtual void create();

private:
    Ipc::FewToFewBiQueue::Owner *owner; ///< owner of the shared queues segment
};

RunnerRegistrationEntry(IpcIoRr);
937
void
IpcIoRr::claimMemoryNeeds()
{
    // upper bound on simultaneously queued I/O requests across all queues
    const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
                               ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity);
    // the maximum number of shared I/O pages is approximately the
    // number of queue slots, we add a fudge factor to that to account
    // for corner cases where I/O pages are created before queue
    // limits are checked or destroyed long after the I/O is dequeued
    Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage,
                           static_cast<int>(itemsCount * 1.1));
}
950
void
IpcIoRr::create()
{
    // queues are only needed when disker strands exist
    if (Config.cacheSwap.n_strands <= 0)
        return;

    Must(!owner);
    owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1,
                                       Config.cacheSwap.n_strands,
                                       1 + Config.workers, sizeof(IpcIoMsg),
                                       QueueCapacity);
}

IpcIoRr::~IpcIoRr()
{
    delete owner;
}