]>
Commit | Line | Data |
---|---|---|
254912f3 AR |
1 | /* |
2 | * $Id$ | |
3 | * | |
4 | * DEBUG: section 47 Store Directory Routines | |
5 | */ | |
6 | ||
7 | #include "config.h" | |
f5591061 | 8 | #include "base/RunnersRegistry.h" |
254912f3 AR |
9 | #include "base/TextException.h" |
10 | #include "DiskIO/IORequestor.h" | |
11 | #include "DiskIO/IpcIo/IpcIoFile.h" | |
12 | #include "DiskIO/ReadRequest.h" | |
13 | #include "DiskIO/WriteRequest.h" | |
14 | #include "ipc/Messages.h" | |
15 | #include "ipc/Port.h" | |
f5591061 | 16 | #include "ipc/Queue.h" |
9a51593d | 17 | #include "ipc/StrandSearch.h" |
254912f3 | 18 | #include "ipc/UdsOp.h" |
8ed94021 | 19 | #include "ipc/mem/Pages.h" |
254912f3 AR |
20 | |
CBDATA_CLASS_INIT(IpcIoFile);

/// shared memory segment path to use for IpcIoFile maps
static const char *const ShmLabel = "io_file";

/// how long to wait for an open response or an I/O response before
/// treating the request as timed out (see OpenTimeout and CheckTimeouts)
const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
/// files whose open() is still waiting for the coordinator's strand search response
IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
/// opened files, keyed by the kid ID of their disker (diskId)
IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
/// shared I/O queue for this kid; created on the first open()
std::auto_ptr<IpcIoFile::Queue> IpcIoFile::queue;

// disker-side helpers that open/close the db file descriptor (TheFile)
static bool DiskerOpen(const String &path, int flags, mode_t mode);
static void DiskerClose(const String &path);
33 | ||
fa61cefe AR |
34 | /// IpcIo wrapper for debugs() streams; XXX: find a better class name |
35 | struct SipcIo { | |
36 | SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker): | |
37 | worker(aWorker), msg(aMsg), disker(aDisker) {} | |
38 | ||
39 | int worker; | |
40 | const IpcIoMsg &msg; | |
41 | int disker; | |
42 | }; | |
43 | ||
44 | std::ostream & | |
45 | operator <<(std::ostream &os, const SipcIo &sio) | |
46 | { | |
47 | return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId << | |
48 | (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker; | |
49 | } | |
50 | ||
254912f3 AR |
51 | |
/// Creates a handle for the given db path. No disker is known yet
/// (diskId is -1) until open() or openCompleted() sets it.
/// olderRequests/newerRequests start out pointing at the two request maps.
IpcIoFile::IpcIoFile(char const *aDb):
    dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
    olderRequests(&requestMap1), newerRequests(&requestMap2),
    timeoutCheckScheduled(false)
{
}
58 | ||
59 | IpcIoFile::~IpcIoFile() | |
60 | { | |
b2aa0934 DK |
61 | if (diskId >= 0) { |
62 | const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId); | |
63 | // XXX: warn and continue? | |
64 | Must(i != IpcIoFiles.end()); | |
65 | Must(i->second == this); | |
66 | IpcIoFiles.erase(i); | |
67 | } | |
254912f3 AR |
68 | } |
69 | ||
/// Opens the db file.
/// In a disker process: opens the file directly, registers this object under
/// our own kid ID, announces a strand tagged with dbName to the coordinator,
/// and notifies the requestor.
/// Otherwise: sends a strand search request for dbName to the coordinator.
/// HandleOpenResponse() or OpenTimeout() later calls openCompleted().
void
IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
{
    ioRequestor = callback;
    Must(diskId < 0); // we do not know our disker yet

    // attach to the shared I/O queue once; workers join groupA, other kids groupB
    if (!queue.get())
        queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));

    if (IamDiskProcess()) {
        error_ = !DiskerOpen(dbName, flags, mode);
        // NOTE(review): the requestor is not notified on this failure path;
        // presumably the caller checks error() after open() -- confirm
        if (error_)
            return;

        diskId = KidIdentifier;
        const bool inserted =
            IpcIoFiles.insert(std::make_pair(diskId, this)).second;
        Must(inserted);

        // announce this strand, tagged with dbName, to the coordinator
        Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
        ann.strand.tag = dbName;
        Ipc::TypedMsgHdr message;
        ann.pack(message);
        SendMessage(Ipc::coordinatorAddr, message);

        ioRequestor->ioCompletedNotification();
        return;
    }

    // ask the coordinator which strand is tagged with dbName
    Ipc::StrandSearchRequest request;
    request.requestorId = KidIdentifier;
    request.tag = dbName;

    Ipc::TypedMsgHdr msg;
    request.pack(msg);
    Ipc::SendMessage(Ipc::coordinatorAddr, msg);

    WaitingForOpen.push_back(this);

    // give up on the coordinator after Timeout seconds
    eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
             this, Timeout, 0, false); // "this" pointer is used as id
}
112 | ||
113 | void | |
9a51593d | 114 | IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) { |
9a51593d | 115 | Must(diskId < 0); // we do not know our disker yet |
9a51593d DK |
116 | |
117 | if (!response) { | |
caca86d7 AR |
118 | debugs(79,1, HERE << "error: timeout"); |
119 | error_ = true; | |
9a51593d DK |
120 | } else { |
121 | diskId = response->strand.kidId; | |
122 | if (diskId >= 0) { | |
b2aa0934 DK |
123 | const bool inserted = |
124 | IpcIoFiles.insert(std::make_pair(diskId, this)).second; | |
125 | Must(inserted); | |
9a51593d | 126 | } else { |
254912f3 AR |
127 | error_ = true; |
128 | debugs(79,1, HERE << "error: no disker claimed " << dbName); | |
9a51593d DK |
129 | } |
130 | } | |
254912f3 AR |
131 | |
132 | ioRequestor->ioCompletedNotification(); | |
133 | } | |
134 | ||
/**
 * Alias for IpcIoFile::open(...)
 \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 *
 * NOTE(review): assert(false) marks this path as unexpected. The open()
 * call is presumably reachable only when assertions are compiled out.
 */
void
IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
{
    assert(false); // check
    /* We use the same logic path for open */
    open(flags, mode, callback);
}
146 | ||
147 | void | |
148 | IpcIoFile::close() | |
149 | { | |
150 | assert(ioRequestor != NULL); | |
151 | ||
152 | if (IamDiskProcess()) | |
153 | DiskerClose(dbName); | |
9a51593d | 154 | // XXX: else nothing to do? |
254912f3 AR |
155 | |
156 | ioRequestor->closeCompleted(); | |
157 | } | |
158 | ||
159 | bool | |
160 | IpcIoFile::canRead() const | |
161 | { | |
162 | return diskId >= 0; | |
163 | } | |
164 | ||
165 | bool | |
166 | IpcIoFile::canWrite() const | |
167 | { | |
168 | return diskId >= 0; | |
169 | } | |
170 | ||
171 | bool | |
172 | IpcIoFile::error() const | |
173 | { | |
174 | return error_; | |
175 | } | |
176 | ||
177 | void | |
178 | IpcIoFile::read(ReadRequest *readRequest) | |
179 | { | |
180 | debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " << | |
181 | readRequest->offset << ")"); | |
182 | ||
183 | assert(ioRequestor != NULL); | |
184 | assert(readRequest->len >= 0); | |
185 | assert(readRequest->offset >= 0); | |
186 | Must(!error_); | |
187 | ||
188 | //assert(minOffset < 0 || minOffset <= readRequest->offset); | |
189 | //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset); | |
190 | ||
9a51593d | 191 | IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this); |
254912f3 | 192 | pending->readRequest = readRequest; |
7a907247 | 193 | push(pending); |
254912f3 AR |
194 | } |
195 | ||
196 | void | |
197 | IpcIoFile::readCompleted(ReadRequest *readRequest, | |
8ed94021 | 198 | IpcIoMsg *const response) |
254912f3 | 199 | { |
caca86d7 | 200 | bool ioError = false; |
9a51593d | 201 | if (!response) { |
caca86d7 AR |
202 | debugs(79,1, HERE << "error: timeout"); |
203 | ioError = true; // I/O timeout does not warrant setting error_? | |
9a51593d DK |
204 | } else |
205 | if (response->xerrno) { | |
206 | debugs(79,1, HERE << "error: " << xstrerr(response->xerrno)); | |
caca86d7 | 207 | ioError = error_ = true; |
8ed94021 DK |
208 | } |
209 | else | |
210 | if (!response->page) { | |
211 | debugs(79,1, HERE << "error: run out of shared memory pages"); | |
212 | ioError = true; | |
9a51593d | 213 | } else { |
8ed94021 DK |
214 | const char *const buf = Ipc::Mem::PagePointer(response->page); |
215 | memcpy(readRequest->buf, buf, response->len); | |
20b0e1fe | 216 | } |
254912f3 | 217 | |
8ed94021 DK |
218 | Ipc::Mem::PutPage(response->page); |
219 | ||
caca86d7 AR |
220 | const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len; |
221 | const int errflag = ioError ? DISK_ERROR : DISK_OK; | |
20b0e1fe | 222 | ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest); |
254912f3 AR |
223 | } |
224 | ||
225 | void | |
226 | IpcIoFile::write(WriteRequest *writeRequest) | |
227 | { | |
228 | debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " << | |
229 | writeRequest->offset << ")"); | |
230 | ||
231 | assert(ioRequestor != NULL); | |
232 | assert(writeRequest->len >= 0); | |
233 | assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len? | |
234 | assert(writeRequest->offset >= 0); | |
235 | Must(!error_); | |
236 | ||
237 | //assert(minOffset < 0 || minOffset <= writeRequest->offset); | |
238 | //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset); | |
239 | ||
9a51593d | 240 | IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this); |
254912f3 | 241 | pending->writeRequest = writeRequest; |
7a907247 | 242 | push(pending); |
254912f3 AR |
243 | } |
244 | ||
245 | void | |
246 | IpcIoFile::writeCompleted(WriteRequest *writeRequest, | |
9a51593d | 247 | const IpcIoMsg *const response) |
254912f3 | 248 | { |
caca86d7 | 249 | bool ioError = false; |
9a51593d | 250 | if (!response) { |
caca86d7 AR |
251 | debugs(79,1, HERE << "error: timeout"); |
252 | ioError = true; // I/O timeout does not warrant setting error_? | |
254912f3 | 253 | } else |
9a51593d DK |
254 | if (response->xerrno) { |
255 | debugs(79,1, HERE << "error: " << xstrerr(response->xerrno)); | |
256 | ioError = error_ = true; | |
257 | } else | |
258 | if (response->len != writeRequest->len) { | |
259 | debugs(79,1, HERE << "problem: " << response->len << " < " << writeRequest->len); | |
254912f3 AR |
260 | error_ = true; |
261 | } | |
262 | ||
263 | if (writeRequest->free_func) | |
264 | (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API? | |
265 | ||
caca86d7 | 266 | if (!ioError) { |
254912f3 AR |
267 | debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" << |
268 | diskId << " at " << writeRequest->offset); | |
269 | } | |
270 | ||
caca86d7 AR |
271 | const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len; |
272 | const int errflag = ioError ? DISK_ERROR : DISK_OK; | |
254912f3 AR |
273 | ioRequestor->writeCompleted(errflag, rlen, writeRequest); |
274 | } | |
275 | ||
276 | bool | |
277 | IpcIoFile::ioInProgress() const | |
278 | { | |
9a51593d | 279 | return !olderRequests->empty() || !newerRequests->empty(); |
254912f3 AR |
280 | } |
281 | ||
9a51593d | 282 | /// track a new pending request |
254912f3 | 283 | void |
7a907247 | 284 | IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending) |
254912f3 | 285 | { |
7a907247 | 286 | newerRequests->insert(std::make_pair(lastRequestId, pending)); |
9a51593d DK |
287 | if (!timeoutCheckScheduled) |
288 | scheduleTimeoutCheck(); | |
289 | } | |
254912f3 | 290 | |
9a51593d DK |
291 | /// push an I/O request to disker |
292 | void | |
7a907247 | 293 | IpcIoFile::push(IpcIoPendingRequest *const pending) |
9a51593d | 294 | { |
fa61cefe | 295 | // prevent queue overflows: check for responses to earlier requests |
f5591061 | 296 | HandleResponses("before push"); |
fa61cefe | 297 | |
a1c98830 | 298 | debugs(47, 7, HERE); |
9a51593d | 299 | Must(diskId >= 0); |
7a907247 DK |
300 | Must(pending); |
301 | Must(pending->readRequest || pending->writeRequest); | |
9a51593d DK |
302 | |
303 | IpcIoMsg ipcIo; | |
8ed94021 DK |
304 | try { |
305 | ipcIo.requestId = lastRequestId; | |
306 | if (pending->readRequest) { | |
307 | ipcIo.command = IpcIo::cmdRead; | |
308 | ipcIo.offset = pending->readRequest->offset; | |
309 | ipcIo.len = pending->readRequest->len; | |
310 | } else { // pending->writeRequest | |
311 | Must(pending->writeRequest->len <= Ipc::Mem::PageSize()); | |
551f8a18 | 312 | if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) { |
8ed94021 | 313 | ipcIo.len = 0; |
551f8a18 | 314 | throw TexcHere("run out of shared memory pages for IPC I/O"); |
8ed94021 DK |
315 | } |
316 | ipcIo.command = IpcIo::cmdWrite; | |
317 | ipcIo.offset = pending->writeRequest->offset; | |
318 | ipcIo.len = pending->writeRequest->len; | |
319 | char *const buf = Ipc::Mem::PagePointer(ipcIo.page); | |
320 | memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away | |
321 | } | |
254912f3 | 322 | |
f5591061 | 323 | debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId)); |
fa61cefe | 324 | |
f5591061 | 325 | if (queue->push(diskId, ipcIo)) |
fa61cefe | 326 | Notify(diskId); // must notify disker |
7a907247 | 327 | trackPendingRequest(pending); |
f5591061 | 328 | } catch (const Queue::Full &) { |
fa61cefe AR |
329 | debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " << |
330 | SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len | |
331 | // TODO: grow queue size | |
8ed94021 DK |
332 | |
333 | pending->completeIo(NULL); // XXX: should distinguish this from timeout | |
334 | delete pending; | |
335 | } catch (const TextException &e) { | |
336 | debugs(47, DBG_IMPORTANT, HERE << e.what()); | |
fa61cefe | 337 | pending->completeIo(NULL); // XXX: should distinguish this from timeout |
7a907247 | 338 | delete pending; |
9a51593d | 339 | } |
254912f3 AR |
340 | } |
341 | ||
9a51593d | 342 | /// called when coordinator responds to worker open request |
254912f3 | 343 | void |
9a51593d | 344 | IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response) |
254912f3 | 345 | { |
9a51593d | 346 | debugs(47, 7, HERE << "coordinator response to open request"); |
b2aa0934 DK |
347 | for (IpcIoFileList::iterator i = WaitingForOpen.begin(); |
348 | i != WaitingForOpen.end(); ++i) { | |
349 | if (response.strand.tag == (*i)->dbName) { | |
350 | (*i)->openCompleted(&response); | |
351 | WaitingForOpen.erase(i); | |
352 | return; | |
353 | } | |
354 | } | |
355 | ||
356 | debugs(47, 4, HERE << "LATE disker response to open for " << | |
357 | response.strand.tag); | |
358 | // nothing we can do about it; completeIo() has been called already | |
9a51593d | 359 | } |
254912f3 | 360 | |
9a51593d | 361 | void |
f5591061 | 362 | IpcIoFile::HandleResponses(const char *const when) |
fa61cefe AR |
363 | { |
364 | debugs(47, 4, HERE << "popping all " << when); | |
fa61cefe AR |
365 | IpcIoMsg ipcIo; |
366 | // get all responses we can: since we are not pushing, this will stop | |
f5591061 DK |
367 | int diskId; |
368 | while (queue->pop(diskId, ipcIo)) { | |
369 | const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId); | |
370 | Must(i != IpcIoFiles.end()); // TODO: warn but continue | |
371 | i->second->handleResponse(ipcIo); | |
372 | } | |
9a51593d | 373 | } |
254912f3 | 374 | |
9a51593d | 375 | void |
8ed94021 | 376 | IpcIoFile::handleResponse(IpcIoMsg &ipcIo) |
9a51593d DK |
377 | { |
378 | const int requestId = ipcIo.requestId; | |
fa61cefe | 379 | debugs(47, 7, HERE << "popped disker response: " << |
f5591061 | 380 | SipcIo(KidIdentifier, ipcIo, diskId)); |
fa61cefe | 381 | |
9a51593d DK |
382 | Must(requestId); |
383 | if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) { | |
384 | pending->completeIo(&ipcIo); | |
caca86d7 AR |
385 | delete pending; // XXX: leaking if throwing |
386 | } else { | |
9a51593d DK |
387 | debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command << |
388 | "; ipcIo" << KidIdentifier << '.' << requestId); | |
caca86d7 AR |
389 | // nothing we can do about it; completeIo() has been called already |
390 | } | |
254912f3 AR |
391 | } |
392 | ||
254912f3 | 393 | void |
9a51593d | 394 | IpcIoFile::Notify(const int peerId) |
254912f3 | 395 | { |
fa61cefe | 396 | // TODO: Count and report the total number of notifications, pops, pushes. |
a1c98830 | 397 | debugs(47, 7, HERE << "kid" << peerId); |
9a51593d DK |
398 | Ipc::TypedMsgHdr msg; |
399 | msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type? | |
400 | msg.putInt(KidIdentifier); | |
401 | const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, peerId); | |
402 | Ipc::SendMessage(addr, msg); | |
403 | } | |
404 | ||
405 | void | |
406 | IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg) | |
407 | { | |
fa61cefe AR |
408 | const int from = msg.getInt(); |
409 | debugs(47, 7, HERE << "from " << from); | |
f5591061 DK |
410 | queue->clearReaderSignal(from); |
411 | if (IamDiskProcess()) | |
412 | DiskerHandleRequests(); | |
413 | else | |
414 | HandleResponses("after notification"); | |
9a51593d DK |
415 | } |
416 | ||
b2aa0934 DK |
417 | /// handles open request timeout |
418 | void | |
419 | IpcIoFile::OpenTimeout(void *const param) | |
420 | { | |
421 | Must(param); | |
422 | // the pointer is used for comparison only and not dereferenced | |
423 | const IpcIoFile *const ipcIoFile = | |
424 | reinterpret_cast<const IpcIoFile *>(param); | |
425 | for (IpcIoFileList::iterator i = WaitingForOpen.begin(); | |
426 | i != WaitingForOpen.end(); ++i) { | |
427 | if (*i == ipcIoFile) { | |
428 | (*i)->openCompleted(NULL); | |
429 | WaitingForOpen.erase(i); | |
430 | break; | |
431 | } | |
432 | } | |
433 | } | |
434 | ||
9a51593d DK |
435 | /// IpcIoFile::checkTimeouts wrapper |
436 | void | |
437 | IpcIoFile::CheckTimeouts(void *const param) | |
438 | { | |
9a51593d | 439 | Must(param); |
b2aa0934 DK |
440 | const int diskId = reinterpret_cast<uintptr_t>(param); |
441 | debugs(47, 7, HERE << "diskId=" << diskId); | |
442 | const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId); | |
443 | if (i != IpcIoFiles.end()) | |
444 | i->second->checkTimeouts(); | |
9a51593d DK |
445 | } |
446 | ||
/// Fails (completeIo(NULL)) and deletes every request that was already in
/// olderRequests, i.e., pending since before the previous check. Then the
/// newer requests become the older ones, and another check is scheduled if
/// any remain.
void
IpcIoFile::checkTimeouts()
{
    timeoutCheckScheduled = false;

    // any old request would have timed out by now
    typedef RequestMap::const_iterator RMCI;
    for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
        IpcIoPendingRequest *const pending = i->second;

        const unsigned int requestId = i->first;
        debugs(47, 7, HERE << "disker timeout; ipcIo" <<
               KidIdentifier << '.' << requestId);

        pending->completeIo(NULL); // no response
        delete pending; // XXX: leaking if throwing
    }
    olderRequests->clear();

    // age the survivors: today's newer requests are checked next time
    swap(olderRequests, newerRequests); // switches pointers around
    if (!olderRequests->empty())
        scheduleTimeoutCheck();
}
470 | ||
caca86d7 | 471 | /// prepare to check for timeouts in a little while |
254912f3 | 472 | void |
9a51593d | 473 | IpcIoFile::scheduleTimeoutCheck() |
254912f3 | 474 | { |
b2aa0934 | 475 | // we check all older requests at once so some may be wait for 2*Timeout |
caca86d7 | 476 | eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts, |
b2aa0934 | 477 | reinterpret_cast<void *>(diskId), Timeout, 0, false); |
9a51593d | 478 | timeoutCheckScheduled = true; |
254912f3 AR |
479 | } |
480 | ||
481 | /// returns and forgets the right IpcIoFile pending request | |
482 | IpcIoPendingRequest * | |
9a51593d | 483 | IpcIoFile::dequeueRequest(const unsigned int requestId) |
254912f3 | 484 | { |
254912f3 | 485 | Must(requestId != 0); |
caca86d7 | 486 | |
9a51593d DK |
487 | RequestMap *map = NULL; |
488 | RequestMap::iterator i = requestMap1.find(requestId); | |
caca86d7 | 489 | |
9a51593d DK |
490 | if (i != requestMap1.end()) |
491 | map = &requestMap1; | |
caca86d7 | 492 | else { |
9a51593d DK |
493 | i = requestMap2.find(requestId); |
494 | if (i != requestMap2.end()) | |
495 | map = &requestMap2; | |
caca86d7 AR |
496 | } |
497 | ||
498 | if (!map) // not found in both maps | |
499 | return NULL; | |
500 | ||
501 | IpcIoPendingRequest *pending = i->second; | |
502 | map->erase(i); | |
503 | return pending; | |
254912f3 AR |
504 | } |
505 | ||
/// Not supported: IpcIoFile does not expose a file descriptor.
/// Asserts. The -1 return is presumably reachable only if assertions are
/// compiled out.
int
IpcIoFile::getFD() const
{
    assert(false); // not supported; TODO: remove this method from API
    return -1;
}
512 | ||
513 | ||
/* IpcIoMsg */

/// creates an empty message: request ID 0 (never a valid ID), no command,
/// zero offset and length, and no error
IpcIoMsg::IpcIoMsg():
    requestId(0), offset(0), len(0), command(IpcIo::cmdNone), xerrno(0)
{
}
520 | ||
caca86d7 | 521 | /* IpcIoPendingRequest */ |
254912f3 AR |
522 | |
523 | IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile): | |
524 | file(aFile), readRequest(NULL), writeRequest(NULL) | |
525 | { | |
b2aa0934 | 526 | Must(file != NULL); |
9a51593d DK |
527 | if (++file->lastRequestId == 0) // don't use zero value as requestId |
528 | ++file->lastRequestId; | |
254912f3 AR |
529 | } |
530 | ||
caca86d7 | 531 | void |
8ed94021 | 532 | IpcIoPendingRequest::completeIo(IpcIoMsg *const response) |
caca86d7 | 533 | { |
caca86d7 AR |
534 | if (readRequest) |
535 | file->readCompleted(readRequest, response); | |
536 | else | |
537 | if (writeRequest) | |
538 | file->writeCompleted(writeRequest, response); | |
9a51593d DK |
539 | else { |
540 | Must(!response); // only timeouts are handled here | |
541 | file->openCompleted(NULL); | |
542 | } | |
caca86d7 AR |
543 | } |
544 | ||
545 | ||
254912f3 AR |
546 | |
/* XXX: disker code that should probably be moved elsewhere */

/// db file descriptor; -1 while closed. Set by DiskerOpen(), reset by DiskerClose().
static int TheFile = -1;
550 | ||
9a51593d DK |
551 | static void |
552 | diskerRead(IpcIoMsg &ipcIo) | |
254912f3 | 553 | { |
551f8a18 | 554 | if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) { |
8ed94021 | 555 | ipcIo.len = 0; |
551f8a18 | 556 | debugs(47,5, HERE << "run out of shared memory pages for IPC I/O"); |
8ed94021 DK |
557 | return; |
558 | } | |
559 | ||
560 | char *const buf = Ipc::Mem::PagePointer(ipcIo.page); | |
561 | const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset); | |
2d338731 AR |
562 | statCounter.syscalls.disk.reads++; |
563 | fd_bytes(TheFile, read, FD_READ); | |
564 | ||
254912f3 | 565 | if (read >= 0) { |
9a51593d DK |
566 | ipcIo.xerrno = 0; |
567 | const size_t len = static_cast<size_t>(read); // safe because read > 0 | |
254912f3 | 568 | debugs(47,8, HERE << "disker" << KidIdentifier << " read " << |
9a51593d DK |
569 | (len == ipcIo.len ? "all " : "just ") << read); |
570 | ipcIo.len = len; | |
571 | } else { | |
572 | ipcIo.xerrno = errno; | |
573 | ipcIo.len = 0; | |
254912f3 | 574 | debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " << |
9a51593d DK |
575 | ipcIo.xerrno); |
576 | } | |
254912f3 AR |
577 | } |
578 | ||
9a51593d DK |
579 | static void |
580 | diskerWrite(IpcIoMsg &ipcIo) | |
254912f3 | 581 | { |
8ed94021 DK |
582 | const char *const buf = Ipc::Mem::PagePointer(ipcIo.page); |
583 | const ssize_t wrote = pwrite(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset); | |
2d338731 AR |
584 | statCounter.syscalls.disk.writes++; |
585 | fd_bytes(TheFile, wrote, FD_WRITE); | |
586 | ||
254912f3 | 587 | if (wrote >= 0) { |
9a51593d DK |
588 | ipcIo.xerrno = 0; |
589 | const size_t len = static_cast<size_t>(wrote); // safe because wrote > 0 | |
254912f3 | 590 | debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " << |
9a51593d DK |
591 | (len == ipcIo.len ? "all " : "just ") << wrote); |
592 | ipcIo.len = len; | |
593 | } else { | |
594 | ipcIo.xerrno = errno; | |
595 | ipcIo.len = 0; | |
254912f3 | 596 | debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " << |
9a51593d DK |
597 | ipcIo.xerrno); |
598 | } | |
8ed94021 DK |
599 | |
600 | Ipc::Mem::PutPage(ipcIo.page); | |
254912f3 AR |
601 | } |
602 | ||
254912f3 | 603 | void |
f5591061 | 604 | IpcIoFile::DiskerHandleRequests() |
254912f3 | 605 | { |
fa61cefe AR |
606 | int workerId = 0; |
607 | IpcIoMsg ipcIo; | |
f5591061 | 608 | while (queue->pop(workerId, ipcIo)) |
fa61cefe AR |
609 | DiskerHandleRequest(workerId, ipcIo); |
610 | ||
611 | // TODO: If the loop keeps on looping, we probably should take a break | |
612 | // once in a while to update clock, read Coordinator messages, etc. | |
613 | // This can be combined with "elevator" optimization where we get up to N | |
614 | // requests first, then reorder the popped requests to optimize seek time, | |
615 | // then do I/O, then take a break, and come back for the next set of I/O | |
616 | // requests. | |
9a51593d | 617 | } |
254912f3 | 618 | |
9a51593d DK |
619 | /// called when disker receives an I/O request |
620 | void | |
621 | IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo) | |
622 | { | |
9a51593d | 623 | if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) { |
254912f3 | 624 | debugs(0,0, HERE << "disker" << KidIdentifier << |
9a51593d DK |
625 | " should not receive " << ipcIo.command << |
626 | " ipcIo" << workerId << '.' << ipcIo.requestId); | |
627 | return; | |
628 | } | |
629 | ||
630 | debugs(47,5, HERE << "disker" << KidIdentifier << | |
631 | (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") << | |
632 | ipcIo.len << " at " << ipcIo.offset << | |
633 | " ipcIo" << workerId << '.' << ipcIo.requestId); | |
634 | ||
635 | if (ipcIo.command == IpcIo::cmdRead) | |
636 | diskerRead(ipcIo); | |
637 | else // ipcIo.command == IpcIo::cmdWrite | |
638 | diskerWrite(ipcIo); | |
639 | ||
f5591061 | 640 | debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier)); |
fa61cefe | 641 | |
7a907247 | 642 | try { |
f5591061 | 643 | if (queue->push(workerId, ipcIo)) |
5e44782e | 644 | Notify(workerId); // must notify worker |
f5591061 | 645 | } catch (const Queue::Full &) { |
fa61cefe AR |
646 | // The worker queue should not overflow because the worker should pop() |
647 | // before push()ing and because if disker pops N requests at a time, | |
648 | // we should make sure the worker pop() queue length is the worker | |
649 | // push queue length plus N+1. XXX: implement the N+1 difference. | |
650 | debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue overflow: " << | |
5e44782e | 651 | SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len |
fa61cefe AR |
652 | |
653 | // the I/O request we could not push will timeout | |
7a907247 | 654 | } |
254912f3 AR |
655 | } |
656 | ||
657 | static bool | |
658 | DiskerOpen(const String &path, int flags, mode_t mode) | |
659 | { | |
660 | assert(TheFile < 0); | |
661 | ||
662 | TheFile = file_open(path.termedBuf(), flags); | |
663 | ||
664 | if (TheFile < 0) { | |
665 | const int xerrno = errno; | |
666 | debugs(47,0, HERE << "rock db error opening " << path << ": " << | |
667 | xstrerr(xerrno)); | |
668 | return false; | |
9a51593d | 669 | } |
254912f3 AR |
670 | |
671 | store_open_disk_fd++; | |
672 | debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile); | |
9a51593d | 673 | return true; |
254912f3 AR |
674 | } |
675 | ||
676 | static void | |
677 | DiskerClose(const String &path) | |
678 | { | |
679 | if (TheFile >= 0) { | |
680 | file_close(TheFile); | |
681 | debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile); | |
682 | TheFile = -1; | |
683 | store_open_disk_fd--; | |
a1c98830 | 684 | } |
254912f3 | 685 | } |
f5591061 DK |
686 | |
687 | ||
/// initializes shared memory segments used by IpcIoFile
class IpcIoRr: public RegisteredRunner
{
public:
    /* RegisteredRunner API */
    IpcIoRr(): owner(NULL) {}
    virtual void run(const RunnerRegistry &);
    virtual ~IpcIoRr();

private:
    /// shared queue segment owner; created by run() in the master process only
    Ipc::FewToFewBiQueue::Owner *owner;
};

// register IpcIoRr with the rrAfterConfig runner registry
RunnerRegistrationEntry(rrAfterConfig, IpcIoRr);
702 | ||
703 | ||
704 | void IpcIoRr::run(const RunnerRegistry &) | |
705 | { | |
706 | if (!UsingSmp()) | |
707 | return; | |
708 | ||
709 | if (IamMasterProcess()) { | |
710 | Must(!owner); | |
711 | // XXX: make capacity configurable | |
712 | owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1, Config.cacheSwap.n_configured, 1 + Config.workers, sizeof(IpcIoMsg), 1024); | |
713 | } | |
714 | } | |
715 | ||
/// destroys the shared queue segment owner, if run() created one
IpcIoRr::~IpcIoRr()
{
    delete owner;
}