]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/IpcIo/IpcIoFile.cc
Separate shared page limits for different purposes.
[thirdparty/squid.git] / src / DiskIO / IpcIo / IpcIoFile.cc
CommitLineData
254912f3
AR
1/*
2 * $Id$
3 *
4 * DEBUG: section 47 Store Directory Routines
5 */
6
7#include "config.h"
f5591061 8#include "base/RunnersRegistry.h"
254912f3
AR
9#include "base/TextException.h"
10#include "DiskIO/IORequestor.h"
11#include "DiskIO/IpcIo/IpcIoFile.h"
12#include "DiskIO/ReadRequest.h"
13#include "DiskIO/WriteRequest.h"
14#include "ipc/Messages.h"
15#include "ipc/Port.h"
f5591061 16#include "ipc/Queue.h"
9a51593d 17#include "ipc/StrandSearch.h"
254912f3 18#include "ipc/UdsOp.h"
8ed94021 19#include "ipc/mem/Pages.h"
254912f3
AR
20
21CBDATA_CLASS_INIT(IpcIoFile);
22
f5591061
DK
23/// shared memory segment path to use for IpcIoFile maps
24static const char *const ShmLabel = "io_file";
25
fa61cefe 26const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
b2aa0934
DK
27IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
28IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
f5591061 29std::auto_ptr<IpcIoFile::Queue> IpcIoFile::queue;
254912f3
AR
30
31static bool DiskerOpen(const String &path, int flags, mode_t mode);
32static void DiskerClose(const String &path);
33
fa61cefe
AR
34/// IpcIo wrapper for debugs() streams; XXX: find a better class name
35struct SipcIo {
36 SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
37 worker(aWorker), msg(aMsg), disker(aDisker) {}
38
39 int worker;
40 const IpcIoMsg &msg;
41 int disker;
42};
43
44std::ostream &
45operator <<(std::ostream &os, const SipcIo &sio)
46{
47 return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
48 (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
49}
50
254912f3
AR
51
52IpcIoFile::IpcIoFile(char const *aDb):
f5591061 53 dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
b2aa0934 54 olderRequests(&requestMap1), newerRequests(&requestMap2),
9a51593d 55 timeoutCheckScheduled(false)
254912f3
AR
56{
57}
58
59IpcIoFile::~IpcIoFile()
60{
b2aa0934
DK
61 if (diskId >= 0) {
62 const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId);
63 // XXX: warn and continue?
64 Must(i != IpcIoFiles.end());
65 Must(i->second == this);
66 IpcIoFiles.erase(i);
67 }
254912f3
AR
68}
69
70void
71IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
72{
73 ioRequestor = callback;
74 Must(diskId < 0); // we do not know our disker yet
f5591061
DK
75
76 if (!queue.get())
77 queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
254912f3
AR
78
79 if (IamDiskProcess()) {
80 error_ = !DiskerOpen(dbName, flags, mode);
f7091279
DK
81 if (error_)
82 return;
83
b2aa0934
DK
84 diskId = KidIdentifier;
85 const bool inserted =
86 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
87 Must(inserted);
9a51593d 88
f7091279
DK
89 Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
90 ann.strand.tag = dbName;
91 Ipc::TypedMsgHdr message;
92 ann.pack(message);
93 SendMessage(Ipc::coordinatorAddr, message);
94
e528e570 95 ioRequestor->ioCompletedNotification();
254912f3 96 return;
9a51593d 97 }
254912f3 98
9a51593d
DK
99 Ipc::StrandSearchRequest request;
100 request.requestorId = KidIdentifier;
101 request.tag = dbName;
254912f3 102
9a51593d
DK
103 Ipc::TypedMsgHdr msg;
104 request.pack(msg);
105 Ipc::SendMessage(Ipc::coordinatorAddr, msg);
106
b2aa0934
DK
107 WaitingForOpen.push_back(this);
108
109 eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
110 this, Timeout, 0, false); // "this" pointer is used as id
254912f3
AR
111}
112
113void
9a51593d 114IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) {
9a51593d 115 Must(diskId < 0); // we do not know our disker yet
9a51593d
DK
116
117 if (!response) {
caca86d7
AR
118 debugs(79,1, HERE << "error: timeout");
119 error_ = true;
9a51593d
DK
120 } else {
121 diskId = response->strand.kidId;
122 if (diskId >= 0) {
b2aa0934
DK
123 const bool inserted =
124 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
125 Must(inserted);
9a51593d 126 } else {
254912f3
AR
127 error_ = true;
128 debugs(79,1, HERE << "error: no disker claimed " << dbName);
9a51593d
DK
129 }
130 }
254912f3
AR
131
132 ioRequestor->ioCompletedNotification();
133}
134
135/**
136 * Alias for IpcIoFile::open(...)
137 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
138 */
139void
140IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
141{
142 assert(false); // check
143 /* We use the same logic path for open */
144 open(flags, mode, callback);
145}
146
147void
148IpcIoFile::close()
149{
150 assert(ioRequestor != NULL);
151
152 if (IamDiskProcess())
153 DiskerClose(dbName);
9a51593d 154 // XXX: else nothing to do?
254912f3
AR
155
156 ioRequestor->closeCompleted();
157}
158
159bool
160IpcIoFile::canRead() const
161{
162 return diskId >= 0;
163}
164
165bool
166IpcIoFile::canWrite() const
167{
168 return diskId >= 0;
169}
170
171bool
172IpcIoFile::error() const
173{
174 return error_;
175}
176
177void
178IpcIoFile::read(ReadRequest *readRequest)
179{
180 debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
181 readRequest->offset << ")");
182
183 assert(ioRequestor != NULL);
184 assert(readRequest->len >= 0);
185 assert(readRequest->offset >= 0);
186 Must(!error_);
187
188 //assert(minOffset < 0 || minOffset <= readRequest->offset);
189 //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
190
9a51593d 191 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 192 pending->readRequest = readRequest;
7a907247 193 push(pending);
254912f3
AR
194}
195
196void
197IpcIoFile::readCompleted(ReadRequest *readRequest,
8ed94021 198 IpcIoMsg *const response)
254912f3 199{
caca86d7 200 bool ioError = false;
9a51593d 201 if (!response) {
caca86d7
AR
202 debugs(79,1, HERE << "error: timeout");
203 ioError = true; // I/O timeout does not warrant setting error_?
9a51593d
DK
204 } else
205 if (response->xerrno) {
206 debugs(79,1, HERE << "error: " << xstrerr(response->xerrno));
caca86d7 207 ioError = error_ = true;
8ed94021
DK
208 }
209 else
210 if (!response->page) {
211 debugs(79,1, HERE << "error: run out of shared memory pages");
212 ioError = true;
9a51593d 213 } else {
8ed94021
DK
214 const char *const buf = Ipc::Mem::PagePointer(response->page);
215 memcpy(readRequest->buf, buf, response->len);
20b0e1fe 216 }
254912f3 217
8ed94021
DK
218 Ipc::Mem::PutPage(response->page);
219
caca86d7
AR
220 const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
221 const int errflag = ioError ? DISK_ERROR : DISK_OK;
20b0e1fe 222 ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
254912f3
AR
223}
224
225void
226IpcIoFile::write(WriteRequest *writeRequest)
227{
228 debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
229 writeRequest->offset << ")");
230
231 assert(ioRequestor != NULL);
232 assert(writeRequest->len >= 0);
233 assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
234 assert(writeRequest->offset >= 0);
235 Must(!error_);
236
237 //assert(minOffset < 0 || minOffset <= writeRequest->offset);
238 //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
239
9a51593d 240 IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
254912f3 241 pending->writeRequest = writeRequest;
7a907247 242 push(pending);
254912f3
AR
243}
244
245void
246IpcIoFile::writeCompleted(WriteRequest *writeRequest,
9a51593d 247 const IpcIoMsg *const response)
254912f3 248{
caca86d7 249 bool ioError = false;
9a51593d 250 if (!response) {
caca86d7
AR
251 debugs(79,1, HERE << "error: timeout");
252 ioError = true; // I/O timeout does not warrant setting error_?
254912f3 253 } else
9a51593d
DK
254 if (response->xerrno) {
255 debugs(79,1, HERE << "error: " << xstrerr(response->xerrno));
256 ioError = error_ = true;
257 } else
258 if (response->len != writeRequest->len) {
259 debugs(79,1, HERE << "problem: " << response->len << " < " << writeRequest->len);
254912f3
AR
260 error_ = true;
261 }
262
263 if (writeRequest->free_func)
264 (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
265
caca86d7 266 if (!ioError) {
254912f3
AR
267 debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
268 diskId << " at " << writeRequest->offset);
269 }
270
caca86d7
AR
271 const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len;
272 const int errflag = ioError ? DISK_ERROR : DISK_OK;
254912f3
AR
273 ioRequestor->writeCompleted(errflag, rlen, writeRequest);
274}
275
276bool
277IpcIoFile::ioInProgress() const
278{
9a51593d 279 return !olderRequests->empty() || !newerRequests->empty();
254912f3
AR
280}
281
9a51593d 282/// track a new pending request
254912f3 283void
7a907247 284IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending)
254912f3 285{
7a907247 286 newerRequests->insert(std::make_pair(lastRequestId, pending));
9a51593d
DK
287 if (!timeoutCheckScheduled)
288 scheduleTimeoutCheck();
289}
254912f3 290
9a51593d
DK
291/// push an I/O request to disker
292void
7a907247 293IpcIoFile::push(IpcIoPendingRequest *const pending)
9a51593d 294{
fa61cefe 295 // prevent queue overflows: check for responses to earlier requests
f5591061 296 HandleResponses("before push");
fa61cefe 297
a1c98830 298 debugs(47, 7, HERE);
9a51593d 299 Must(diskId >= 0);
7a907247
DK
300 Must(pending);
301 Must(pending->readRequest || pending->writeRequest);
9a51593d
DK
302
303 IpcIoMsg ipcIo;
8ed94021
DK
304 try {
305 ipcIo.requestId = lastRequestId;
306 if (pending->readRequest) {
307 ipcIo.command = IpcIo::cmdRead;
308 ipcIo.offset = pending->readRequest->offset;
309 ipcIo.len = pending->readRequest->len;
310 } else { // pending->writeRequest
311 Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
551f8a18 312 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 313 ipcIo.len = 0;
551f8a18 314 throw TexcHere("run out of shared memory pages for IPC I/O");
8ed94021
DK
315 }
316 ipcIo.command = IpcIo::cmdWrite;
317 ipcIo.offset = pending->writeRequest->offset;
318 ipcIo.len = pending->writeRequest->len;
319 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
320 memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
321 }
254912f3 322
f5591061 323 debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
fa61cefe 324
f5591061 325 if (queue->push(diskId, ipcIo))
fa61cefe 326 Notify(diskId); // must notify disker
7a907247 327 trackPendingRequest(pending);
f5591061 328 } catch (const Queue::Full &) {
fa61cefe
AR
329 debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " <<
330 SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
331 // TODO: grow queue size
8ed94021
DK
332
333 pending->completeIo(NULL); // XXX: should distinguish this from timeout
334 delete pending;
335 } catch (const TextException &e) {
336 debugs(47, DBG_IMPORTANT, HERE << e.what());
fa61cefe 337 pending->completeIo(NULL); // XXX: should distinguish this from timeout
7a907247 338 delete pending;
9a51593d 339 }
254912f3
AR
340}
341
9a51593d 342/// called when coordinator responds to worker open request
254912f3 343void
9a51593d 344IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
254912f3 345{
9a51593d 346 debugs(47, 7, HERE << "coordinator response to open request");
b2aa0934
DK
347 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
348 i != WaitingForOpen.end(); ++i) {
349 if (response.strand.tag == (*i)->dbName) {
350 (*i)->openCompleted(&response);
351 WaitingForOpen.erase(i);
352 return;
353 }
354 }
355
356 debugs(47, 4, HERE << "LATE disker response to open for " <<
357 response.strand.tag);
358 // nothing we can do about it; completeIo() has been called already
9a51593d 359}
254912f3 360
9a51593d 361void
f5591061 362IpcIoFile::HandleResponses(const char *const when)
fa61cefe
AR
363{
364 debugs(47, 4, HERE << "popping all " << when);
fa61cefe
AR
365 IpcIoMsg ipcIo;
366 // get all responses we can: since we are not pushing, this will stop
f5591061
DK
367 int diskId;
368 while (queue->pop(diskId, ipcIo)) {
369 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
370 Must(i != IpcIoFiles.end()); // TODO: warn but continue
371 i->second->handleResponse(ipcIo);
372 }
9a51593d 373}
254912f3 374
9a51593d 375void
8ed94021 376IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
9a51593d
DK
377{
378 const int requestId = ipcIo.requestId;
fa61cefe 379 debugs(47, 7, HERE << "popped disker response: " <<
f5591061 380 SipcIo(KidIdentifier, ipcIo, diskId));
fa61cefe 381
9a51593d
DK
382 Must(requestId);
383 if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
384 pending->completeIo(&ipcIo);
caca86d7
AR
385 delete pending; // XXX: leaking if throwing
386 } else {
9a51593d
DK
387 debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command <<
388 "; ipcIo" << KidIdentifier << '.' << requestId);
caca86d7
AR
389 // nothing we can do about it; completeIo() has been called already
390 }
254912f3
AR
391}
392
254912f3 393void
9a51593d 394IpcIoFile::Notify(const int peerId)
254912f3 395{
fa61cefe 396 // TODO: Count and report the total number of notifications, pops, pushes.
a1c98830 397 debugs(47, 7, HERE << "kid" << peerId);
9a51593d
DK
398 Ipc::TypedMsgHdr msg;
399 msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
400 msg.putInt(KidIdentifier);
401 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, peerId);
402 Ipc::SendMessage(addr, msg);
403}
404
405void
406IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
407{
fa61cefe
AR
408 const int from = msg.getInt();
409 debugs(47, 7, HERE << "from " << from);
f5591061
DK
410 queue->clearReaderSignal(from);
411 if (IamDiskProcess())
412 DiskerHandleRequests();
413 else
414 HandleResponses("after notification");
9a51593d
DK
415}
416
b2aa0934
DK
417/// handles open request timeout
418void
419IpcIoFile::OpenTimeout(void *const param)
420{
421 Must(param);
422 // the pointer is used for comparison only and not dereferenced
423 const IpcIoFile *const ipcIoFile =
424 reinterpret_cast<const IpcIoFile *>(param);
425 for (IpcIoFileList::iterator i = WaitingForOpen.begin();
426 i != WaitingForOpen.end(); ++i) {
427 if (*i == ipcIoFile) {
428 (*i)->openCompleted(NULL);
429 WaitingForOpen.erase(i);
430 break;
431 }
432 }
433}
434
9a51593d
DK
435/// IpcIoFile::checkTimeouts wrapper
436void
437IpcIoFile::CheckTimeouts(void *const param)
438{
9a51593d 439 Must(param);
b2aa0934
DK
440 const int diskId = reinterpret_cast<uintptr_t>(param);
441 debugs(47, 7, HERE << "diskId=" << diskId);
442 const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
443 if (i != IpcIoFiles.end())
444 i->second->checkTimeouts();
9a51593d
DK
445}
446
447void
448IpcIoFile::checkTimeouts()
449{
9a51593d 450 timeoutCheckScheduled = false;
254912f3 451
caca86d7
AR
452 // any old request would have timed out by now
453 typedef RequestMap::const_iterator RMCI;
9a51593d
DK
454 for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
455 IpcIoPendingRequest *const pending = i->second;
254912f3 456
b2aa0934 457 const unsigned int requestId = i->first;
caca86d7 458 debugs(47, 7, HERE << "disker timeout; ipcIo" <<
b2aa0934 459 KidIdentifier << '.' << requestId);
254912f3 460
caca86d7
AR
461 pending->completeIo(NULL); // no response
462 delete pending; // XXX: leaking if throwing
9a51593d
DK
463 }
464 olderRequests->clear();
caca86d7 465
9a51593d
DK
466 swap(olderRequests, newerRequests); // switches pointers around
467 if (!olderRequests->empty())
468 scheduleTimeoutCheck();
254912f3
AR
469}
470
caca86d7 471/// prepare to check for timeouts in a little while
254912f3 472void
9a51593d 473IpcIoFile::scheduleTimeoutCheck()
254912f3 474{
b2aa0934 475 // we check all older requests at once so some may be wait for 2*Timeout
caca86d7 476 eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
b2aa0934 477 reinterpret_cast<void *>(diskId), Timeout, 0, false);
9a51593d 478 timeoutCheckScheduled = true;
254912f3
AR
479}
480
481/// returns and forgets the right IpcIoFile pending request
482IpcIoPendingRequest *
9a51593d 483IpcIoFile::dequeueRequest(const unsigned int requestId)
254912f3 484{
254912f3 485 Must(requestId != 0);
caca86d7 486
9a51593d
DK
487 RequestMap *map = NULL;
488 RequestMap::iterator i = requestMap1.find(requestId);
caca86d7 489
9a51593d
DK
490 if (i != requestMap1.end())
491 map = &requestMap1;
caca86d7 492 else {
9a51593d
DK
493 i = requestMap2.find(requestId);
494 if (i != requestMap2.end())
495 map = &requestMap2;
caca86d7
AR
496 }
497
498 if (!map) // not found in both maps
499 return NULL;
500
501 IpcIoPendingRequest *pending = i->second;
502 map->erase(i);
503 return pending;
254912f3
AR
504}
505
506int
9a51593d 507IpcIoFile::getFD() const
254912f3
AR
508{
509 assert(false); // not supported; TODO: remove this method from API
510 return -1;
511}
512
513
9a51593d 514/* IpcIoMsg */
254912f3 515
9a51593d
DK
516IpcIoMsg::IpcIoMsg():
517 requestId(0), offset(0), len(0), command(IpcIo::cmdNone), xerrno(0)
254912f3 518{
254912f3
AR
519}
520
caca86d7 521/* IpcIoPendingRequest */
254912f3
AR
522
523IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
524 file(aFile), readRequest(NULL), writeRequest(NULL)
525{
b2aa0934 526 Must(file != NULL);
9a51593d
DK
527 if (++file->lastRequestId == 0) // don't use zero value as requestId
528 ++file->lastRequestId;
254912f3
AR
529}
530
caca86d7 531void
8ed94021 532IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
caca86d7 533{
caca86d7
AR
534 if (readRequest)
535 file->readCompleted(readRequest, response);
536 else
537 if (writeRequest)
538 file->writeCompleted(writeRequest, response);
9a51593d
DK
539 else {
540 Must(!response); // only timeouts are handled here
541 file->openCompleted(NULL);
542 }
caca86d7
AR
543}
544
545
254912f3
AR
546
/* XXX: disker code that should probably be moved elsewhere */

/// the disker's db file descriptor; negative while the db is not open
static int TheFile = -1;
550
9a51593d
DK
551static void
552diskerRead(IpcIoMsg &ipcIo)
254912f3 553{
551f8a18 554 if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) {
8ed94021 555 ipcIo.len = 0;
551f8a18 556 debugs(47,5, HERE << "run out of shared memory pages for IPC I/O");
8ed94021
DK
557 return;
558 }
559
560 char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
561 const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
2d338731
AR
562 statCounter.syscalls.disk.reads++;
563 fd_bytes(TheFile, read, FD_READ);
564
254912f3 565 if (read >= 0) {
9a51593d
DK
566 ipcIo.xerrno = 0;
567 const size_t len = static_cast<size_t>(read); // safe because read > 0
254912f3 568 debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
9a51593d
DK
569 (len == ipcIo.len ? "all " : "just ") << read);
570 ipcIo.len = len;
571 } else {
572 ipcIo.xerrno = errno;
573 ipcIo.len = 0;
254912f3 574 debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
9a51593d
DK
575 ipcIo.xerrno);
576 }
254912f3
AR
577}
578
9a51593d
DK
579static void
580diskerWrite(IpcIoMsg &ipcIo)
254912f3 581{
8ed94021
DK
582 const char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
583 const ssize_t wrote = pwrite(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
2d338731
AR
584 statCounter.syscalls.disk.writes++;
585 fd_bytes(TheFile, wrote, FD_WRITE);
586
254912f3 587 if (wrote >= 0) {
9a51593d
DK
588 ipcIo.xerrno = 0;
589 const size_t len = static_cast<size_t>(wrote); // safe because wrote > 0
254912f3 590 debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " <<
9a51593d
DK
591 (len == ipcIo.len ? "all " : "just ") << wrote);
592 ipcIo.len = len;
593 } else {
594 ipcIo.xerrno = errno;
595 ipcIo.len = 0;
254912f3 596 debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " <<
9a51593d
DK
597 ipcIo.xerrno);
598 }
8ed94021
DK
599
600 Ipc::Mem::PutPage(ipcIo.page);
254912f3
AR
601}
602
254912f3 603void
f5591061 604IpcIoFile::DiskerHandleRequests()
254912f3 605{
fa61cefe
AR
606 int workerId = 0;
607 IpcIoMsg ipcIo;
f5591061 608 while (queue->pop(workerId, ipcIo))
fa61cefe
AR
609 DiskerHandleRequest(workerId, ipcIo);
610
611 // TODO: If the loop keeps on looping, we probably should take a break
612 // once in a while to update clock, read Coordinator messages, etc.
613 // This can be combined with "elevator" optimization where we get up to N
614 // requests first, then reorder the popped requests to optimize seek time,
615 // then do I/O, then take a break, and come back for the next set of I/O
616 // requests.
9a51593d 617}
254912f3 618
9a51593d
DK
619/// called when disker receives an I/O request
620void
621IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
622{
9a51593d 623 if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
254912f3 624 debugs(0,0, HERE << "disker" << KidIdentifier <<
9a51593d
DK
625 " should not receive " << ipcIo.command <<
626 " ipcIo" << workerId << '.' << ipcIo.requestId);
627 return;
628 }
629
630 debugs(47,5, HERE << "disker" << KidIdentifier <<
631 (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
632 ipcIo.len << " at " << ipcIo.offset <<
633 " ipcIo" << workerId << '.' << ipcIo.requestId);
634
635 if (ipcIo.command == IpcIo::cmdRead)
636 diskerRead(ipcIo);
637 else // ipcIo.command == IpcIo::cmdWrite
638 diskerWrite(ipcIo);
639
f5591061 640 debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
fa61cefe 641
7a907247 642 try {
f5591061 643 if (queue->push(workerId, ipcIo))
5e44782e 644 Notify(workerId); // must notify worker
f5591061 645 } catch (const Queue::Full &) {
fa61cefe
AR
646 // The worker queue should not overflow because the worker should pop()
647 // before push()ing and because if disker pops N requests at a time,
648 // we should make sure the worker pop() queue length is the worker
649 // push queue length plus N+1. XXX: implement the N+1 difference.
650 debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue overflow: " <<
5e44782e 651 SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
fa61cefe
AR
652
653 // the I/O request we could not push will timeout
7a907247 654 }
254912f3
AR
655}
656
657static bool
658DiskerOpen(const String &path, int flags, mode_t mode)
659{
660 assert(TheFile < 0);
661
662 TheFile = file_open(path.termedBuf(), flags);
663
664 if (TheFile < 0) {
665 const int xerrno = errno;
666 debugs(47,0, HERE << "rock db error opening " << path << ": " <<
667 xstrerr(xerrno));
668 return false;
9a51593d 669 }
254912f3
AR
670
671 store_open_disk_fd++;
672 debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile);
9a51593d 673 return true;
254912f3
AR
674}
675
676static void
677DiskerClose(const String &path)
678{
679 if (TheFile >= 0) {
680 file_close(TheFile);
681 debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
682 TheFile = -1;
683 store_open_disk_fd--;
a1c98830 684 }
254912f3 685}
f5591061
DK
686
687
688/// initializes shared memory segments used by IpcIoFile
689class IpcIoRr: public RegisteredRunner
690{
691public:
692 /* RegisteredRunner API */
693 IpcIoRr(): owner(NULL) {}
694 virtual void run(const RunnerRegistry &);
695 virtual ~IpcIoRr();
696
697private:
698 Ipc::FewToFewBiQueue::Owner *owner;
699};
700
701RunnerRegistrationEntry(rrAfterConfig, IpcIoRr);
702
703
704void IpcIoRr::run(const RunnerRegistry &)
705{
706 if (!UsingSmp())
707 return;
708
709 if (IamMasterProcess()) {
710 Must(!owner);
711 // XXX: make capacity configurable
712 owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1, Config.cacheSwap.n_configured, 1 + Config.workers, sizeof(IpcIoMsg), 1024);
713 }
714}
715
716IpcIoRr::~IpcIoRr()
717{
718 delete owner;
719}