]>
Commit | Line | Data |
---|---|---|
254912f3 AR |
1 | /* |
2 | * $Id$ | |
3 | * | |
4 | * DEBUG: section 47 Store Directory Routines | |
5 | */ | |
6 | ||
7 | #include "config.h" | |
f5591061 | 8 | #include "base/RunnersRegistry.h" |
254912f3 AR |
9 | #include "base/TextException.h" |
10 | #include "DiskIO/IORequestor.h" | |
11 | #include "DiskIO/IpcIo/IpcIoFile.h" | |
12 | #include "DiskIO/ReadRequest.h" | |
13 | #include "DiskIO/WriteRequest.h" | |
14 | #include "ipc/Messages.h" | |
15 | #include "ipc/Port.h" | |
f5591061 | 16 | #include "ipc/Queue.h" |
9a51593d | 17 | #include "ipc/StrandSearch.h" |
254912f3 | 18 | #include "ipc/UdsOp.h" |
8ed94021 | 19 | #include "ipc/mem/Pages.h" |
0a11e039 | 20 | #include "SquidTime.h" |
254912f3 AR |
21 | |
// NOTE(review): presumably registers IpcIoFile with Squid's cbdata machinery;
// the macro is defined outside this file -- confirm.
22 | CBDATA_CLASS_INIT(IpcIoFile); | |
23 | ||
f5591061 DK |
24 | /// shared memory segment path to use for IpcIoFile maps |
// ShmLabel is passed to the Queue constructor in IpcIoFile::open().
25 | static const char *const ShmLabel = "io_file"; | |
26 | ||
// Timeout is used both as the open-request timeout and as the period of
// pending I/O timeout checks (see open() and scheduleTimeoutCheck()).
fa61cefe | 27 | const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more |
b2aa0934 DK |
// Files that sent a strand search request in open() and have not yet been
// completed by HandleOpenResponse() or OpenTimeout().
28 | IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen; |
// Opened files indexed by diskId; filled in open()/openCompleted() and
// cleaned up by the destructor.
29 | IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles; | |
// Created lazily by the first open() call in this process.
f5591061 | 30 | std::auto_ptr<IpcIoFile::Queue> IpcIoFile::queue; |
254912f3 | 31 | |
c792f7fc AR |
// True while a DiskerHandleMoreRequests event is pending; set when the event
// is added and cleared when DiskerHandleMoreRequests() runs.
32 | bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false; |
33 | ||
254912f3 AR |
// Disker-side helpers; declared here because open() and close() call them
// before their definitions appear.
34 | static bool DiskerOpen(const String &path, int flags, mode_t mode); |
35 | static void DiskerClose(const String &path); | |
36 | ||
fa61cefe AR |
37 | /// IpcIo wrapper for debugs() streams; XXX: find a better class name |
38 | struct SipcIo { | |
39 | SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker): | |
9199139f | 40 | worker(aWorker), msg(aMsg), disker(aDisker) {} |
fa61cefe AR |
41 | |
42 | int worker; | |
43 | const IpcIoMsg &msg; | |
44 | int disker; | |
45 | }; | |
46 | ||
47 | std::ostream & | |
48 | operator <<(std::ostream &os, const SipcIo &sio) | |
49 | { | |
50 | return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId << | |
9199139f | 51 | (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker; |
fa61cefe AR |
52 | } |
53 | ||
254912f3 AR |
54 | |
55 | IpcIoFile::IpcIoFile(char const *aDb): | |
9199139f AR |
56 | dbName(aDb), diskId(-1), error_(false), lastRequestId(0), |
57 | olderRequests(&requestMap1), newerRequests(&requestMap2), | |
58 | timeoutCheckScheduled(false) | |
254912f3 AR |
59 | { |
60 | } | |
61 | ||
62 | IpcIoFile::~IpcIoFile() | |
63 | { | |
b2aa0934 DK |
64 | if (diskId >= 0) { |
65 | const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId); | |
66 | // XXX: warn and continue? | |
67 | Must(i != IpcIoFiles.end()); | |
68 | Must(i->second == this); | |
69 | IpcIoFiles.erase(i); | |
70 | } | |
254912f3 AR |
71 | } |
72 | ||
43ebbac3 AR |
73 | void |
74 | IpcIoFile::configure(const Config &cfg) | |
75 | { | |
76 | DiskFile::configure(cfg); | |
77 | config = cfg; | |
78 | } | |
79 | ||
254912f3 AR |
80 | void |
81 | IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback) | |
82 | { | |
83 | ioRequestor = callback; | |
84 | Must(diskId < 0); // we do not know our disker yet | |
f5591061 DK |
85 | |
86 | if (!queue.get()) | |
87 | queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier)); | |
254912f3 AR |
88 | |
89 | if (IamDiskProcess()) { | |
90 | error_ = !DiskerOpen(dbName, flags, mode); | |
f7091279 DK |
91 | if (error_) |
92 | return; | |
93 | ||
b2aa0934 DK |
94 | diskId = KidIdentifier; |
95 | const bool inserted = | |
96 | IpcIoFiles.insert(std::make_pair(diskId, this)).second; | |
97 | Must(inserted); | |
9a51593d | 98 | |
df881a0f AR |
99 | queue->localRateLimit() = |
100 | static_cast<Ipc::QueueReader::Rate::Value>(config.ioRate); | |
101 | ||
f7091279 DK |
102 | Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid())); |
103 | ann.strand.tag = dbName; | |
104 | Ipc::TypedMsgHdr message; | |
105 | ann.pack(message); | |
106 | SendMessage(Ipc::coordinatorAddr, message); | |
107 | ||
e528e570 | 108 | ioRequestor->ioCompletedNotification(); |
254912f3 | 109 | return; |
9a51593d | 110 | } |
254912f3 | 111 | |
9a51593d DK |
112 | Ipc::StrandSearchRequest request; |
113 | request.requestorId = KidIdentifier; | |
114 | request.tag = dbName; | |
254912f3 | 115 | |
9a51593d DK |
116 | Ipc::TypedMsgHdr msg; |
117 | request.pack(msg); | |
118 | Ipc::SendMessage(Ipc::coordinatorAddr, msg); | |
119 | ||
b2aa0934 DK |
120 | WaitingForOpen.push_back(this); |
121 | ||
122 | eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout, | |
123 | this, Timeout, 0, false); // "this" pointer is used as id | |
254912f3 AR |
124 | } |
125 | ||
126 | void | |
9199139f AR |
127 | IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) |
128 | { | |
9a51593d | 129 | Must(diskId < 0); // we do not know our disker yet |
9a51593d DK |
130 | |
131 | if (!response) { | |
caca86d7 AR |
132 | debugs(79,1, HERE << "error: timeout"); |
133 | error_ = true; | |
9a51593d DK |
134 | } else { |
135 | diskId = response->strand.kidId; | |
136 | if (diskId >= 0) { | |
b2aa0934 DK |
137 | const bool inserted = |
138 | IpcIoFiles.insert(std::make_pair(diskId, this)).second; | |
139 | Must(inserted); | |
9a51593d | 140 | } else { |
254912f3 AR |
141 | error_ = true; |
142 | debugs(79,1, HERE << "error: no disker claimed " << dbName); | |
9a51593d DK |
143 | } |
144 | } | |
254912f3 AR |
145 | |
146 | ioRequestor->ioCompletedNotification(); | |
147 | } | |
148 | ||
149 | /** | |
150 | * Alias for IpcIoFile::open(...) | |
151 | \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback) | |
152 | */ | |
153 | void | |
154 | IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback) | |
155 | { | |
156 | assert(false); // check | |
157 | /* We use the same logic path for open */ | |
158 | open(flags, mode, callback); | |
159 | } | |
160 | ||
161 | void | |
162 | IpcIoFile::close() | |
163 | { | |
164 | assert(ioRequestor != NULL); | |
165 | ||
166 | if (IamDiskProcess()) | |
167 | DiskerClose(dbName); | |
9a51593d | 168 | // XXX: else nothing to do? |
254912f3 AR |
169 | |
170 | ioRequestor->closeCompleted(); | |
171 | } | |
172 | ||
173 | bool | |
174 | IpcIoFile::canRead() const | |
175 | { | |
0a11e039 | 176 | return diskId >= 0 && canWait(); |
254912f3 AR |
177 | } |
178 | ||
179 | bool | |
180 | IpcIoFile::canWrite() const | |
181 | { | |
0a11e039 | 182 | return diskId >= 0 && canWait(); |
254912f3 AR |
183 | } |
184 | ||
185 | bool | |
186 | IpcIoFile::error() const | |
187 | { | |
188 | return error_; | |
189 | } | |
190 | ||
191 | void | |
192 | IpcIoFile::read(ReadRequest *readRequest) | |
193 | { | |
194 | debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " << | |
9199139f | 195 | readRequest->offset << ")"); |
254912f3 AR |
196 | |
197 | assert(ioRequestor != NULL); | |
254912f3 AR |
198 | assert(readRequest->offset >= 0); |
199 | Must(!error_); | |
200 | ||
201 | //assert(minOffset < 0 || minOffset <= readRequest->offset); | |
202 | //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset); | |
203 | ||
9a51593d | 204 | IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this); |
254912f3 | 205 | pending->readRequest = readRequest; |
7a907247 | 206 | push(pending); |
254912f3 AR |
207 | } |
208 | ||
209 | void | |
210 | IpcIoFile::readCompleted(ReadRequest *readRequest, | |
8ed94021 | 211 | IpcIoMsg *const response) |
254912f3 | 212 | { |
caca86d7 | 213 | bool ioError = false; |
9a51593d | 214 | if (!response) { |
0ef0509e | 215 | debugs(79, 3, HERE << "error: timeout"); |
caca86d7 | 216 | ioError = true; // I/O timeout does not warrant setting error_? |
9a51593d | 217 | } else { |
70e3f706 DK |
218 | if (response->xerrno) { |
219 | debugs(79,1, HERE << "error: " << xstrerr(response->xerrno)); | |
220 | ioError = error_ = true; | |
9199139f | 221 | } else if (!response->page) { |
70e3f706 DK |
222 | debugs(79,1, HERE << "error: run out of shared memory pages"); |
223 | ioError = true; | |
224 | } else { | |
225 | const char *const buf = Ipc::Mem::PagePointer(response->page); | |
226 | memcpy(readRequest->buf, buf, response->len); | |
227 | } | |
254912f3 | 228 | |
70e3f706 DK |
229 | Ipc::Mem::PutPage(response->page); |
230 | } | |
8ed94021 | 231 | |
caca86d7 | 232 | const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len; |
9199139f | 233 | const int errflag = ioError ? DISK_ERROR :DISK_OK; |
20b0e1fe | 234 | ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest); |
254912f3 AR |
235 | } |
236 | ||
237 | void | |
238 | IpcIoFile::write(WriteRequest *writeRequest) | |
239 | { | |
240 | debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " << | |
9199139f | 241 | writeRequest->offset << ")"); |
254912f3 AR |
242 | |
243 | assert(ioRequestor != NULL); | |
254912f3 AR |
244 | assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len? |
245 | assert(writeRequest->offset >= 0); | |
246 | Must(!error_); | |
247 | ||
248 | //assert(minOffset < 0 || minOffset <= writeRequest->offset); | |
249 | //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset); | |
250 | ||
9a51593d | 251 | IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this); |
254912f3 | 252 | pending->writeRequest = writeRequest; |
7a907247 | 253 | push(pending); |
254912f3 AR |
254 | } |
255 | ||
256 | void | |
257 | IpcIoFile::writeCompleted(WriteRequest *writeRequest, | |
9a51593d | 258 | const IpcIoMsg *const response) |
254912f3 | 259 | { |
caca86d7 | 260 | bool ioError = false; |
9a51593d | 261 | if (!response) { |
0ef0509e | 262 | debugs(79, 3, HERE << "error: timeout"); |
caca86d7 | 263 | ioError = true; // I/O timeout does not warrant setting error_? |
9199139f | 264 | } else if (response->xerrno) { |
9a51593d DK |
265 | debugs(79,1, HERE << "error: " << xstrerr(response->xerrno)); |
266 | ioError = error_ = true; | |
9199139f | 267 | } else if (response->len != writeRequest->len) { |
9a51593d | 268 | debugs(79,1, HERE << "problem: " << response->len << " < " << writeRequest->len); |
254912f3 AR |
269 | error_ = true; |
270 | } | |
271 | ||
272 | if (writeRequest->free_func) | |
273 | (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API? | |
274 | ||
caca86d7 | 275 | if (!ioError) { |
254912f3 | 276 | debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" << |
9199139f AR |
277 | diskId << " at " << writeRequest->offset); |
278 | } | |
254912f3 | 279 | |
caca86d7 | 280 | const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len; |
9199139f | 281 | const int errflag = ioError ? DISK_ERROR :DISK_OK; |
254912f3 AR |
282 | ioRequestor->writeCompleted(errflag, rlen, writeRequest); |
283 | } | |
284 | ||
285 | bool | |
286 | IpcIoFile::ioInProgress() const | |
287 | { | |
9a51593d | 288 | return !olderRequests->empty() || !newerRequests->empty(); |
254912f3 AR |
289 | } |
290 | ||
9a51593d | 291 | /// track a new pending request |
254912f3 | 292 | void |
7a907247 | 293 | IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending) |
254912f3 | 294 | { |
7a907247 | 295 | newerRequests->insert(std::make_pair(lastRequestId, pending)); |
9a51593d DK |
296 | if (!timeoutCheckScheduled) |
297 | scheduleTimeoutCheck(); | |
298 | } | |
254912f3 | 299 | |
9a51593d DK |
300 | /// push an I/O request to disker |
301 | void | |
7a907247 | 302 | IpcIoFile::push(IpcIoPendingRequest *const pending) |
9a51593d | 303 | { |
fa61cefe | 304 | // prevent queue overflows: check for responses to earlier requests |
f5591061 | 305 | HandleResponses("before push"); |
fa61cefe | 306 | |
a1c98830 | 307 | debugs(47, 7, HERE); |
9a51593d | 308 | Must(diskId >= 0); |
7a907247 DK |
309 | Must(pending); |
310 | Must(pending->readRequest || pending->writeRequest); | |
9a51593d DK |
311 | |
312 | IpcIoMsg ipcIo; | |
8ed94021 DK |
313 | try { |
314 | ipcIo.requestId = lastRequestId; | |
0a11e039 | 315 | ipcIo.start = current_time; |
8ed94021 DK |
316 | if (pending->readRequest) { |
317 | ipcIo.command = IpcIo::cmdRead; | |
318 | ipcIo.offset = pending->readRequest->offset; | |
319 | ipcIo.len = pending->readRequest->len; | |
320 | } else { // pending->writeRequest | |
321 | Must(pending->writeRequest->len <= Ipc::Mem::PageSize()); | |
551f8a18 | 322 | if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) { |
8ed94021 | 323 | ipcIo.len = 0; |
551f8a18 | 324 | throw TexcHere("run out of shared memory pages for IPC I/O"); |
8ed94021 DK |
325 | } |
326 | ipcIo.command = IpcIo::cmdWrite; | |
327 | ipcIo.offset = pending->writeRequest->offset; | |
328 | ipcIo.len = pending->writeRequest->len; | |
329 | char *const buf = Ipc::Mem::PagePointer(ipcIo.page); | |
330 | memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away | |
331 | } | |
254912f3 | 332 | |
f5591061 | 333 | debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId)); |
fa61cefe | 334 | |
f5591061 | 335 | if (queue->push(diskId, ipcIo)) |
fa61cefe | 336 | Notify(diskId); // must notify disker |
7a907247 | 337 | trackPendingRequest(pending); |
f5591061 | 338 | } catch (const Queue::Full &) { |
fa61cefe AR |
339 | debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " << |
340 | SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len | |
341 | // TODO: grow queue size | |
9199139f | 342 | |
0ef0509e | 343 | pending->completeIo(NULL); |
8ed94021 DK |
344 | delete pending; |
345 | } catch (const TextException &e) { | |
346 | debugs(47, DBG_IMPORTANT, HERE << e.what()); | |
0ef0509e | 347 | pending->completeIo(NULL); |
7a907247 | 348 | delete pending; |
9a51593d | 349 | } |
254912f3 AR |
350 | } |
351 | ||
0a11e039 AR |
352 | /// whether we think there is enough time to complete the I/O |
353 | bool | |
9199139f AR |
354 | IpcIoFile::canWait() const |
355 | { | |
43ebbac3 | 356 | if (!config.ioTimeout) |
0a11e039 AR |
357 | return true; // no timeout specified |
358 | ||
359 | IpcIoMsg oldestIo; | |
360 | if (!queue->peek(diskId, oldestIo) || oldestIo.start.tv_sec <= 0) | |
361 | return true; // we cannot estimate expected wait time; assume it is OK | |
362 | ||
55939a01 AR |
363 | const int oldestWait = tvSubMsec(oldestIo.start, current_time); |
364 | ||
365 | int rateWait = -1; // time in millisecons | |
366 | const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId); | |
367 | if (ioRate > 0) { | |
368 | // if there are N requests pending, the new one will wait at | |
369 | // least N/max-swap-rate seconds | |
75017bc9 | 370 | rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate); |
55939a01 AR |
371 | // adjust N/max-swap-rate value based on the queue "balance" |
372 | // member, in case we have been borrowing time against future | |
373 | // I/O already | |
374 | rateWait += queue->balance(diskId); | |
375 | } | |
376 | ||
377 | const int expectedWait = max(oldestWait, rateWait); | |
0a11e039 | 378 | if (expectedWait < 0 || |
43ebbac3 | 379 | static_cast<time_msec_t>(expectedWait) < config.ioTimeout) |
0a11e039 AR |
380 | return true; // expected wait time is acceptible |
381 | ||
c792f7fc AR |
382 | debugs(47,2, HERE << "cannot wait: " << expectedWait << |
383 | " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId)); | |
0a11e039 AR |
384 | return false; // do not want to wait that long |
385 | } | |
386 | ||
9a51593d | 387 | /// called when coordinator responds to worker open request |
254912f3 | 388 | void |
9a51593d | 389 | IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response) |
254912f3 | 390 | { |
9a51593d | 391 | debugs(47, 7, HERE << "coordinator response to open request"); |
b2aa0934 | 392 | for (IpcIoFileList::iterator i = WaitingForOpen.begin(); |
9199139f | 393 | i != WaitingForOpen.end(); ++i) { |
b2aa0934 DK |
394 | if (response.strand.tag == (*i)->dbName) { |
395 | (*i)->openCompleted(&response); | |
396 | WaitingForOpen.erase(i); | |
397 | return; | |
398 | } | |
399 | } | |
400 | ||
401 | debugs(47, 4, HERE << "LATE disker response to open for " << | |
402 | response.strand.tag); | |
403 | // nothing we can do about it; completeIo() has been called already | |
9a51593d | 404 | } |
254912f3 | 405 | |
9a51593d | 406 | void |
f5591061 | 407 | IpcIoFile::HandleResponses(const char *const when) |
fa61cefe AR |
408 | { |
409 | debugs(47, 4, HERE << "popping all " << when); | |
fa61cefe AR |
410 | IpcIoMsg ipcIo; |
411 | // get all responses we can: since we are not pushing, this will stop | |
f5591061 DK |
412 | int diskId; |
413 | while (queue->pop(diskId, ipcIo)) { | |
414 | const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId); | |
415 | Must(i != IpcIoFiles.end()); // TODO: warn but continue | |
416 | i->second->handleResponse(ipcIo); | |
417 | } | |
9a51593d | 418 | } |
254912f3 | 419 | |
9a51593d | 420 | void |
8ed94021 | 421 | IpcIoFile::handleResponse(IpcIoMsg &ipcIo) |
9a51593d DK |
422 | { |
423 | const int requestId = ipcIo.requestId; | |
fa61cefe | 424 | debugs(47, 7, HERE << "popped disker response: " << |
9199139f | 425 | SipcIo(KidIdentifier, ipcIo, diskId)); |
fa61cefe | 426 | |
9a51593d DK |
427 | Must(requestId); |
428 | if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) { | |
429 | pending->completeIo(&ipcIo); | |
caca86d7 AR |
430 | delete pending; // XXX: leaking if throwing |
431 | } else { | |
9a51593d DK |
432 | debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command << |
433 | "; ipcIo" << KidIdentifier << '.' << requestId); | |
caca86d7 AR |
434 | // nothing we can do about it; completeIo() has been called already |
435 | } | |
254912f3 AR |
436 | } |
437 | ||
254912f3 | 438 | void |
9a51593d | 439 | IpcIoFile::Notify(const int peerId) |
254912f3 | 440 | { |
fa61cefe | 441 | // TODO: Count and report the total number of notifications, pops, pushes. |
a1c98830 | 442 | debugs(47, 7, HERE << "kid" << peerId); |
9a51593d DK |
443 | Ipc::TypedMsgHdr msg; |
444 | msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type? | |
445 | msg.putInt(KidIdentifier); | |
446 | const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, peerId); | |
447 | Ipc::SendMessage(addr, msg); | |
448 | } | |
449 | ||
450 | void | |
451 | IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg) | |
452 | { | |
fa61cefe AR |
453 | const int from = msg.getInt(); |
454 | debugs(47, 7, HERE << "from " << from); | |
f5591061 DK |
455 | queue->clearReaderSignal(from); |
456 | if (IamDiskProcess()) | |
457 | DiskerHandleRequests(); | |
458 | else | |
459 | HandleResponses("after notification"); | |
9a51593d DK |
460 | } |
461 | ||
b2aa0934 DK |
462 | /// handles open request timeout |
463 | void | |
464 | IpcIoFile::OpenTimeout(void *const param) | |
465 | { | |
466 | Must(param); | |
467 | // the pointer is used for comparison only and not dereferenced | |
468 | const IpcIoFile *const ipcIoFile = | |
469 | reinterpret_cast<const IpcIoFile *>(param); | |
470 | for (IpcIoFileList::iterator i = WaitingForOpen.begin(); | |
9199139f | 471 | i != WaitingForOpen.end(); ++i) { |
b2aa0934 DK |
472 | if (*i == ipcIoFile) { |
473 | (*i)->openCompleted(NULL); | |
474 | WaitingForOpen.erase(i); | |
475 | break; | |
476 | } | |
477 | } | |
478 | } | |
479 | ||
9a51593d DK |
480 | /// IpcIoFile::checkTimeouts wrapper |
481 | void | |
482 | IpcIoFile::CheckTimeouts(void *const param) | |
483 | { | |
9a51593d | 484 | Must(param); |
b2aa0934 DK |
485 | const int diskId = reinterpret_cast<uintptr_t>(param); |
486 | debugs(47, 7, HERE << "diskId=" << diskId); | |
487 | const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId); | |
488 | if (i != IpcIoFiles.end()) | |
489 | i->second->checkTimeouts(); | |
9a51593d DK |
490 | } |
491 | ||
492 | void | |
493 | IpcIoFile::checkTimeouts() | |
494 | { | |
9a51593d | 495 | timeoutCheckScheduled = false; |
254912f3 | 496 | |
0ef0509e AR |
497 | // last chance to recover in case a notification message was lost, etc. |
498 | const RequestMap::size_type timeoutsBefore = olderRequests->size(); | |
499 | HandleResponses("before timeout"); | |
500 | const RequestMap::size_type timeoutsNow = olderRequests->size(); | |
501 | ||
502 | if (timeoutsBefore > timeoutsNow) { // some requests were rescued | |
503 | // notification message lost or significantly delayed? | |
504 | debugs(47, DBG_IMPORTANT, "WARNING: communication with disker " << | |
505 | "may be too slow or disrupted for about " << | |
506 | Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) << | |
507 | " out of " << timeoutsBefore << " I/Os"); | |
508 | } | |
509 | ||
510 | if (timeoutsNow) { | |
511 | debugs(47, DBG_IMPORTANT, "WARNING: abandoning " << | |
512 | timeoutsNow << " I/Os after at least " << | |
513 | Timeout << "s timeout"); | |
514 | } | |
515 | ||
caca86d7 AR |
516 | // any old request would have timed out by now |
517 | typedef RequestMap::const_iterator RMCI; | |
9a51593d DK |
518 | for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) { |
519 | IpcIoPendingRequest *const pending = i->second; | |
254912f3 | 520 | |
b2aa0934 | 521 | const unsigned int requestId = i->first; |
caca86d7 | 522 | debugs(47, 7, HERE << "disker timeout; ipcIo" << |
b2aa0934 | 523 | KidIdentifier << '.' << requestId); |
254912f3 | 524 | |
caca86d7 AR |
525 | pending->completeIo(NULL); // no response |
526 | delete pending; // XXX: leaking if throwing | |
9a51593d DK |
527 | } |
528 | olderRequests->clear(); | |
caca86d7 | 529 | |
9a51593d | 530 | swap(olderRequests, newerRequests); // switches pointers around |
0ef0509e | 531 | if (!olderRequests->empty() && !timeoutCheckScheduled) |
9a51593d | 532 | scheduleTimeoutCheck(); |
254912f3 AR |
533 | } |
534 | ||
caca86d7 | 535 | /// prepare to check for timeouts in a little while |
254912f3 | 536 | void |
9a51593d | 537 | IpcIoFile::scheduleTimeoutCheck() |
254912f3 | 538 | { |
b2aa0934 | 539 | // we check all older requests at once so some may be wait for 2*Timeout |
caca86d7 | 540 | eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts, |
b2aa0934 | 541 | reinterpret_cast<void *>(diskId), Timeout, 0, false); |
9a51593d | 542 | timeoutCheckScheduled = true; |
254912f3 AR |
543 | } |
544 | ||
545 | /// returns and forgets the right IpcIoFile pending request | |
546 | IpcIoPendingRequest * | |
9a51593d | 547 | IpcIoFile::dequeueRequest(const unsigned int requestId) |
254912f3 | 548 | { |
254912f3 | 549 | Must(requestId != 0); |
caca86d7 | 550 | |
9a51593d DK |
551 | RequestMap *map = NULL; |
552 | RequestMap::iterator i = requestMap1.find(requestId); | |
caca86d7 | 553 | |
9a51593d DK |
554 | if (i != requestMap1.end()) |
555 | map = &requestMap1; | |
caca86d7 | 556 | else { |
9a51593d DK |
557 | i = requestMap2.find(requestId); |
558 | if (i != requestMap2.end()) | |
559 | map = &requestMap2; | |
caca86d7 AR |
560 | } |
561 | ||
562 | if (!map) // not found in both maps | |
563 | return NULL; | |
564 | ||
565 | IpcIoPendingRequest *pending = i->second; | |
566 | map->erase(i); | |
567 | return pending; | |
254912f3 AR |
568 | } |
569 | ||
570 | int | |
9a51593d | 571 | IpcIoFile::getFD() const |
254912f3 AR |
572 | { |
573 | assert(false); // not supported; TODO: remove this method from API | |
574 | return -1; | |
575 | } | |
576 | ||
577 | ||
9a51593d | 578 | /* IpcIoMsg */ |
254912f3 | 579 | |
9a51593d | 580 | IpcIoMsg::IpcIoMsg(): |
9199139f | 581 | requestId(0), offset(0), len(0), command(IpcIo::cmdNone), xerrno(0) |
254912f3 | 582 | { |
0a11e039 | 583 | start.tv_sec = 0; |
254912f3 AR |
584 | } |
585 | ||
caca86d7 | 586 | /* IpcIoPendingRequest */ |
254912f3 AR |
587 | |
588 | IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile): | |
9199139f | 589 | file(aFile), readRequest(NULL), writeRequest(NULL) |
254912f3 | 590 | { |
b2aa0934 | 591 | Must(file != NULL); |
9a51593d DK |
592 | if (++file->lastRequestId == 0) // don't use zero value as requestId |
593 | ++file->lastRequestId; | |
254912f3 AR |
594 | } |
595 | ||
caca86d7 | 596 | void |
8ed94021 | 597 | IpcIoPendingRequest::completeIo(IpcIoMsg *const response) |
caca86d7 | 598 | { |
caca86d7 AR |
599 | if (readRequest) |
600 | file->readCompleted(readRequest, response); | |
9199139f | 601 | else if (writeRequest) |
caca86d7 | 602 | file->writeCompleted(writeRequest, response); |
9a51593d DK |
603 | else { |
604 | Must(!response); // only timeouts are handled here | |
605 | file->openCompleted(NULL); | |
606 | } | |
caca86d7 AR |
607 | } |
608 | ||
609 | ||
254912f3 AR |
610 | |
611 | /* XXX: disker code that should probably be moved elsewhere */ | |
612 | ||
613 | static int TheFile = -1; ///< db file descriptor | |
614 | ||
9a51593d DK |
615 | static void |
616 | diskerRead(IpcIoMsg &ipcIo) | |
254912f3 | 617 | { |
551f8a18 | 618 | if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) { |
8ed94021 | 619 | ipcIo.len = 0; |
c792f7fc | 620 | debugs(47,2, HERE << "run out of shared memory pages for IPC I/O"); |
8ed94021 DK |
621 | return; |
622 | } | |
623 | ||
624 | char *const buf = Ipc::Mem::PagePointer(ipcIo.page); | |
625 | const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset); | |
2d338731 AR |
626 | statCounter.syscalls.disk.reads++; |
627 | fd_bytes(TheFile, read, FD_READ); | |
628 | ||
254912f3 | 629 | if (read >= 0) { |
9a51593d DK |
630 | ipcIo.xerrno = 0; |
631 | const size_t len = static_cast<size_t>(read); // safe because read > 0 | |
254912f3 | 632 | debugs(47,8, HERE << "disker" << KidIdentifier << " read " << |
9199139f | 633 | (len == ipcIo.len ? "all " : "just ") << read); |
9a51593d DK |
634 | ipcIo.len = len; |
635 | } else { | |
636 | ipcIo.xerrno = errno; | |
637 | ipcIo.len = 0; | |
254912f3 | 638 | debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " << |
9199139f | 639 | ipcIo.xerrno); |
9a51593d | 640 | } |
254912f3 AR |
641 | } |
642 | ||
9a51593d DK |
643 | static void |
644 | diskerWrite(IpcIoMsg &ipcIo) | |
254912f3 | 645 | { |
8ed94021 DK |
646 | const char *const buf = Ipc::Mem::PagePointer(ipcIo.page); |
647 | const ssize_t wrote = pwrite(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset); | |
2d338731 AR |
648 | statCounter.syscalls.disk.writes++; |
649 | fd_bytes(TheFile, wrote, FD_WRITE); | |
650 | ||
254912f3 | 651 | if (wrote >= 0) { |
9a51593d DK |
652 | ipcIo.xerrno = 0; |
653 | const size_t len = static_cast<size_t>(wrote); // safe because wrote > 0 | |
254912f3 | 654 | debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " << |
9199139f | 655 | (len == ipcIo.len ? "all " : "just ") << wrote); |
9a51593d DK |
656 | ipcIo.len = len; |
657 | } else { | |
658 | ipcIo.xerrno = errno; | |
659 | ipcIo.len = 0; | |
254912f3 | 660 | debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " << |
9a51593d DK |
661 | ipcIo.xerrno); |
662 | } | |
8ed94021 DK |
663 | |
664 | Ipc::Mem::PutPage(ipcIo.page); | |
254912f3 AR |
665 | } |
666 | ||
c792f7fc AR |
667 | |
668 | void | |
df881a0f | 669 | IpcIoFile::DiskerHandleMoreRequests(void *source) |
c792f7fc | 670 | { |
df881a0f AR |
671 | debugs(47, 7, HERE << "resuming handling requests after " << |
672 | static_cast<const char *>(source)); | |
c792f7fc AR |
673 | DiskerHandleMoreRequestsScheduled = false; |
674 | IpcIoFile::DiskerHandleRequests(); | |
675 | } | |
676 | ||
df881a0f AR |
677 | bool |
678 | IpcIoFile::WaitBeforePop() | |
679 | { | |
680 | const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit(); | |
681 | const double maxRate = ioRate/1e3; // req/ms | |
682 | ||
683 | // do we need to enforce configured I/O rate? | |
684 | if (maxRate <= 0) | |
685 | return false; | |
686 | ||
687 | // is there an I/O request we could potentially delay? | |
688 | if (!queue->popReady()) { | |
689 | // unlike pop(), popReady() is not reliable and does not block reader | |
690 | // so we must proceed with pop() even if it is likely to fail | |
691 | return false; | |
692 | } | |
693 | ||
9a585a30 | 694 | static timeval LastIo = current_time; |
df881a0f AR |
695 | |
696 | const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os | |
697 | // do not accumulate more than 100ms or 100 I/Os, whichever is smaller | |
698 | const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration)); | |
699 | ||
700 | const double credit = ioDuration; // what the last I/O should have cost us | |
701 | const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O | |
702 | LastIo = current_time; | |
703 | ||
704 | Ipc::QueueReader::Balance &balance = queue->localBalance(); | |
705 | balance += static_cast<int64_t>(credit - debit); | |
706 | ||
707 | debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit); | |
708 | ||
709 | if (balance > maxImbalance) { | |
710 | // if we accumulated too much time for future slow I/Os, | |
711 | // then shed accumulated time to keep just half of the excess | |
712 | const int64_t toSpend = balance - maxImbalance/2; | |
69321ae9 AR |
713 | |
714 | if (toSpend/1e3 > Timeout) | |
715 | debugs(47, DBG_IMPORTANT, "WARNING: Rock disker delays I/O " << | |
716 | "requests for " << (toSpend/1e3) << " seconds to obey " << | |
717 | ioRate << "/sec rate limit"); | |
718 | ||
df881a0f AR |
719 | debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" << |
720 | (1e3*maxRate) << "/sec rate"); | |
721 | eventAdd("IpcIoFile::DiskerHandleMoreRequests", | |
722 | &IpcIoFile::DiskerHandleMoreRequests, | |
723 | const_cast<char*>("rate limiting"), | |
724 | toSpend/1e3, 0, false); | |
725 | DiskerHandleMoreRequestsScheduled = true; | |
726 | return true; | |
e29ccb57 | 727 | } else if (balance < -maxImbalance) { |
df881a0f AR |
728 | // do not owe "too much" to avoid "too large" bursts of I/O |
729 | balance = -maxImbalance; | |
730 | } | |
731 | ||
732 | return false; | |
733 | } | |
734 | ||
254912f3 | 735 | void |
f5591061 | 736 | IpcIoFile::DiskerHandleRequests() |
254912f3 | 737 | { |
c792f7fc | 738 | // Balance our desire to maximize the number of concurrent I/O requests |
fdb3059b | 739 | // (reordred by OS to minimize seek time) with a requirement to |
c792f7fc | 740 | // send 1st-I/O notification messages, process Coordinator events, etc. |
fdb3059b AR |
741 | const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms |
742 | const timeval loopStart = current_time; | |
743 | ||
c792f7fc | 744 | int popped = 0; |
fa61cefe AR |
745 | int workerId = 0; |
746 | IpcIoMsg ipcIo; | |
df881a0f | 747 | while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) { |
fdb3059b AR |
748 | ++popped; |
749 | ||
750 | // at least one I/O per call is guaranteed if the queue is not empty | |
fa61cefe AR |
751 | DiskerHandleRequest(workerId, ipcIo); |
752 | ||
fdb3059b AR |
753 | getCurrentTime(); |
754 | const double elapsedMsec = tvSubMsec(loopStart, current_time); | |
755 | if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) { | |
756 | if (!DiskerHandleMoreRequestsScheduled) { | |
757 | // the gap must be positive for select(2) to be given a chance | |
758 | const double minBreakSecs = 0.001; | |
759 | eventAdd("IpcIoFile::DiskerHandleMoreRequests", | |
9199139f | 760 | &IpcIoFile::DiskerHandleMoreRequests, |
df881a0f AR |
761 | const_cast<char*>("long I/O loop"), |
762 | minBreakSecs, 0, false); | |
fdb3059b AR |
763 | DiskerHandleMoreRequestsScheduled = true; |
764 | } | |
765 | debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " << | |
9199139f | 766 | elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O"); |
fdb3059b AR |
767 | break; |
768 | } | |
c792f7fc AR |
769 | } |
770 | ||
fdb3059b | 771 | // TODO: consider using O_DIRECT with "elevator" optimization where we pop |
fa61cefe AR |
772 | // requests first, then reorder the popped requests to optimize seek time, |
773 | // then do I/O, then take a break, and come back for the next set of I/O | |
774 | // requests. | |
9a51593d | 775 | } |
254912f3 | 776 | |
9a51593d DK |
777 | /// called when disker receives an I/O request |
778 | void | |
779 | IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo) | |
780 | { | |
9a51593d | 781 | if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) { |
254912f3 | 782 | debugs(0,0, HERE << "disker" << KidIdentifier << |
9a51593d DK |
783 | " should not receive " << ipcIo.command << |
784 | " ipcIo" << workerId << '.' << ipcIo.requestId); | |
785 | return; | |
786 | } | |
787 | ||
788 | debugs(47,5, HERE << "disker" << KidIdentifier << | |
789 | (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") << | |
790 | ipcIo.len << " at " << ipcIo.offset << | |
791 | " ipcIo" << workerId << '.' << ipcIo.requestId); | |
792 | ||
793 | if (ipcIo.command == IpcIo::cmdRead) | |
794 | diskerRead(ipcIo); | |
795 | else // ipcIo.command == IpcIo::cmdWrite | |
796 | diskerWrite(ipcIo); | |
797 | ||
f5591061 | 798 | debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier)); |
fa61cefe | 799 | |
7a907247 | 800 | try { |
f5591061 | 801 | if (queue->push(workerId, ipcIo)) |
5e44782e | 802 | Notify(workerId); // must notify worker |
f5591061 | 803 | } catch (const Queue::Full &) { |
fa61cefe AR |
804 | // The worker queue should not overflow because the worker should pop() |
805 | // before push()ing and because if disker pops N requests at a time, | |
806 | // we should make sure the worker pop() queue length is the worker | |
807 | // push queue length plus N+1. XXX: implement the N+1 difference. | |
808 | debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue overflow: " << | |
5e44782e | 809 | SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len |
fa61cefe AR |
810 | |
811 | // the I/O request we could not push will timeout | |
7a907247 | 812 | } |
254912f3 AR |
813 | } |
814 | ||
815 | static bool | |
816 | DiskerOpen(const String &path, int flags, mode_t mode) | |
817 | { | |
818 | assert(TheFile < 0); | |
819 | ||
820 | TheFile = file_open(path.termedBuf(), flags); | |
821 | ||
822 | if (TheFile < 0) { | |
823 | const int xerrno = errno; | |
824 | debugs(47,0, HERE << "rock db error opening " << path << ": " << | |
825 | xstrerr(xerrno)); | |
826 | return false; | |
9a51593d | 827 | } |
254912f3 AR |
828 | |
829 | store_open_disk_fd++; | |
830 | debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile); | |
9a51593d | 831 | return true; |
254912f3 AR |
832 | } |
833 | ||
834 | static void | |
835 | DiskerClose(const String &path) | |
836 | { | |
837 | if (TheFile >= 0) { | |
838 | file_close(TheFile); | |
839 | debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile); | |
840 | TheFile = -1; | |
841 | store_open_disk_fd--; | |
a1c98830 | 842 | } |
254912f3 | 843 | } |
f5591061 DK |
844 | |
845 | ||
846 | /// initializes shared memory segments used by IpcIoFile | |
4404f1c5 | 847 | class IpcIoRr: public Ipc::Mem::RegisteredRunner |
f5591061 DK |
848 | { |
849 | public: | |
850 | /* RegisteredRunner API */ | |
851 | IpcIoRr(): owner(NULL) {} | |
f5591061 DK |
852 | virtual ~IpcIoRr(); |
853 | ||
4404f1c5 DK |
854 | protected: |
855 | virtual void create(const RunnerRegistry &); | |
856 | ||
f5591061 DK |
857 | private: |
858 | Ipc::FewToFewBiQueue::Owner *owner; | |
859 | }; | |
860 | ||
861 | RunnerRegistrationEntry(rrAfterConfig, IpcIoRr); | |
862 | ||
863 | ||
4404f1c5 | 864 | void IpcIoRr::create(const RunnerRegistry &) |
f5591061 DK |
865 | { |
866 | if (!UsingSmp()) | |
867 | return; | |
868 | ||
4404f1c5 DK |
869 | Must(!owner); |
870 | // XXX: make capacity configurable | |
871 | owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1, | |
3b581957 | 872 | Config.cacheSwap.n_strands, |
4404f1c5 DK |
873 | 1 + Config.workers, sizeof(IpcIoMsg), |
874 | 1024); | |
f5591061 DK |
875 | } |
876 | ||
877 | IpcIoRr::~IpcIoRr() | |
878 | { | |
879 | delete owner; | |
880 | } |