]> git.ipfire.org Git - thirdparty/squid.git/blob - src/DiskIO/IpcIo/IpcIoFile.cc
Store API and layout polishing. No functionality changes intended.
[thirdparty/squid.git] / src / DiskIO / IpcIo / IpcIoFile.cc
1 /*
2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 47 Store Directory Routines */
10
11 #include "squid.h"
12 #include "base/RunnersRegistry.h"
13 #include "base/TextException.h"
14 #include "fs_io.h"
15 #include "DiskIO/IORequestor.h"
16 #include "DiskIO/IpcIo/IpcIoFile.h"
17 #include "DiskIO/ReadRequest.h"
18 #include "DiskIO/WriteRequest.h"
19 #include "fd.h"
20 #include "globals.h"
21 #include "ipc/mem/Pages.h"
22 #include "ipc/Messages.h"
23 #include "ipc/Port.h"
24 #include "ipc/Queue.h"
25 #include "ipc/StrandSearch.h"
26 #include "ipc/UdsOp.h"
27 #include "SBuf.h"
28 #include "SquidConfig.h"
29 #include "SquidTime.h"
30 #include "StatCounters.h"
31 #include "tools.h"
32
33 #include <cerrno>
34
CBDATA_CLASS_INIT(IpcIoFile);

/// shared memory segment path to use for IpcIoFile maps
static const char *const ShmLabel = "io_file";
/// a single worker-to-disker or disker-to-worker queue capacity; up
/// to 2*QueueCapacity I/O requests queued between a single worker and
/// a single disker
// TODO: make configurable or compute from squid.conf settings if possible
static const int QueueCapacity = 1024;

/// how long to wait for a disker open/response before giving up on a request
const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
/// IpcIoFiles that have asked the Coordinator for a disker and await an answer
IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
/// maps diskId (disker kid ID) to the IpcIoFile object using that disker
IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
/// the worker/disker shared request-response queue for this process
std::unique_ptr<IpcIoFile::Queue> IpcIoFile::queue;

/// whether an IpcIoFile::DiskerHandleMoreRequests event is already scheduled
bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false;

static bool DiskerOpen(const SBuf &path, int flags, mode_t mode);
static void DiskerClose(const SBuf &path);
54
/// IpcIo wrapper for debugs() streams; XXX: find a better class name
struct SipcIo {
    SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
        worker(aWorker), msg(aMsg), disker(aDisker) {}

    int worker; ///< the worker-side kid identifier of the exchange
    const IpcIoMsg &msg; ///< the I/O message being printed (kept by reference)
    int disker; ///< the disker-side kid identifier of the exchange
};
64
65 std::ostream &
66 operator <<(std::ostream &os, const SipcIo &sio)
67 {
68 return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
69 (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
70 }
71
/// \param aDb the database file name used for matching with a disker strand
/// diskId starts at -1: the responsible disker is not known until open()
IpcIoFile::IpcIoFile(char const *aDb):
    dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
    olderRequests(&requestMap1), newerRequests(&requestMap2),
    timeoutCheckScheduled(false)
{
}
78
/// deregisters this file from the global diskId-to-file map (if registered)
IpcIoFile::~IpcIoFile()
{
    if (diskId >= 0) { // we were assigned a disker; our registration must exist
        const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId);
        // XXX: warn and continue?
        Must(i != IpcIoFiles.end());
        Must(i->second == this);
        IpcIoFiles.erase(i);
    }
}
89
90 void
91 IpcIoFile::configure(const Config &cfg)
92 {
93 DiskFile::configure(cfg);
94 config = cfg;
95 }
96
97 void
98 IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
99 {
100 ioRequestor = callback;
101 Must(diskId < 0); // we do not know our disker yet
102
103 if (!queue.get())
104 queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
105
106 if (IamDiskProcess()) {
107 error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
108 if (error_)
109 return;
110
111 diskId = KidIdentifier;
112 const bool inserted =
113 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
114 Must(inserted);
115
116 queue->localRateLimit().store(config.ioRate);
117
118 Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
119 ann.strand.tag = dbName;
120 Ipc::TypedMsgHdr message;
121 ann.pack(message);
122 SendMessage(Ipc::Port::CoordinatorAddr(), message);
123
124 ioRequestor->ioCompletedNotification();
125 return;
126 }
127
128 Ipc::StrandSearchRequest request;
129 request.requestorId = KidIdentifier;
130 request.tag = dbName;
131
132 Ipc::TypedMsgHdr msg;
133 request.pack(msg);
134 Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg);
135
136 WaitingForOpen.push_back(this);
137
138 eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
139 this, Timeout, 0, false); // "this" pointer is used as id
140 }
141
142 void
143 IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response)
144 {
145 Must(diskId < 0); // we do not know our disker yet
146
147 if (!response) {
148 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " communication " <<
149 "channel establishment timeout");
150 error_ = true;
151 } else {
152 diskId = response->strand.kidId;
153 if (diskId >= 0) {
154 const bool inserted =
155 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
156 Must(inserted);
157 } else {
158 error_ = true;
159 debugs(79, DBG_IMPORTANT, "ERROR: no disker claimed " <<
160 "responsibility for " << dbName);
161 }
162 }
163
164 ioRequestor->ioCompletedNotification();
165 }
166
/**
 * Alias for IpcIoFile::open(...)
 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 */
void
IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
{
    // NOTE(review): aborts debug builds; presumably create() is never reached
    // in supported configurations -- confirm before relying on this path
    assert(false); // check
    /* We use the same logic path for open */
    open(flags, mode, callback);
}
178
179 void
180 IpcIoFile::close()
181 {
182 assert(ioRequestor != NULL);
183
184 if (IamDiskProcess())
185 DiskerClose(SBuf(dbName.termedBuf()));
186 // XXX: else nothing to do?
187
188 ioRequestor->closeCompleted();
189 }
190
191 bool
192 IpcIoFile::canRead() const
193 {
194 return diskId >= 0 && !error_ && canWait();
195 }
196
197 bool
198 IpcIoFile::canWrite() const
199 {
200 return diskId >= 0 && !error_ && canWait();
201 }
202
203 bool
204 IpcIoFile::error() const
205 {
206 return error_;
207 }
208
209 void
210 IpcIoFile::read(ReadRequest *readRequest)
211 {
212 debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
213 readRequest->offset << ")");
214
215 assert(ioRequestor != NULL);
216 assert(readRequest->offset >= 0);
217 Must(!error_);
218
219 //assert(minOffset < 0 || minOffset <= readRequest->offset);
220 //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
221
222 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
223 pending->readRequest = readRequest;
224 push(pending);
225 }
226
227 void
228 IpcIoFile::readCompleted(ReadRequest *readRequest,
229 IpcIoMsg *const response)
230 {
231 bool ioError = false;
232 if (!response) {
233 debugs(79, 3, HERE << "error: timeout");
234 ioError = true; // I/O timeout does not warrant setting error_?
235 } else {
236 if (response->xerrno) {
237 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read: " <<
238 xstrerr(response->xerrno));
239 ioError = error_ = true;
240 } else if (!response->page) {
241 debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read ran " <<
242 "out of shared memory pages");
243 ioError = true;
244 } else {
245 const char *const buf = Ipc::Mem::PagePointer(response->page);
246 memcpy(readRequest->buf, buf, response->len);
247 }
248
249 Ipc::Mem::PutPage(response->page);
250 }
251
252 const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
253 const int errflag = ioError ? DISK_ERROR :DISK_OK;
254 ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
255 }
256
257 void
258 IpcIoFile::write(WriteRequest *writeRequest)
259 {
260 debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
261 writeRequest->offset << ")");
262
263 assert(ioRequestor != NULL);
264 assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
265 assert(writeRequest->offset >= 0);
266 Must(!error_);
267
268 //assert(minOffset < 0 || minOffset <= writeRequest->offset);
269 //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
270
271 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
272 pending->writeRequest = writeRequest;
273 push(pending);
274 }
275
276 void
277 IpcIoFile::writeCompleted(WriteRequest *writeRequest,
278 const IpcIoMsg *const response)
279 {
280 bool ioError = false;
281 if (!response) {
282 debugs(79, 3, "disker " << diskId << " timeout");
283 ioError = true; // I/O timeout does not warrant setting error_?
284 } else if (response->xerrno) {
285 debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId <<
286 " error writing " << writeRequest->len << " bytes at " <<
287 writeRequest->offset << ": " << xstrerr(response->xerrno) <<
288 "; this worker will stop using " << dbName);
289 ioError = error_ = true;
290 } else if (response->len != writeRequest->len) {
291 debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId << " wrote " <<
292 response->len << " instead of " << writeRequest->len <<
293 " bytes (offset " << writeRequest->offset << "); " <<
294 "this worker will stop using " << dbName);
295 error_ = true;
296 }
297
298 if (writeRequest->free_func)
299 (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
300
301 if (!ioError) {
302 debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
303 diskId << " at " << writeRequest->offset);
304 }
305
306 const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
307 const int errflag = ioError ? DISK_ERROR :DISK_OK;
308 ioRequestor->writeCompleted(errflag, rlen, writeRequest);
309 }
310
311 bool
312 IpcIoFile::ioInProgress() const
313 {
314 return !olderRequests->empty() || !newerRequests->empty();
315 }
316
/// track a new pending request
void
IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
{
    const std::pair<RequestMap::iterator,bool> result =
        newerRequests->insert(std::make_pair(id, pending));
    Must(result.second); // failure means that id was not unique
    if (!timeoutCheckScheduled)
        scheduleTimeoutCheck(); // make sure this request cannot hang forever
}
327
/// push an I/O request to disker; on failure, completes (and deletes) the
/// pending request immediately
void
IpcIoFile::push(IpcIoPendingRequest *const pending)
{
    // prevent queue overflows: check for responses to earlier requests
    // warning: this call may result in indirect push() recursion
    HandleResponses("before push");

    debugs(47, 7, HERE);
    Must(diskId >= 0);
    Must(pending);
    Must(pending->readRequest || pending->writeRequest);

    IpcIoMsg ipcIo;
    try {
        if (++lastRequestId == 0) // don't use zero value as requestId
            ++lastRequestId;
        ipcIo.requestId = lastRequestId;
        ipcIo.start = current_time; // lets canWait() estimate queue delays
        if (pending->readRequest) {
            ipcIo.command = IpcIo::cmdRead;
            ipcIo.offset = pending->readRequest->offset;
            ipcIo.len = pending->readRequest->len;
        } else { // pending->writeRequest
            Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
            // writes carry their data in a shared memory page
            if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
                ipcIo.len = 0;
                throw TexcHere("run out of shared memory pages for IPC I/O");
            }
            ipcIo.command = IpcIo::cmdWrite;
            ipcIo.offset = pending->writeRequest->offset;
            ipcIo.len = pending->writeRequest->len;
            char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
            memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
        }

        debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));

        // a true push() result means the queue was empty: the disker may be
        // sleeping and must be notified
        if (queue->push(diskId, ipcIo))
            Notify(diskId); // must notify disker
        trackPendingRequest(ipcIo.requestId, pending);
    } catch (const Queue::Full &) {
        debugs(47, DBG_IMPORTANT, "ERROR: worker I/O push queue for " <<
               dbName << " overflow: " <<
               SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
        // TODO: grow queue size

        pending->completeIo(NULL);
        delete pending;
    } catch (const TextException &e) {
        debugs(47, DBG_IMPORTANT, "ERROR: " << dbName << " exception: " << e.what());
        pending->completeIo(NULL);
        delete pending;
    }
}
383
/// whether we think there is enough time to complete the I/O
/// before the configured I/O timeout would expire
bool
IpcIoFile::canWait() const
{
    if (!config.ioTimeout)
        return true; // no timeout specified

    IpcIoMsg oldestIo;
    if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
        return true; // we cannot estimate expected wait time; assume it is OK

    // how long the oldest queued request has been waiting already
    const int oldestWait = tvSubMsec(oldestIo.start, current_time);

    int rateWait = -1; // time in milliseconds
    const int ioRate = queue->rateLimit(diskId).load();
    if (ioRate > 0) {
        // if there are N requests pending, the new one will wait at
        // least N/max-swap-rate seconds
        rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
        // adjust N/max-swap-rate value based on the queue "balance"
        // member, in case we have been borrowing time against future
        // I/O already
        rateWait += queue->balance(diskId);
    }

    // the pessimistic estimate: whichever bound predicts a longer wait
    const int expectedWait = max(oldestWait, rateWait);
    if (expectedWait < 0 ||
            static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
        return true; // expected wait time is acceptable

    debugs(47,2, HERE << "cannot wait: " << expectedWait <<
           " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
    return false; // do not want to wait that long
}
418
/// called when coordinator responds to worker open request
void
IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
{
    debugs(47, 7, HERE << "coordinator response to open request");
    // find the waiting file by its db name tag and complete its open
    for (IpcIoFileList::iterator i = WaitingForOpen.begin();
            i != WaitingForOpen.end(); ++i) {
        if (response.strand.tag == (*i)->dbName) {
            (*i)->openCompleted(&response);
            WaitingForOpen.erase(i);
            return;
        }
    }

    // the file already timed out (OpenTimeout removed it from the list)
    debugs(47, 4, HERE << "LATE disker response to open for " <<
           response.strand.tag);
    // nothing we can do about it; completeIo() has been called already
}
437
438 void
439 IpcIoFile::HandleResponses(const char *const when)
440 {
441 debugs(47, 4, HERE << "popping all " << when);
442 IpcIoMsg ipcIo;
443 // get all responses we can: since we are not pushing, this will stop
444 int diskId;
445 while (queue->pop(diskId, ipcIo)) {
446 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
447 Must(i != IpcIoFiles.end()); // TODO: warn but continue
448 i->second->handleResponse(ipcIo);
449 }
450 }
451
452 void
453 IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
454 {
455 const int requestId = ipcIo.requestId;
456 debugs(47, 7, HERE << "popped disker response: " <<
457 SipcIo(KidIdentifier, ipcIo, diskId));
458
459 Must(requestId);
460 if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
461 pending->completeIo(&ipcIo);
462 delete pending; // XXX: leaking if throwing
463 } else {
464 debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command <<
465 "; ipcIo" << KidIdentifier << '.' << requestId);
466 // nothing we can do about it; completeIo() has been called already
467 }
468 }
469
470 void
471 IpcIoFile::Notify(const int peerId)
472 {
473 // TODO: Count and report the total number of notifications, pops, pushes.
474 debugs(47, 7, HERE << "kid" << peerId);
475 Ipc::TypedMsgHdr msg;
476 msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
477 msg.putInt(KidIdentifier);
478 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, peerId);
479 Ipc::SendMessage(addr, msg);
480 }
481
482 void
483 IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
484 {
485 const int from = msg.getInt();
486 debugs(47, 7, HERE << "from " << from);
487 queue->clearReaderSignal(from);
488 if (IamDiskProcess())
489 DiskerHandleRequests();
490 else
491 HandleResponses("after notification");
492 }
493
/// handles open request timeout
void
IpcIoFile::OpenTimeout(void *const param)
{
    Must(param);
    // the pointer is used for comparison only and not dereferenced
    const IpcIoFile *const ipcIoFile =
        reinterpret_cast<const IpcIoFile *>(param);
    for (IpcIoFileList::iterator i = WaitingForOpen.begin();
            i != WaitingForOpen.end(); ++i) {
        if (*i == ipcIoFile) {
            (*i)->openCompleted(NULL); // nil response signals a timeout
            WaitingForOpen.erase(i);
            break;
        }
    }
    // if not found, the open already completed via HandleOpenResponse()
}
511
/// IpcIoFile::checkTimeouts wrapper
void
IpcIoFile::CheckTimeouts(void *const param)
{
    Must(param);
    // the event payload is the diskId itself, smuggled through the void*
    const int diskId = reinterpret_cast<uintptr_t>(param);
    debugs(47, 7, HERE << "diskId=" << diskId);
    const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
    if (i != IpcIoFiles.end()) // the file may have been closed already
        i->second->checkTimeouts();
}
523
524 void
525 IpcIoFile::checkTimeouts()
526 {
527 timeoutCheckScheduled = false;
528
529 // last chance to recover in case a notification message was lost, etc.
530 const RequestMap::size_type timeoutsBefore = olderRequests->size();
531 HandleResponses("before timeout");
532 const RequestMap::size_type timeoutsNow = olderRequests->size();
533
534 if (timeoutsBefore > timeoutsNow) { // some requests were rescued
535 // notification message lost or significantly delayed?
536 debugs(47, DBG_IMPORTANT, "WARNING: communication with " << dbName <<
537 " may be too slow or disrupted for about " <<
538 Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
539 " out of " << timeoutsBefore << " I/Os");
540 }
541
542 if (timeoutsNow) {
543 debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
544 timeoutsNow << ' ' << dbName << " I/Os after at least " <<
545 Timeout << "s timeout");
546 }
547
548 // any old request would have timed out by now
549 typedef RequestMap::const_iterator RMCI;
550 for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
551 IpcIoPendingRequest *const pending = i->second;
552
553 const unsigned int requestId = i->first;
554 debugs(47, 7, HERE << "disker timeout; ipcIo" <<
555 KidIdentifier << '.' << requestId);
556
557 pending->completeIo(NULL); // no response
558 delete pending; // XXX: leaking if throwing
559 }
560 olderRequests->clear();
561
562 swap(olderRequests, newerRequests); // switches pointers around
563 if (!olderRequests->empty() && !timeoutCheckScheduled)
564 scheduleTimeoutCheck();
565 }
566
/// prepare to check for timeouts in a little while
void
IpcIoFile::scheduleTimeoutCheck()
{
    // we check all older requests at once so some may wait for up to 2*Timeout
    eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
             reinterpret_cast<void *>(diskId), Timeout, 0, false);
    timeoutCheckScheduled = true;
}
576
577 /// returns and forgets the right IpcIoFile pending request
578 IpcIoPendingRequest *
579 IpcIoFile::dequeueRequest(const unsigned int requestId)
580 {
581 Must(requestId != 0);
582
583 RequestMap *map = NULL;
584 RequestMap::iterator i = requestMap1.find(requestId);
585
586 if (i != requestMap1.end())
587 map = &requestMap1;
588 else {
589 i = requestMap2.find(requestId);
590 if (i != requestMap2.end())
591 map = &requestMap2;
592 }
593
594 if (!map) // not found in both maps
595 return NULL;
596
597 IpcIoPendingRequest *pending = i->second;
598 map->erase(i);
599 return pending;
600 }
601
602 int
603 IpcIoFile::getFD() const
604 {
605 assert(false); // not supported; TODO: remove this method from API
606 return -1;
607 }
608
/* IpcIoMsg */

/// zero-initializes all members; a zero requestId marks an unused message
IpcIoMsg::IpcIoMsg():
    requestId(0),
    offset(0),
    len(0),
    command(IpcIo::cmdNone),
    xerrno(0)
{
    // timeval has no constructor; clear it explicitly
    start.tv_sec = 0;
    start.tv_usec = 0;
}
621
/* IpcIoPendingRequest */

/// starts tracking a yet-unclassified request; the creator sets exactly one
/// of readRequest/writeRequest afterwards
IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
    file(aFile), readRequest(NULL), writeRequest(NULL)
{
}
628
629 void
630 IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
631 {
632 if (readRequest)
633 file->readCompleted(readRequest, response);
634 else if (writeRequest)
635 file->writeCompleted(writeRequest, response);
636 else {
637 Must(!response); // only timeouts are handled here
638 file->openCompleted(NULL);
639 }
640 }
641
/* XXX: disker code that should probably be moved elsewhere */

// each disker process serves exactly one db file, tracked by these globals
static SBuf DbName; ///< full db file name
static int TheFile = -1; ///< db file descriptor (-1 while closed)
646
647 static void
648 diskerRead(IpcIoMsg &ipcIo)
649 {
650 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
651 ipcIo.len = 0;
652 debugs(47,2, HERE << "run out of shared memory pages for IPC I/O");
653 return;
654 }
655
656 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
657 const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
658 ++statCounter.syscalls.disk.reads;
659 fd_bytes(TheFile, read, FD_READ);
660
661 if (read >= 0) {
662 ipcIo.xerrno = 0;
663 const size_t len = static_cast<size_t>(read); // safe because read > 0
664 debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
665 (len == ipcIo.len ? "all " : "just ") << read);
666 ipcIo.len = len;
667 } else {
668 ipcIo.xerrno = errno;
669 ipcIo.len = 0;
670 debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
671 ipcIo.xerrno);
672 }
673 }
674
/// Tries to write buffer to disk (a few times if needed);
/// sets ipcIo results, but does no cleanup. The caller must cleanup.
static void
diskerWriteAttempts(IpcIoMsg &ipcIo)
{
    const char *buf = Ipc::Mem::PagePointer(ipcIo.page);
    size_t toWrite = min(ipcIo.len, Ipc::Mem::PageSize());
    size_t wroteSoFar = 0;
    off_t offset = ipcIo.offset;
    // Partial writes to disk do happen. It is unlikely that the caller can
    // handle partial writes by doing something other than writing leftovers
    // again, so we try to write them ourselves to minimize overheads.
    const int attemptLimit = 10;
    for (int attempts = 1; attempts <= attemptLimit; ++attempts) {
        const ssize_t result = pwrite(TheFile, buf, toWrite, offset);
        ++statCounter.syscalls.disk.writes;
        fd_bytes(TheFile, result, FD_WRITE);

        if (result < 0) {
            ipcIo.xerrno = errno;
            assert(ipcIo.xerrno); // errno must explain the failure
            debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " failure" <<
                   " writing " << toWrite << '/' << ipcIo.len <<
                   " at " << ipcIo.offset << '+' << wroteSoFar <<
                   " on " << attempts << " try: " << xstrerr(ipcIo.xerrno));
            ipcIo.len = wroteSoFar; // report the partial progress
            return; // bail on error
        }

        const size_t wroteNow = static_cast<size_t>(result); // result >= 0
        ipcIo.xerrno = 0;

        debugs(47,3, "disker" << KidIdentifier << " wrote " <<
               (wroteNow >= toWrite ? "all " : "just ") << wroteNow <<
               " out of " << toWrite << '/' << ipcIo.len << " at " <<
               ipcIo.offset << '+' << wroteSoFar << " on " << attempts <<
               " try");

        wroteSoFar += wroteNow;

        if (wroteNow >= toWrite) {
            ipcIo.xerrno = 0;
            ipcIo.len = wroteSoFar;
            return; // wrote everything there was to write
        }

        // advance past the bytes written and retry the remainder
        buf += wroteNow;
        offset += wroteNow;
        toWrite -= wroteNow;
    }

    debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " exhausted all " <<
           attemptLimit << " attempts while writing " <<
           toWrite << '/' << ipcIo.len << " at " << ipcIo.offset << '+' <<
           wroteSoFar);
    return; // not a fatal I/O error, unless the caller treats it as such
}
732
733 static void
734 diskerWrite(IpcIoMsg &ipcIo)
735 {
736 diskerWriteAttempts(ipcIo); // may fail
737 Ipc::Mem::PutPage(ipcIo.page);
738 }
739
740 void
741 IpcIoFile::DiskerHandleMoreRequests(void *source)
742 {
743 debugs(47, 7, HERE << "resuming handling requests after " <<
744 static_cast<const char *>(source));
745 DiskerHandleMoreRequestsScheduled = false;
746 IpcIoFile::DiskerHandleRequests();
747 }
748
749 bool
750 IpcIoFile::WaitBeforePop()
751 {
752 const int ioRate = queue->localRateLimit().load();
753 const double maxRate = ioRate/1e3; // req/ms
754
755 // do we need to enforce configured I/O rate?
756 if (maxRate <= 0)
757 return false;
758
759 // is there an I/O request we could potentially delay?
760 int processId;
761 IpcIoMsg ipcIo;
762 if (!queue->peek(processId, ipcIo)) {
763 // unlike pop(), peek() is not reliable and does not block reader
764 // so we must proceed with pop() even if it is likely to fail
765 return false;
766 }
767
768 static timeval LastIo = current_time;
769
770 const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
771 // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
772 const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
773
774 const double credit = ioDuration; // what the last I/O should have cost us
775 const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
776 LastIo = current_time;
777
778 Ipc::QueueReader::Balance &balance = queue->localBalance();
779 balance += static_cast<int64_t>(credit - debit);
780
781 debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
782
783 if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
784 // if the next request is (likely) write and we accumulated
785 // too much time for future slow I/Os, then shed accumulated
786 // time to keep just half of the excess
787 const int64_t toSpend = balance - maxImbalance/2;
788
789 if (toSpend/1e3 > Timeout)
790 debugs(47, DBG_IMPORTANT, "WARNING: " << DbName << " delays " <<
791 "I/O requests for " << (toSpend/1e3) << " seconds " <<
792 "to obey " << ioRate << "/sec rate limit");
793
794 debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" <<
795 (1e3*maxRate) << "/sec rate");
796 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
797 &IpcIoFile::DiskerHandleMoreRequests,
798 const_cast<char*>("rate limiting"),
799 toSpend/1e3, 0, false);
800 DiskerHandleMoreRequestsScheduled = true;
801 return true;
802 } else if (balance < -maxImbalance) {
803 // do not owe "too much" to avoid "too large" bursts of I/O
804 balance = -maxImbalance;
805 }
806
807 return false;
808 }
809
810 void
811 IpcIoFile::DiskerHandleRequests()
812 {
813 // Balance our desire to maximize the number of concurrent I/O requests
814 // (reordred by OS to minimize seek time) with a requirement to
815 // send 1st-I/O notification messages, process Coordinator events, etc.
816 const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
817 const timeval loopStart = current_time;
818
819 int popped = 0;
820 int workerId = 0;
821 IpcIoMsg ipcIo;
822 while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
823 ++popped;
824
825 // at least one I/O per call is guaranteed if the queue is not empty
826 DiskerHandleRequest(workerId, ipcIo);
827
828 getCurrentTime();
829 const double elapsedMsec = tvSubMsec(loopStart, current_time);
830 if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
831 if (!DiskerHandleMoreRequestsScheduled) {
832 // the gap must be positive for select(2) to be given a chance
833 const double minBreakSecs = 0.001;
834 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
835 &IpcIoFile::DiskerHandleMoreRequests,
836 const_cast<char*>("long I/O loop"),
837 minBreakSecs, 0, false);
838 DiskerHandleMoreRequestsScheduled = true;
839 }
840 debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
841 elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
842 break;
843 }
844 }
845
846 // TODO: consider using O_DIRECT with "elevator" optimization where we pop
847 // requests first, then reorder the popped requests to optimize seek time,
848 // then do I/O, then take a break, and come back for the next set of I/O
849 // requests.
850 }
851
/// called when disker receives an I/O request
void
IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
{
    // defend against corrupted or unexpected queue entries
    if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
        debugs(0, DBG_CRITICAL, "ERROR: " << DbName <<
               " should not receive " << ipcIo.command <<
               " ipcIo" << workerId << '.' << ipcIo.requestId);
        return;
    }

    debugs(47,5, HERE << "disker" << KidIdentifier <<
           (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
           ipcIo.len << " at " << ipcIo.offset <<
           " ipcIo" << workerId << '.' << ipcIo.requestId);

    if (ipcIo.command == IpcIo::cmdRead)
        diskerRead(ipcIo);
    else // ipcIo.command == IpcIo::cmdWrite
        diskerWrite(ipcIo);

    debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));

    try {
        // a true push() result means the worker queue was empty: the worker
        // may be sleeping and must be notified
        if (queue->push(workerId, ipcIo))
            Notify(workerId); // must notify worker
    } catch (const Queue::Full &) {
        // The worker queue should not overflow because the worker should pop()
        // before push()ing and because if disker pops N requests at a time,
        // we should make sure the worker pop() queue length is the worker
        // push queue length plus N+1. XXX: implement the N+1 difference.
        debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue for " <<
               DbName << " overflow: " <<
               SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len

        // the I/O request we could not push will timeout
    }
}
890
891 static bool
892 DiskerOpen(const SBuf &path, int flags, mode_t)
893 {
894 assert(TheFile < 0);
895
896 DbName = path;
897 TheFile = file_open(DbName.c_str(), flags);
898
899 if (TheFile < 0) {
900 const int xerrno = errno;
901 debugs(47, DBG_CRITICAL, "ERROR: cannot open " << DbName << ": " <<
902 xstrerr(xerrno));
903 return false;
904 }
905
906 ++store_open_disk_fd;
907 debugs(79,3, "rock db opened " << DbName << ": FD " << TheFile);
908 return true;
909 }
910
911 static void
912 DiskerClose(const SBuf &path)
913 {
914 if (TheFile >= 0) {
915 file_close(TheFile);
916 debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
917 TheFile = -1;
918 --store_open_disk_fd;
919 }
920 DbName.clear();
921 }
922
/// reports our needs for shared memory pages to Ipc::Mem::Pages
/// and initializes shared memory segments used by IpcIoFile
class IpcIoRr: public Ipc::Mem::RegisteredRunner
{
public:
    /* RegisteredRunner API */
    IpcIoRr(): owner(NULL) {}
    virtual ~IpcIoRr();
    virtual void claimMemoryNeeds();

protected:
    /* Ipc::Mem::RegisteredRunner API */
    virtual void create();

private:
    Ipc::FewToFewBiQueue::Owner *owner; ///< shared queue segment owner (master process only)
};

RunnerRegistrationEntry(IpcIoRr);
942
943 void
944 IpcIoRr::claimMemoryNeeds()
945 {
946 const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
947 ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity);
948 // the maximum number of shared I/O pages is approximately the
949 // number of queue slots, we add a fudge factor to that to account
950 // for corner cases where I/O pages are created before queue
951 // limits are checked or destroyed long after the I/O is dequeued
952 Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage,
953 static_cast<int>(itemsCount * 1.1));
954 }
955
956 void
957 IpcIoRr::create()
958 {
959 if (Config.cacheSwap.n_strands <= 0)
960 return;
961
962 Must(!owner);
963 owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1,
964 Config.cacheSwap.n_strands,
965 1 + Config.workers, sizeof(IpcIoMsg),
966 QueueCapacity);
967 }
968
/// releases the shared queue segments (no-op if create() never ran)
IpcIoRr::~IpcIoRr()
{
    delete owner;
}
973