]> git.ipfire.org Git - thirdparty/squid.git/blob - src/DiskIO/IpcIo/IpcIoFile.cc
Boilerplate: update copyright blurbs on src/
[thirdparty/squid.git] / src / DiskIO / IpcIo / IpcIoFile.cc
1 /*
2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 47 Store Directory Routines */
10
11 #include "squid.h"
12 #include "base/RunnersRegistry.h"
13 #include "base/TextException.h"
14 #include "disk.h"
15 #include "DiskIO/IORequestor.h"
16 #include "DiskIO/IpcIo/IpcIoFile.h"
17 #include "DiskIO/ReadRequest.h"
18 #include "DiskIO/WriteRequest.h"
19 #include "fd.h"
20 #include "globals.h"
21 #include "ipc/mem/Pages.h"
22 #include "ipc/Messages.h"
23 #include "ipc/Port.h"
24 #include "ipc/Queue.h"
25 #include "ipc/StrandSearch.h"
26 #include "ipc/UdsOp.h"
27 #include "SBuf.h"
28 #include "SquidConfig.h"
29 #include "SquidTime.h"
30 #include "StatCounters.h"
31 #include "tools.h"
32
33 #include <cerrno>
34
CBDATA_CLASS_INIT(IpcIoFile);

/// shared memory segment path to use for IpcIoFile maps
static const char *const ShmLabel = "io_file";
/// a single worker-to-disker or disker-to-worker queue capacity; up
/// to 2*QueueCapacity I/O requests queued between a single worker and
/// a single disker
// TODO: make configurable or compute from squid.conf settings if possible
static const int QueueCapacity = 1024;

/// open/read/write timeout interval, also used to pace timeout checks
const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
/// files that sent an open request and still await a disker assignment
IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
/// maps a disker kid ID to the IpcIoFile exchanging I/O with that disker
IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
// XXX: std::auto_ptr is deprecated; migrating to unique_ptr needs a header change
std::auto_ptr<IpcIoFile::Queue> IpcIoFile::queue;

/// whether a DiskerHandleMoreRequests event is already scheduled (disker only)
bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false;

static bool DiskerOpen(const SBuf &path, int flags, mode_t mode);
static void DiskerClose(const SBuf &path);
54
55 /// IpcIo wrapper for debugs() streams; XXX: find a better class name
56 struct SipcIo {
57 SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
58 worker(aWorker), msg(aMsg), disker(aDisker) {}
59
60 int worker;
61 const IpcIoMsg &msg;
62 int disker;
63 };
64
65 std::ostream &
66 operator <<(std::ostream &os, const SipcIo &sio)
67 {
68 return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
69 (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
70 }
71
// A negative diskId means we do not know our disker yet; open() sets it.
IpcIoFile::IpcIoFile(char const *aDb):
    dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
    olderRequests(&requestMap1), newerRequests(&requestMap2),
    timeoutCheckScheduled(false)
{
}
78
IpcIoFile::~IpcIoFile()
{
    // deregister from the global diskId-to-file map if open() registered us
    if (diskId >= 0) {
        const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId);
        // XXX: warn and continue?
        Must(i != IpcIoFiles.end());
        Must(i->second == this);
        IpcIoFiles.erase(i);
    }
}
89
// Stores the I/O configuration (used later as config.ioRate and
// config.ioTimeout) after letting the base class process it.
void
IpcIoFile::configure(const Config &cfg)
{
    DiskFile::configure(cfg);
    config = cfg;
}
96
// Starts the open sequence. A disker opens the db file directly and
// announces itself to the Coordinator; a worker asks the Coordinator to
// locate the responsible disker and waits for openCompleted().
void
IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
{
    ioRequestor = callback;
    Must(diskId < 0); // we do not know our disker yet

    // lazily create the one queue shared by all IpcIoFiles in this process
    if (!queue.get())
        queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));

    if (IamDiskProcess()) {
        // disker path: open the db file ourselves
        error_ = !DiskerOpen(SBuf(dbName.termedBuf()), flags, mode);
        if (error_)
            return;

        diskId = KidIdentifier;
        const bool inserted =
            IpcIoFiles.insert(std::make_pair(diskId, this)).second;
        Must(inserted);

        // apply the configured I/O rate limit to our side of the queue
        queue->localRateLimit() =
            static_cast<Ipc::QueueReader::Rate::Value>(config.ioRate);

        // announce our tag so the Coordinator can match worker searches to us
        Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
        ann.strand.tag = dbName;
        Ipc::TypedMsgHdr message;
        ann.pack(message);
        SendMessage(Ipc::Port::CoordinatorAddr(), message);

        ioRequestor->ioCompletedNotification();
        return;
    }

    // worker path: ask the Coordinator which disker serves dbName;
    // openCompleted() runs when a response (or OpenTimeout) arrives
    Ipc::StrandSearchRequest request;
    request.requestorId = KidIdentifier;
    request.tag = dbName;

    Ipc::TypedMsgHdr msg;
    request.pack(msg);
    Ipc::SendMessage(Ipc::Port::CoordinatorAddr(), msg);

    WaitingForOpen.push_back(this);

    eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
             this, Timeout, 0, false); // "this" pointer is used as id
}
142
// Finishes the worker open sequence using the Coordinator response;
// a NULL response means the strand search timed out. Always notifies
// the requestor, successful or not.
void
IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response)
{
    Must(diskId < 0); // we do not know our disker yet

    if (!response) {
        debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " communication " <<
               "channel establishment timeout");
        error_ = true;
    } else {
        diskId = response->strand.kidId;
        if (diskId >= 0) {
            // remember which file handles responses from this disker
            const bool inserted =
                IpcIoFiles.insert(std::make_pair(diskId, this)).second;
            Must(inserted);
        } else {
            error_ = true;
            debugs(79, DBG_IMPORTANT, "ERROR: no disker claimed " <<
                   "responsibility for " << dbName);
        }
    }

    ioRequestor->ioCompletedNotification();
}
167
/**
 * Alias for IpcIoFile::open(...)
 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 */
void
IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
{
    assert(false); // check: callers are not expected to reach create()
    /* We use the same logic path for open */
    open(flags, mode, callback);
}
179
void
IpcIoFile::close()
{
    assert(ioRequestor != NULL);

    // only the disker process owns the underlying db file descriptor
    if (IamDiskProcess())
        DiskerClose(SBuf(dbName.termedBuf()));
    // XXX: else nothing to do?

    ioRequestor->closeCompleted();
}
191
192 bool
193 IpcIoFile::canRead() const
194 {
195 return diskId >= 0 && !error_ && canWait();
196 }
197
198 bool
199 IpcIoFile::canWrite() const
200 {
201 return diskId >= 0 && !error_ && canWait();
202 }
203
// Whether this file has recorded an unrecoverable open or I/O error.
bool
IpcIoFile::error() const
{
    return error_;
}
209
// Queues a read request for the disker; readCompleted() is called when
// the response (or a timeout) arrives.
void
IpcIoFile::read(ReadRequest *readRequest)
{
    debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
           readRequest->offset << ")");

    assert(ioRequestor != NULL);
    assert(readRequest->offset >= 0);
    Must(!error_);

    //assert(minOffset < 0 || minOffset <= readRequest->offset);
    //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);

    IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
    pending->readRequest = readRequest;
    push(pending);
}
227
// Finishes a read: on success, copies the disker-filled shared memory
// page into the caller buffer; a NULL response means the request timed
// out. Notifies the requestor in all cases.
void
IpcIoFile::readCompleted(ReadRequest *readRequest,
                         IpcIoMsg *const response)
{
    bool ioError = false;
    if (!response) {
        debugs(79, 3, HERE << "error: timeout");
        ioError = true; // I/O timeout does not warrant setting error_?
    } else {
        if (response->xerrno) {
            // disker-reported system error; stop using this file
            debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read: " <<
                   xstrerr(response->xerrno));
            ioError = error_ = true;
        } else if (!response->page) {
            debugs(79, DBG_IMPORTANT, "ERROR: " << dbName << " read ran " <<
                   "out of shared memory pages");
            ioError = true;
        } else {
            // copy the result from the shared page into the caller buffer
            const char *const buf = Ipc::Mem::PagePointer(response->page);
            memcpy(readRequest->buf, buf, response->len);
        }

        // return the shared page whenever a response was received
        Ipc::Mem::PutPage(response->page);
    }

    const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
    const int errflag = ioError ? DISK_ERROR :DISK_OK;
    ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
}
257
// Queues a write request for the disker; writeCompleted() is called when
// the response (or a timeout) arrives.
void
IpcIoFile::write(WriteRequest *writeRequest)
{
    debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
           writeRequest->offset << ")");

    assert(ioRequestor != NULL);
    assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
    assert(writeRequest->offset >= 0);
    Must(!error_);

    //assert(minOffset < 0 || minOffset <= writeRequest->offset);
    //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);

    IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
    pending->writeRequest = writeRequest;
    push(pending);
}
276
// Finishes a write: verifies the disker wrote everything, frees the
// caller buffer via free_func, and notifies the requestor. A NULL
// response means the request timed out.
void
IpcIoFile::writeCompleted(WriteRequest *writeRequest,
                          const IpcIoMsg *const response)
{
    bool ioError = false;
    if (!response) {
        debugs(79, 3, "disker " << diskId << " timeout");
        ioError = true; // I/O timeout does not warrant setting error_?
    } else if (response->xerrno) {
        // disker-reported system error; stop using this file
        debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId <<
               " error writing " << writeRequest->len << " bytes at " <<
               writeRequest->offset << ": " << xstrerr(response->xerrno) <<
               "; this worker will stop using " << dbName);
        ioError = error_ = true;
    } else if (response->len != writeRequest->len) {
        // partial write even after disker-side retries; stop using this file
        debugs(79, DBG_IMPORTANT, "ERROR: disker " << diskId << " wrote " <<
               response->len << " instead of " << writeRequest->len <<
               " bytes (offset " << writeRequest->offset << "); " <<
               "this worker will stop using " << dbName);
        error_ = true;
    }

    if (writeRequest->free_func)
        (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?

    if (!ioError) {
        debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
               diskId << " at " << writeRequest->offset);
    }

    const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
    const int errflag = ioError ? DISK_ERROR :DISK_OK;
    ioRequestor->writeCompleted(errflag, rlen, writeRequest);
}
311
312 bool
313 IpcIoFile::ioInProgress() const
314 {
315 return !olderRequests->empty() || !newerRequests->empty();
316 }
317
/// track a new pending request
void
IpcIoFile::trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending)
{
    // new requests go into the newer-generation map; checkTimeouts() later
    // swaps the maps so old requests age before being declared timed out
    const std::pair<RequestMap::iterator,bool> result =
        newerRequests->insert(std::make_pair(id, pending));
    Must(result.second); // failures means that id was not unique
    if (!timeoutCheckScheduled)
        scheduleTimeoutCheck();
}
328
/// push an I/O request to disker
void
IpcIoFile::push(IpcIoPendingRequest *const pending)
{
    // prevent queue overflows: check for responses to earlier requests
    // warning: this call may result in indirect push() recursion
    HandleResponses("before push");

    debugs(47, 7, HERE);
    Must(diskId >= 0);
    Must(pending);
    Must(pending->readRequest || pending->writeRequest);

    IpcIoMsg ipcIo;
    try {
        if (++lastRequestId == 0) // don't use zero value as requestId
            ++lastRequestId;
        ipcIo.requestId = lastRequestId;
        ipcIo.start = current_time;
        if (pending->readRequest) {
            // reads carry no payload; the disker allocates the result page
            ipcIo.command = IpcIo::cmdRead;
            ipcIo.offset = pending->readRequest->offset;
            ipcIo.len = pending->readRequest->len;
        } else { // pending->writeRequest
            // writes copy the payload into a shared memory page first
            Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
            if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
                ipcIo.len = 0;
                throw TexcHere("run out of shared memory pages for IPC I/O");
            }
            ipcIo.command = IpcIo::cmdWrite;
            ipcIo.offset = pending->writeRequest->offset;
            ipcIo.len = pending->writeRequest->len;
            char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
            memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
        }

        debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));

        // push() returning true means the disker needs a wake-up notification
        if (queue->push(diskId, ipcIo))
            Notify(diskId); // must notify disker
        trackPendingRequest(ipcIo.requestId, pending);
    } catch (const Queue::Full &) {
        debugs(47, DBG_IMPORTANT, "ERROR: worker I/O push queue for " <<
               dbName << " overflow: " <<
               SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
        // TODO: grow queue size

        pending->completeIo(NULL);
        delete pending;
    } catch (const TextException &e) {
        // e.g., the out-of-shared-pages error thrown above
        debugs(47, DBG_IMPORTANT, "ERROR: " << dbName << " exception: " << e.what());
        pending->completeIo(NULL);
        delete pending;
    }
}
384
/// whether we think there is enough time to complete the I/O
bool
IpcIoFile::canWait() const
{
    if (!config.ioTimeout)
        return true; // no timeout specified

    IpcIoMsg oldestIo;
    if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0)
        return true; // we cannot estimate expected wait time; assume it is OK

    // how long the oldest queued request has been waiting already
    const int oldestWait = tvSubMsec(oldestIo.start, current_time);

    int rateWait = -1; // time in milliseconds
    const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId);
    if (ioRate > 0) {
        // if there are N requests pending, the new one will wait at
        // least N/max-swap-rate seconds
        rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate);
        // adjust N/max-swap-rate value based on the queue "balance"
        // member, in case we have been borrowing time against future
        // I/O already
        rateWait += queue->balance(diskId);
    }

    const int expectedWait = max(oldestWait, rateWait);
    if (expectedWait < 0 ||
            static_cast<time_msec_t>(expectedWait) < config.ioTimeout)
        return true; // expected wait time is acceptable

    debugs(47,2, HERE << "cannot wait: " << expectedWait <<
           " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId));
    return false; // do not want to wait that long
}
419
/// called when coordinator responds to worker open request
void
IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
{
    debugs(47, 7, HERE << "coordinator response to open request");
    // find the waiting file whose dbName matches the response tag
    for (IpcIoFileList::iterator i = WaitingForOpen.begin();
            i != WaitingForOpen.end(); ++i) {
        if (response.strand.tag == (*i)->dbName) {
            (*i)->openCompleted(&response);
            WaitingForOpen.erase(i);
            return;
        }
    }

    // no match: OpenTimeout already gave up on this file
    debugs(47, 4, HERE << "LATE disker response to open for " <<
           response.strand.tag);
    // nothing we can do about it; completeIo() has been called already
}
438
// Pops all currently available disker responses and routes each to the
// IpcIoFile registered for the responding disker.
void
IpcIoFile::HandleResponses(const char *const when)
{
    debugs(47, 4, HERE << "popping all " << when);
    IpcIoMsg ipcIo;
    // get all responses we can: since we are not pushing, this will stop
    int diskId;
    while (queue->pop(diskId, ipcIo)) {
        const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
        Must(i != IpcIoFiles.end()); // TODO: warn but continue
        i->second->handleResponse(ipcIo);
    }
}
452
// Matches a popped disker response to its tracked pending request and
// completes it; late responses (already timed out) are only logged.
void
IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
{
    const int requestId = ipcIo.requestId;
    debugs(47, 7, HERE << "popped disker response: " <<
           SipcIo(KidIdentifier, ipcIo, diskId));

    Must(requestId);
    if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
        pending->completeIo(&ipcIo);
        delete pending; // XXX: leaking if throwing
    } else {
        debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command <<
               "; ipcIo" << KidIdentifier << '.' << requestId);
        // nothing we can do about it; completeIo() has been called already
    }
}
470
471 void
472 IpcIoFile::Notify(const int peerId)
473 {
474 // TODO: Count and report the total number of notifications, pops, pushes.
475 debugs(47, 7, HERE << "kid" << peerId);
476 Ipc::TypedMsgHdr msg;
477 msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
478 msg.putInt(KidIdentifier);
479 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, peerId);
480 Ipc::SendMessage(addr, msg);
481 }
482
// Handles a queue-activity notification from another process: a disker
// drains new requests; a worker drains new responses.
void
IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
{
    const int from = msg.getInt();
    debugs(47, 7, HERE << "from " << from);
    // acknowledge the sender's signal before draining the queue
    queue->clearReaderSignal(from);
    if (IamDiskProcess())
        DiskerHandleRequests();
    else
        HandleResponses("after notification");
}
494
/// handles open request timeout
void
IpcIoFile::OpenTimeout(void *const param)
{
    Must(param);
    // the pointer is used for comparison only and not dereferenced
    const IpcIoFile *const ipcIoFile =
        reinterpret_cast<const IpcIoFile *>(param);
    for (IpcIoFileList::iterator i = WaitingForOpen.begin();
            i != WaitingForOpen.end(); ++i) {
        if (*i == ipcIoFile) {
            // still waiting: report the timeout and stop waiting
            (*i)->openCompleted(NULL);
            WaitingForOpen.erase(i);
            break;
        }
    }
}
512
/// IpcIoFile::checkTimeouts wrapper
void
IpcIoFile::CheckTimeouts(void *const param)
{
    Must(param);
    // param encodes the diskId itself (not a pointer); see scheduleTimeoutCheck()
    const int diskId = reinterpret_cast<uintptr_t>(param);
    debugs(47, 7, HERE << "diskId=" << diskId);
    // a missing map entry means the file is gone; silently ignore
    const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
    if (i != IpcIoFiles.end())
        i->second->checkTimeouts();
}
524
// Times out requests that have been waiting for at least one full Timeout
// period. Requests in olderRequests are expired; newerRequests become the
// next olderRequests generation.
void
IpcIoFile::checkTimeouts()
{
    timeoutCheckScheduled = false;

    // last chance to recover in case a notification message was lost, etc.
    const RequestMap::size_type timeoutsBefore = olderRequests->size();
    HandleResponses("before timeout");
    const RequestMap::size_type timeoutsNow = olderRequests->size();

    if (timeoutsBefore > timeoutsNow) { // some requests were rescued
        // notification message lost or significantly delayed?
        debugs(47, DBG_IMPORTANT, "WARNING: communication with " << dbName <<
               " may be too slow or disrupted for about " <<
               Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) <<
               " out of " << timeoutsBefore << " I/Os");
    }

    if (timeoutsNow) {
        debugs(47, DBG_IMPORTANT, "WARNING: abandoning " <<
               timeoutsNow << ' ' << dbName << " I/Os after at least " <<
               Timeout << "s timeout");
    }

    // any old request would have timed out by now
    typedef RequestMap::const_iterator RMCI;
    for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
        IpcIoPendingRequest *const pending = i->second;

        const unsigned int requestId = i->first;
        debugs(47, 7, HERE << "disker timeout; ipcIo" <<
               KidIdentifier << '.' << requestId);

        pending->completeIo(NULL); // no response
        delete pending; // XXX: leaking if throwing
    }
    olderRequests->clear();

    swap(olderRequests, newerRequests); // switches pointers around
    if (!olderRequests->empty() && !timeoutCheckScheduled)
        scheduleTimeoutCheck();
}
567
/// prepare to check for timeouts in a little while
void
IpcIoFile::scheduleTimeoutCheck()
{
    // we check all older requests at once so some may wait for up to 2*Timeout
    eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
             reinterpret_cast<void *>(diskId), Timeout, 0, false);
    timeoutCheckScheduled = true;
}
577
578 /// returns and forgets the right IpcIoFile pending request
579 IpcIoPendingRequest *
580 IpcIoFile::dequeueRequest(const unsigned int requestId)
581 {
582 Must(requestId != 0);
583
584 RequestMap *map = NULL;
585 RequestMap::iterator i = requestMap1.find(requestId);
586
587 if (i != requestMap1.end())
588 map = &requestMap1;
589 else {
590 i = requestMap2.find(requestId);
591 if (i != requestMap2.end())
592 map = &requestMap2;
593 }
594
595 if (!map) // not found in both maps
596 return NULL;
597
598 IpcIoPendingRequest *pending = i->second;
599 map->erase(i);
600 return pending;
601 }
602
// IpcIoFile has no single descriptor to expose; this API slot is unused.
int
IpcIoFile::getFD() const
{
    assert(false); // not supported; TODO: remove this method from API
    return -1;
}
609
610 /* IpcIoMsg */
611
// Zero-initializes all members, including the start timestamp; a zero
// requestId is never assigned by IpcIoFile::push().
IpcIoMsg::IpcIoMsg():
    requestId(0),
    offset(0),
    len(0),
    command(IpcIo::cmdNone),
    xerrno(0)
{
    start.tv_sec = 0;
    start.tv_usec = 0;
}
622
623 /* IpcIoPendingRequest */
624
// Starts with neither a read nor a write request attached; the creator
// sets exactly one of them before push()ing (see IpcIoFile::push()).
IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
    file(aFile), readRequest(NULL), writeRequest(NULL)
{
}
629
630 void
631 IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
632 {
633 if (readRequest)
634 file->readCompleted(readRequest, response);
635 else if (writeRequest)
636 file->writeCompleted(writeRequest, response);
637 else {
638 Must(!response); // only timeouts are handled here
639 file->openCompleted(NULL);
640 }
641 }
642
643 /* XXX: disker code that should probably be moved elsewhere */
644
static SBuf DbName; ///< full db file name
static int TheFile = -1; ///< db file descriptor

/// executes one read request on the disker side, filling ipcIo with
/// either a shared memory page of data or an error code
static void
diskerRead(IpcIoMsg &ipcIo)
{
    if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
        ipcIo.len = 0;
        debugs(47,2, HERE << "run out of shared memory pages for IPC I/O");
        return;
    }

    char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
    // cap the read at one shared memory page
    const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
    ++statCounter.syscalls.disk.reads;
    fd_bytes(TheFile, read, FD_READ);

    if (read >= 0) {
        ipcIo.xerrno = 0;
        const size_t len = static_cast<size_t>(read); // safe because read >= 0
        debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
               (len == ipcIo.len ? "all " : "just ") << read);
        ipcIo.len = len;
    } else {
        ipcIo.xerrno = errno;
        ipcIo.len = 0;
        debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
               ipcIo.xerrno);
    }
}
675
/// Tries to write buffer to disk (a few times if needed);
/// sets ipcIo results, but does no cleanup. The caller must cleanup.
static void
diskerWriteAttempts(IpcIoMsg &ipcIo)
{
    const char *buf = Ipc::Mem::PagePointer(ipcIo.page);
    size_t toWrite = min(ipcIo.len, Ipc::Mem::PageSize());
    size_t wroteSoFar = 0;
    off_t offset = ipcIo.offset;
    // Partial writes to disk do happen. It is unlikely that the caller can
    // handle partial writes by doing something other than writing leftovers
    // again, so we try to write them ourselves to minimize overheads.
    const int attemptLimit = 10;
    for (int attempts = 1; attempts <= attemptLimit; ++attempts) {
        const ssize_t result = pwrite(TheFile, buf, toWrite, offset);
        ++statCounter.syscalls.disk.writes;
        fd_bytes(TheFile, result, FD_WRITE);

        if (result < 0) {
            // a system error ends the retries; report how much was written
            ipcIo.xerrno = errno;
            assert(ipcIo.xerrno);
            debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " failure" <<
                   " writing " << toWrite << '/' << ipcIo.len <<
                   " at " << ipcIo.offset << '+' << wroteSoFar <<
                   " on " << attempts << " try: " << xstrerr(ipcIo.xerrno));
            ipcIo.len = wroteSoFar;
            return; // bail on error
        }

        const size_t wroteNow = static_cast<size_t>(result); // result >= 0
        ipcIo.xerrno = 0;

        debugs(47,3, "disker" << KidIdentifier << " wrote " <<
               (wroteNow >= toWrite ? "all " : "just ") << wroteNow <<
               " out of " << toWrite << '/' << ipcIo.len << " at " <<
               ipcIo.offset << '+' << wroteSoFar << " on " << attempts <<
               " try");

        wroteSoFar += wroteNow;

        if (wroteNow >= toWrite) {
            ipcIo.xerrno = 0;
            ipcIo.len = wroteSoFar;
            return; // wrote everything there was to write
        }

        // partial write: advance past the written prefix and retry
        buf += wroteNow;
        offset += wroteNow;
        toWrite -= wroteNow;
    }

    // retries exhausted without a system error; ipcIo.len keeps the old value
    debugs(47, DBG_IMPORTANT, "ERROR: " << DbName << " exhausted all " <<
           attemptLimit << " attempts while writing " <<
           toWrite << '/' << ipcIo.len << " at " << ipcIo.offset << '+' <<
           wroteSoFar);
    return; // not a fatal I/O error, unless the caller treats it as such
}
733
/// executes one write request on the disker side and returns the shared
/// memory page that carried the payload back to the pool
static void
diskerWrite(IpcIoMsg &ipcIo)
{
    diskerWriteAttempts(ipcIo); // may fail
    Ipc::Mem::PutPage(ipcIo.page);
}
740
// Event callback: resumes popping queued I/O requests after a scheduled
// break (rate limiting or a long I/O loop); source names the reason.
void
IpcIoFile::DiskerHandleMoreRequests(void *source)
{
    debugs(47, 7, HERE << "resuming handling requests after " <<
           static_cast<const char *>(source));
    DiskerHandleMoreRequestsScheduled = false;
    IpcIoFile::DiskerHandleRequests();
}
749
750 bool
751 IpcIoFile::WaitBeforePop()
752 {
753 const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit();
754 const double maxRate = ioRate/1e3; // req/ms
755
756 // do we need to enforce configured I/O rate?
757 if (maxRate <= 0)
758 return false;
759
760 // is there an I/O request we could potentially delay?
761 int processId;
762 IpcIoMsg ipcIo;
763 if (!queue->peek(processId, ipcIo)) {
764 // unlike pop(), peek() is not reliable and does not block reader
765 // so we must proceed with pop() even if it is likely to fail
766 return false;
767 }
768
769 static timeval LastIo = current_time;
770
771 const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os
772 // do not accumulate more than 100ms or 100 I/Os, whichever is smaller
773 const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration));
774
775 const double credit = ioDuration; // what the last I/O should have cost us
776 const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O
777 LastIo = current_time;
778
779 Ipc::QueueReader::Balance &balance = queue->localBalance();
780 balance += static_cast<int64_t>(credit - debit);
781
782 debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit);
783
784 if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) {
785 // if the next request is (likely) write and we accumulated
786 // too much time for future slow I/Os, then shed accumulated
787 // time to keep just half of the excess
788 const int64_t toSpend = balance - maxImbalance/2;
789
790 if (toSpend/1e3 > Timeout)
791 debugs(47, DBG_IMPORTANT, "WARNING: " << DbName << " delays " <<
792 "I/O requests for " << (toSpend/1e3) << " seconds " <<
793 "to obey " << ioRate << "/sec rate limit");
794
795 debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" <<
796 (1e3*maxRate) << "/sec rate");
797 eventAdd("IpcIoFile::DiskerHandleMoreRequests",
798 &IpcIoFile::DiskerHandleMoreRequests,
799 const_cast<char*>("rate limiting"),
800 toSpend/1e3, 0, false);
801 DiskerHandleMoreRequestsScheduled = true;
802 return true;
803 } else if (balance < -maxImbalance) {
804 // do not owe "too much" to avoid "too large" bursts of I/O
805 balance = -maxImbalance;
806 }
807
808 return false;
809 }
810
// Pops and executes queued worker I/O requests, pausing (and scheduling a
// resume event) when rate limiting applies or too much time has elapsed.
void
IpcIoFile::DiskerHandleRequests()
{
    // Balance our desire to maximize the number of concurrent I/O requests
    // (reordred by OS to minimize seek time) with a requirement to
    // send 1st-I/O notification messages, process Coordinator events, etc.
    const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms
    const timeval loopStart = current_time;

    int popped = 0;
    int workerId = 0;
    IpcIoMsg ipcIo;
    while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) {
        ++popped;

        // at least one I/O per call is guaranteed if the queue is not empty
        DiskerHandleRequest(workerId, ipcIo);

        getCurrentTime();
        const double elapsedMsec = tvSubMsec(loopStart, current_time);
        if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) {
            // negative elapsedMsec implies a clock jump; pause either way
            if (!DiskerHandleMoreRequestsScheduled) {
                // the gap must be positive for select(2) to be given a chance
                const double minBreakSecs = 0.001;
                eventAdd("IpcIoFile::DiskerHandleMoreRequests",
                         &IpcIoFile::DiskerHandleMoreRequests,
                         const_cast<char*>("long I/O loop"),
                         minBreakSecs, 0, false);
                DiskerHandleMoreRequestsScheduled = true;
            }
            debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " <<
                   elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O");
            break;
        }
    }

    // TODO: consider using O_DIRECT with "elevator" optimization where we pop
    // requests first, then reorder the popped requests to optimize seek time,
    // then do I/O, then take a break, and come back for the next set of I/O
    // requests.
}
852
/// called when disker receives an I/O request
void
IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
{
    // reject anything except read and write commands
    if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
        debugs(0, DBG_CRITICAL, "ERROR: " << DbName <<
               " should not receive " << ipcIo.command <<
               " ipcIo" << workerId << '.' << ipcIo.requestId);
        return;
    }

    debugs(47,5, HERE << "disker" << KidIdentifier <<
           (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
           ipcIo.len << " at " << ipcIo.offset <<
           " ipcIo" << workerId << '.' << ipcIo.requestId);

    // execute the I/O; results (len, xerrno, page) are stored in ipcIo
    if (ipcIo.command == IpcIo::cmdRead)
        diskerRead(ipcIo);
    else // ipcIo.command == IpcIo::cmdWrite
        diskerWrite(ipcIo);

    debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));

    // send the response back; push() returning true means the worker
    // needs a wake-up notification
    try {
        if (queue->push(workerId, ipcIo))
            Notify(workerId); // must notify worker
    } catch (const Queue::Full &) {
        // The worker queue should not overflow because the worker should pop()
        // before push()ing and because if disker pops N requests at a time,
        // we should make sure the worker pop() queue length is the worker
        // push queue length plus N+1. XXX: implement the N+1 difference.
        debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue for " <<
               DbName << " overflow: " <<
               SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len

        // the I/O request we could not push will timeout
    }
}
891
/// opens the disker db file, remembering its name and descriptor in the
/// file-scoped DbName/TheFile globals; returns false on failure
static bool
DiskerOpen(const SBuf &path, int flags, mode_t mode)
{
    assert(TheFile < 0);

    DbName = path;
    // NOTE(review): the mode parameter is not passed to file_open() here —
    // confirm whether ignoring it is intentional
    TheFile = file_open(DbName.c_str(), flags);

    if (TheFile < 0) {
        const int xerrno = errno;
        debugs(47, DBG_CRITICAL, "ERROR: cannot open " << DbName << ": " <<
               xstrerr(xerrno));
        return false;
    }

    ++store_open_disk_fd;
    debugs(79,3, "rock db opened " << DbName << ": FD " << TheFile);
    return true;
}
911
912 static void
913 DiskerClose(const SBuf &path)
914 {
915 if (TheFile >= 0) {
916 file_close(TheFile);
917 debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
918 TheFile = -1;
919 --store_open_disk_fd;
920 }
921 DbName.clear();
922 }
923
/// reports our needs for shared memory pages to Ipc::Mem::Pages
/// and initializes shared memory segments used by IpcIoFile
class IpcIoRr: public Ipc::Mem::RegisteredRunner
{
public:
    /* RegisteredRunner API */
    IpcIoRr(): owner(NULL) {}
    virtual ~IpcIoRr();
    virtual void claimMemoryNeeds();

protected:
    /* Ipc::Mem::RegisteredRunner API */
    virtual void create();

private:
    Ipc::FewToFewBiQueue::Owner *owner; ///< shared queue owner; NULL until create()
};

RunnerRegistrationEntry(IpcIoRr);
943
// Registers the expected shared I/O page demand with Ipc::Mem::Pages,
// derived from the maximum queue capacity across workers and diskers.
void
IpcIoRr::claimMemoryNeeds()
{
    const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount(
                               ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity);
    // the maximum number of shared I/O pages is approximately the
    // number of queue slots, we add a fudge factor to that to account
    // for corner cases where I/O pages are created before queue
    // limits are checked or destroyed long after the I/O is dequeued
    Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage,
                           static_cast<int>(itemsCount * 1.1));
}
956
// Creates the shared worker<->disker queue segments, but only when at
// least one disker strand is configured.
void
IpcIoRr::create()
{
    if (Config.cacheSwap.n_strands <= 0)
        return;

    Must(!owner);
    owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1,
                                       Config.cacheSwap.n_strands,
                                       1 + Config.workers, sizeof(IpcIoMsg),
                                       QueueCapacity);
}
969
IpcIoRr::~IpcIoRr()
{
    // releases the shared queue ownership object created by create()
    delete owner;
}