]>
Commit | Line | Data |
---|---|---|
254912f3 AR |
1 | /* |
2 | * $Id$ | |
3 | * | |
4 | * DEBUG: section 47 Store Directory Routines | |
5 | */ | |
6 | ||
f7f3304a | 7 | #include "squid.h" |
f5591061 | 8 | #include "base/RunnersRegistry.h" |
254912f3 AR |
9 | #include "base/TextException.h" |
10 | #include "DiskIO/IORequestor.h" | |
11 | #include "DiskIO/IpcIo/IpcIoFile.h" | |
12 | #include "DiskIO/ReadRequest.h" | |
13 | #include "DiskIO/WriteRequest.h" | |
582c2af2 | 14 | #include "globals.h" |
21d845b1 | 15 | #include "ipc/mem/Pages.h" |
254912f3 AR |
16 | #include "ipc/Messages.h" |
17 | #include "ipc/Port.h" | |
f5591061 | 18 | #include "ipc/Queue.h" |
9a51593d | 19 | #include "ipc/StrandSearch.h" |
254912f3 | 20 | #include "ipc/UdsOp.h" |
582c2af2 | 21 | #include "protos.h" |
21d845b1 FC |
22 | #include "SquidTime.h" |
23 | #include "StatCounters.h" | |
254912f3 | 24 | |
21d845b1 FC |
25 | #if HAVE_ERRNO_H |
26 | #include <errno.h> | |
27 | #endif | |
254912f3 AR |
28 | CBDATA_CLASS_INIT(IpcIoFile); |
29 | ||
f5591061 DK |
30 | /// shared memory segment path to use for IpcIoFile maps |
31 | static const char *const ShmLabel = "io_file"; | |
ea2cdeb6 DK |
32 | /// a single worker-to-disker or disker-to-worker queue capacity; up |
33 | /// to 2*QueueCapacity I/O requests queued between a single worker and | |
34 | /// a single disker | |
35 | // TODO: make configurable or compute from squid.conf settings if possible | |
36 | static const int QueueCapacity = 1024; | |
f5591061 | 37 | |
fa61cefe | 38 | const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more |
b2aa0934 DK |
39 | IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen; |
40 | IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles; | |
f5591061 | 41 | std::auto_ptr<IpcIoFile::Queue> IpcIoFile::queue; |
254912f3 | 42 | |
c792f7fc AR |
43 | bool IpcIoFile::DiskerHandleMoreRequestsScheduled = false; |
44 | ||
254912f3 AR |
45 | static bool DiskerOpen(const String &path, int flags, mode_t mode); |
46 | static void DiskerClose(const String &path); | |
47 | ||
fa61cefe AR |
48 | /// IpcIo wrapper for debugs() streams; XXX: find a better class name |
49 | struct SipcIo { | |
50 | SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker): | |
9199139f | 51 | worker(aWorker), msg(aMsg), disker(aDisker) {} |
fa61cefe AR |
52 | |
53 | int worker; | |
54 | const IpcIoMsg &msg; | |
55 | int disker; | |
56 | }; | |
57 | ||
58 | std::ostream & | |
59 | operator <<(std::ostream &os, const SipcIo &sio) | |
60 | { | |
61 | return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId << | |
9199139f | 62 | (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker; |
fa61cefe AR |
63 | } |
64 | ||
254912f3 | 65 | IpcIoFile::IpcIoFile(char const *aDb): |
9199139f AR |
66 | dbName(aDb), diskId(-1), error_(false), lastRequestId(0), |
67 | olderRequests(&requestMap1), newerRequests(&requestMap2), | |
68 | timeoutCheckScheduled(false) | |
254912f3 AR |
69 | { |
70 | } | |
71 | ||
72 | IpcIoFile::~IpcIoFile() | |
73 | { | |
b2aa0934 DK |
74 | if (diskId >= 0) { |
75 | const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId); | |
76 | // XXX: warn and continue? | |
77 | Must(i != IpcIoFiles.end()); | |
78 | Must(i->second == this); | |
79 | IpcIoFiles.erase(i); | |
80 | } | |
254912f3 AR |
81 | } |
82 | ||
43ebbac3 AR |
83 | void |
84 | IpcIoFile::configure(const Config &cfg) | |
85 | { | |
86 | DiskFile::configure(cfg); | |
87 | config = cfg; | |
88 | } | |
89 | ||
254912f3 AR |
90 | void |
91 | IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback) | |
92 | { | |
93 | ioRequestor = callback; | |
94 | Must(diskId < 0); // we do not know our disker yet | |
f5591061 DK |
95 | |
96 | if (!queue.get()) | |
97 | queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier)); | |
254912f3 AR |
98 | |
99 | if (IamDiskProcess()) { | |
100 | error_ = !DiskerOpen(dbName, flags, mode); | |
f7091279 DK |
101 | if (error_) |
102 | return; | |
103 | ||
b2aa0934 DK |
104 | diskId = KidIdentifier; |
105 | const bool inserted = | |
106 | IpcIoFiles.insert(std::make_pair(diskId, this)).second; | |
107 | Must(inserted); | |
9a51593d | 108 | |
df881a0f AR |
109 | queue->localRateLimit() = |
110 | static_cast<Ipc::QueueReader::Rate::Value>(config.ioRate); | |
111 | ||
f7091279 DK |
112 | Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid())); |
113 | ann.strand.tag = dbName; | |
114 | Ipc::TypedMsgHdr message; | |
115 | ann.pack(message); | |
116 | SendMessage(Ipc::coordinatorAddr, message); | |
117 | ||
e528e570 | 118 | ioRequestor->ioCompletedNotification(); |
254912f3 | 119 | return; |
9a51593d | 120 | } |
254912f3 | 121 | |
9a51593d DK |
122 | Ipc::StrandSearchRequest request; |
123 | request.requestorId = KidIdentifier; | |
124 | request.tag = dbName; | |
254912f3 | 125 | |
9a51593d DK |
126 | Ipc::TypedMsgHdr msg; |
127 | request.pack(msg); | |
128 | Ipc::SendMessage(Ipc::coordinatorAddr, msg); | |
129 | ||
b2aa0934 DK |
130 | WaitingForOpen.push_back(this); |
131 | ||
132 | eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout, | |
133 | this, Timeout, 0, false); // "this" pointer is used as id | |
254912f3 AR |
134 | } |
135 | ||
136 | void | |
9199139f AR |
137 | IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) |
138 | { | |
9a51593d | 139 | Must(diskId < 0); // we do not know our disker yet |
9a51593d DK |
140 | |
141 | if (!response) { | |
e0236918 | 142 | debugs(79, DBG_IMPORTANT, HERE << "error: timeout"); |
caca86d7 | 143 | error_ = true; |
9a51593d DK |
144 | } else { |
145 | diskId = response->strand.kidId; | |
146 | if (diskId >= 0) { | |
b2aa0934 DK |
147 | const bool inserted = |
148 | IpcIoFiles.insert(std::make_pair(diskId, this)).second; | |
149 | Must(inserted); | |
9a51593d | 150 | } else { |
254912f3 | 151 | error_ = true; |
e0236918 | 152 | debugs(79, DBG_IMPORTANT, HERE << "error: no disker claimed " << dbName); |
9a51593d DK |
153 | } |
154 | } | |
254912f3 AR |
155 | |
156 | ioRequestor->ioCompletedNotification(); | |
157 | } | |
158 | ||
159 | /** | |
160 | * Alias for IpcIoFile::open(...) | |
161 | \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback) | |
162 | */ | |
163 | void | |
164 | IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback) | |
165 | { | |
166 | assert(false); // check | |
167 | /* We use the same logic path for open */ | |
168 | open(flags, mode, callback); | |
169 | } | |
170 | ||
171 | void | |
172 | IpcIoFile::close() | |
173 | { | |
174 | assert(ioRequestor != NULL); | |
175 | ||
176 | if (IamDiskProcess()) | |
177 | DiskerClose(dbName); | |
9a51593d | 178 | // XXX: else nothing to do? |
254912f3 AR |
179 | |
180 | ioRequestor->closeCompleted(); | |
181 | } | |
182 | ||
183 | bool | |
184 | IpcIoFile::canRead() const | |
185 | { | |
0a11e039 | 186 | return diskId >= 0 && canWait(); |
254912f3 AR |
187 | } |
188 | ||
189 | bool | |
190 | IpcIoFile::canWrite() const | |
191 | { | |
0a11e039 | 192 | return diskId >= 0 && canWait(); |
254912f3 AR |
193 | } |
194 | ||
195 | bool | |
196 | IpcIoFile::error() const | |
197 | { | |
198 | return error_; | |
199 | } | |
200 | ||
201 | void | |
202 | IpcIoFile::read(ReadRequest *readRequest) | |
203 | { | |
204 | debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " << | |
9199139f | 205 | readRequest->offset << ")"); |
254912f3 AR |
206 | |
207 | assert(ioRequestor != NULL); | |
254912f3 AR |
208 | assert(readRequest->offset >= 0); |
209 | Must(!error_); | |
210 | ||
211 | //assert(minOffset < 0 || minOffset <= readRequest->offset); | |
212 | //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset); | |
213 | ||
9a51593d | 214 | IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this); |
254912f3 | 215 | pending->readRequest = readRequest; |
7a907247 | 216 | push(pending); |
254912f3 AR |
217 | } |
218 | ||
219 | void | |
220 | IpcIoFile::readCompleted(ReadRequest *readRequest, | |
8ed94021 | 221 | IpcIoMsg *const response) |
254912f3 | 222 | { |
caca86d7 | 223 | bool ioError = false; |
9a51593d | 224 | if (!response) { |
0ef0509e | 225 | debugs(79, 3, HERE << "error: timeout"); |
caca86d7 | 226 | ioError = true; // I/O timeout does not warrant setting error_? |
9a51593d | 227 | } else { |
70e3f706 | 228 | if (response->xerrno) { |
e0236918 | 229 | debugs(79, DBG_IMPORTANT, HERE << "error: " << xstrerr(response->xerrno)); |
70e3f706 | 230 | ioError = error_ = true; |
9199139f | 231 | } else if (!response->page) { |
e0236918 | 232 | debugs(79, DBG_IMPORTANT, HERE << "error: run out of shared memory pages"); |
70e3f706 DK |
233 | ioError = true; |
234 | } else { | |
235 | const char *const buf = Ipc::Mem::PagePointer(response->page); | |
236 | memcpy(readRequest->buf, buf, response->len); | |
237 | } | |
254912f3 | 238 | |
70e3f706 DK |
239 | Ipc::Mem::PutPage(response->page); |
240 | } | |
8ed94021 | 241 | |
caca86d7 | 242 | const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len; |
9199139f | 243 | const int errflag = ioError ? DISK_ERROR :DISK_OK; |
20b0e1fe | 244 | ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest); |
254912f3 AR |
245 | } |
246 | ||
247 | void | |
248 | IpcIoFile::write(WriteRequest *writeRequest) | |
249 | { | |
250 | debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " << | |
9199139f | 251 | writeRequest->offset << ")"); |
254912f3 AR |
252 | |
253 | assert(ioRequestor != NULL); | |
254912f3 AR |
254 | assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len? |
255 | assert(writeRequest->offset >= 0); | |
256 | Must(!error_); | |
257 | ||
258 | //assert(minOffset < 0 || minOffset <= writeRequest->offset); | |
259 | //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset); | |
260 | ||
9a51593d | 261 | IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this); |
254912f3 | 262 | pending->writeRequest = writeRequest; |
7a907247 | 263 | push(pending); |
254912f3 AR |
264 | } |
265 | ||
266 | void | |
267 | IpcIoFile::writeCompleted(WriteRequest *writeRequest, | |
9a51593d | 268 | const IpcIoMsg *const response) |
254912f3 | 269 | { |
caca86d7 | 270 | bool ioError = false; |
9a51593d | 271 | if (!response) { |
0ef0509e | 272 | debugs(79, 3, HERE << "error: timeout"); |
caca86d7 | 273 | ioError = true; // I/O timeout does not warrant setting error_? |
9199139f | 274 | } else if (response->xerrno) { |
e0236918 | 275 | debugs(79, DBG_IMPORTANT, HERE << "error: " << xstrerr(response->xerrno)); |
9a51593d | 276 | ioError = error_ = true; |
9199139f | 277 | } else if (response->len != writeRequest->len) { |
e0236918 | 278 | debugs(79, DBG_IMPORTANT, HERE << "problem: " << response->len << " < " << writeRequest->len); |
254912f3 AR |
279 | error_ = true; |
280 | } | |
281 | ||
282 | if (writeRequest->free_func) | |
283 | (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API? | |
284 | ||
caca86d7 | 285 | if (!ioError) { |
254912f3 | 286 | debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" << |
9199139f AR |
287 | diskId << " at " << writeRequest->offset); |
288 | } | |
254912f3 | 289 | |
caca86d7 | 290 | const ssize_t rlen = ioError ? 0 : (ssize_t)writeRequest->len; |
9199139f | 291 | const int errflag = ioError ? DISK_ERROR :DISK_OK; |
254912f3 AR |
292 | ioRequestor->writeCompleted(errflag, rlen, writeRequest); |
293 | } | |
294 | ||
295 | bool | |
296 | IpcIoFile::ioInProgress() const | |
297 | { | |
9a51593d | 298 | return !olderRequests->empty() || !newerRequests->empty(); |
254912f3 AR |
299 | } |
300 | ||
9a51593d | 301 | /// track a new pending request |
254912f3 | 302 | void |
7a907247 | 303 | IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending) |
254912f3 | 304 | { |
7a907247 | 305 | newerRequests->insert(std::make_pair(lastRequestId, pending)); |
9a51593d DK |
306 | if (!timeoutCheckScheduled) |
307 | scheduleTimeoutCheck(); | |
308 | } | |
254912f3 | 309 | |
9a51593d DK |
310 | /// push an I/O request to disker |
311 | void | |
7a907247 | 312 | IpcIoFile::push(IpcIoPendingRequest *const pending) |
9a51593d | 313 | { |
fa61cefe | 314 | // prevent queue overflows: check for responses to earlier requests |
f5591061 | 315 | HandleResponses("before push"); |
fa61cefe | 316 | |
a1c98830 | 317 | debugs(47, 7, HERE); |
9a51593d | 318 | Must(diskId >= 0); |
7a907247 DK |
319 | Must(pending); |
320 | Must(pending->readRequest || pending->writeRequest); | |
9a51593d DK |
321 | |
322 | IpcIoMsg ipcIo; | |
8ed94021 DK |
323 | try { |
324 | ipcIo.requestId = lastRequestId; | |
0a11e039 | 325 | ipcIo.start = current_time; |
8ed94021 DK |
326 | if (pending->readRequest) { |
327 | ipcIo.command = IpcIo::cmdRead; | |
328 | ipcIo.offset = pending->readRequest->offset; | |
329 | ipcIo.len = pending->readRequest->len; | |
330 | } else { // pending->writeRequest | |
331 | Must(pending->writeRequest->len <= Ipc::Mem::PageSize()); | |
551f8a18 | 332 | if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) { |
8ed94021 | 333 | ipcIo.len = 0; |
551f8a18 | 334 | throw TexcHere("run out of shared memory pages for IPC I/O"); |
8ed94021 DK |
335 | } |
336 | ipcIo.command = IpcIo::cmdWrite; | |
337 | ipcIo.offset = pending->writeRequest->offset; | |
338 | ipcIo.len = pending->writeRequest->len; | |
339 | char *const buf = Ipc::Mem::PagePointer(ipcIo.page); | |
340 | memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away | |
341 | } | |
254912f3 | 342 | |
f5591061 | 343 | debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId)); |
fa61cefe | 344 | |
f5591061 | 345 | if (queue->push(diskId, ipcIo)) |
fa61cefe | 346 | Notify(diskId); // must notify disker |
7a907247 | 347 | trackPendingRequest(pending); |
f5591061 | 348 | } catch (const Queue::Full &) { |
fa61cefe AR |
349 | debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " << |
350 | SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len | |
351 | // TODO: grow queue size | |
9199139f | 352 | |
0ef0509e | 353 | pending->completeIo(NULL); |
8ed94021 DK |
354 | delete pending; |
355 | } catch (const TextException &e) { | |
356 | debugs(47, DBG_IMPORTANT, HERE << e.what()); | |
0ef0509e | 357 | pending->completeIo(NULL); |
7a907247 | 358 | delete pending; |
9a51593d | 359 | } |
254912f3 AR |
360 | } |
361 | ||
0a11e039 AR |
362 | /// whether we think there is enough time to complete the I/O |
363 | bool | |
9199139f AR |
364 | IpcIoFile::canWait() const |
365 | { | |
43ebbac3 | 366 | if (!config.ioTimeout) |
0a11e039 AR |
367 | return true; // no timeout specified |
368 | ||
369 | IpcIoMsg oldestIo; | |
5aac671b | 370 | if (!queue->findOldest(diskId, oldestIo) || oldestIo.start.tv_sec <= 0) |
0a11e039 AR |
371 | return true; // we cannot estimate expected wait time; assume it is OK |
372 | ||
55939a01 AR |
373 | const int oldestWait = tvSubMsec(oldestIo.start, current_time); |
374 | ||
375 | int rateWait = -1; // time in millisecons | |
376 | const Ipc::QueueReader::Rate::Value ioRate = queue->rateLimit(diskId); | |
377 | if (ioRate > 0) { | |
378 | // if there are N requests pending, the new one will wait at | |
379 | // least N/max-swap-rate seconds | |
75017bc9 | 380 | rateWait = static_cast<int>(1e3 * queue->outSize(diskId) / ioRate); |
55939a01 AR |
381 | // adjust N/max-swap-rate value based on the queue "balance" |
382 | // member, in case we have been borrowing time against future | |
383 | // I/O already | |
384 | rateWait += queue->balance(diskId); | |
385 | } | |
386 | ||
387 | const int expectedWait = max(oldestWait, rateWait); | |
0a11e039 | 388 | if (expectedWait < 0 || |
43ebbac3 | 389 | static_cast<time_msec_t>(expectedWait) < config.ioTimeout) |
0a11e039 AR |
390 | return true; // expected wait time is acceptible |
391 | ||
c792f7fc AR |
392 | debugs(47,2, HERE << "cannot wait: " << expectedWait << |
393 | " oldest: " << SipcIo(KidIdentifier, oldestIo, diskId)); | |
0a11e039 AR |
394 | return false; // do not want to wait that long |
395 | } | |
396 | ||
9a51593d | 397 | /// called when coordinator responds to worker open request |
254912f3 | 398 | void |
9a51593d | 399 | IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response) |
254912f3 | 400 | { |
9a51593d | 401 | debugs(47, 7, HERE << "coordinator response to open request"); |
b2aa0934 | 402 | for (IpcIoFileList::iterator i = WaitingForOpen.begin(); |
9199139f | 403 | i != WaitingForOpen.end(); ++i) { |
b2aa0934 DK |
404 | if (response.strand.tag == (*i)->dbName) { |
405 | (*i)->openCompleted(&response); | |
406 | WaitingForOpen.erase(i); | |
407 | return; | |
408 | } | |
409 | } | |
410 | ||
411 | debugs(47, 4, HERE << "LATE disker response to open for " << | |
412 | response.strand.tag); | |
413 | // nothing we can do about it; completeIo() has been called already | |
9a51593d | 414 | } |
254912f3 | 415 | |
9a51593d | 416 | void |
f5591061 | 417 | IpcIoFile::HandleResponses(const char *const when) |
fa61cefe AR |
418 | { |
419 | debugs(47, 4, HERE << "popping all " << when); | |
fa61cefe AR |
420 | IpcIoMsg ipcIo; |
421 | // get all responses we can: since we are not pushing, this will stop | |
f5591061 DK |
422 | int diskId; |
423 | while (queue->pop(diskId, ipcIo)) { | |
424 | const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId); | |
425 | Must(i != IpcIoFiles.end()); // TODO: warn but continue | |
426 | i->second->handleResponse(ipcIo); | |
427 | } | |
9a51593d | 428 | } |
254912f3 | 429 | |
9a51593d | 430 | void |
8ed94021 | 431 | IpcIoFile::handleResponse(IpcIoMsg &ipcIo) |
9a51593d DK |
432 | { |
433 | const int requestId = ipcIo.requestId; | |
fa61cefe | 434 | debugs(47, 7, HERE << "popped disker response: " << |
9199139f | 435 | SipcIo(KidIdentifier, ipcIo, diskId)); |
fa61cefe | 436 | |
9a51593d DK |
437 | Must(requestId); |
438 | if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) { | |
439 | pending->completeIo(&ipcIo); | |
caca86d7 AR |
440 | delete pending; // XXX: leaking if throwing |
441 | } else { | |
9a51593d DK |
442 | debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command << |
443 | "; ipcIo" << KidIdentifier << '.' << requestId); | |
caca86d7 AR |
444 | // nothing we can do about it; completeIo() has been called already |
445 | } | |
254912f3 AR |
446 | } |
447 | ||
254912f3 | 448 | void |
9a51593d | 449 | IpcIoFile::Notify(const int peerId) |
254912f3 | 450 | { |
fa61cefe | 451 | // TODO: Count and report the total number of notifications, pops, pushes. |
a1c98830 | 452 | debugs(47, 7, HERE << "kid" << peerId); |
9a51593d DK |
453 | Ipc::TypedMsgHdr msg; |
454 | msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type? | |
455 | msg.putInt(KidIdentifier); | |
456 | const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, peerId); | |
457 | Ipc::SendMessage(addr, msg); | |
458 | } | |
459 | ||
460 | void | |
461 | IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg) | |
462 | { | |
fa61cefe AR |
463 | const int from = msg.getInt(); |
464 | debugs(47, 7, HERE << "from " << from); | |
f5591061 DK |
465 | queue->clearReaderSignal(from); |
466 | if (IamDiskProcess()) | |
467 | DiskerHandleRequests(); | |
468 | else | |
469 | HandleResponses("after notification"); | |
9a51593d DK |
470 | } |
471 | ||
b2aa0934 DK |
472 | /// handles open request timeout |
473 | void | |
474 | IpcIoFile::OpenTimeout(void *const param) | |
475 | { | |
476 | Must(param); | |
477 | // the pointer is used for comparison only and not dereferenced | |
478 | const IpcIoFile *const ipcIoFile = | |
479 | reinterpret_cast<const IpcIoFile *>(param); | |
480 | for (IpcIoFileList::iterator i = WaitingForOpen.begin(); | |
9199139f | 481 | i != WaitingForOpen.end(); ++i) { |
b2aa0934 DK |
482 | if (*i == ipcIoFile) { |
483 | (*i)->openCompleted(NULL); | |
484 | WaitingForOpen.erase(i); | |
485 | break; | |
486 | } | |
487 | } | |
488 | } | |
489 | ||
9a51593d DK |
490 | /// IpcIoFile::checkTimeouts wrapper |
491 | void | |
492 | IpcIoFile::CheckTimeouts(void *const param) | |
493 | { | |
9a51593d | 494 | Must(param); |
b2aa0934 DK |
495 | const int diskId = reinterpret_cast<uintptr_t>(param); |
496 | debugs(47, 7, HERE << "diskId=" << diskId); | |
497 | const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId); | |
498 | if (i != IpcIoFiles.end()) | |
499 | i->second->checkTimeouts(); | |
9a51593d DK |
500 | } |
501 | ||
502 | void | |
503 | IpcIoFile::checkTimeouts() | |
504 | { | |
9a51593d | 505 | timeoutCheckScheduled = false; |
254912f3 | 506 | |
0ef0509e AR |
507 | // last chance to recover in case a notification message was lost, etc. |
508 | const RequestMap::size_type timeoutsBefore = olderRequests->size(); | |
509 | HandleResponses("before timeout"); | |
510 | const RequestMap::size_type timeoutsNow = olderRequests->size(); | |
511 | ||
512 | if (timeoutsBefore > timeoutsNow) { // some requests were rescued | |
513 | // notification message lost or significantly delayed? | |
514 | debugs(47, DBG_IMPORTANT, "WARNING: communication with disker " << | |
515 | "may be too slow or disrupted for about " << | |
516 | Timeout << "s; rescued " << (timeoutsBefore - timeoutsNow) << | |
517 | " out of " << timeoutsBefore << " I/Os"); | |
518 | } | |
519 | ||
520 | if (timeoutsNow) { | |
521 | debugs(47, DBG_IMPORTANT, "WARNING: abandoning " << | |
522 | timeoutsNow << " I/Os after at least " << | |
523 | Timeout << "s timeout"); | |
524 | } | |
525 | ||
caca86d7 AR |
526 | // any old request would have timed out by now |
527 | typedef RequestMap::const_iterator RMCI; | |
9a51593d DK |
528 | for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) { |
529 | IpcIoPendingRequest *const pending = i->second; | |
254912f3 | 530 | |
b2aa0934 | 531 | const unsigned int requestId = i->first; |
caca86d7 | 532 | debugs(47, 7, HERE << "disker timeout; ipcIo" << |
b2aa0934 | 533 | KidIdentifier << '.' << requestId); |
254912f3 | 534 | |
caca86d7 AR |
535 | pending->completeIo(NULL); // no response |
536 | delete pending; // XXX: leaking if throwing | |
9a51593d DK |
537 | } |
538 | olderRequests->clear(); | |
caca86d7 | 539 | |
9a51593d | 540 | swap(olderRequests, newerRequests); // switches pointers around |
0ef0509e | 541 | if (!olderRequests->empty() && !timeoutCheckScheduled) |
9a51593d | 542 | scheduleTimeoutCheck(); |
254912f3 AR |
543 | } |
544 | ||
caca86d7 | 545 | /// prepare to check for timeouts in a little while |
254912f3 | 546 | void |
9a51593d | 547 | IpcIoFile::scheduleTimeoutCheck() |
254912f3 | 548 | { |
b2aa0934 | 549 | // we check all older requests at once so some may be wait for 2*Timeout |
caca86d7 | 550 | eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts, |
b2aa0934 | 551 | reinterpret_cast<void *>(diskId), Timeout, 0, false); |
9a51593d | 552 | timeoutCheckScheduled = true; |
254912f3 AR |
553 | } |
554 | ||
555 | /// returns and forgets the right IpcIoFile pending request | |
556 | IpcIoPendingRequest * | |
9a51593d | 557 | IpcIoFile::dequeueRequest(const unsigned int requestId) |
254912f3 | 558 | { |
254912f3 | 559 | Must(requestId != 0); |
caca86d7 | 560 | |
9a51593d DK |
561 | RequestMap *map = NULL; |
562 | RequestMap::iterator i = requestMap1.find(requestId); | |
caca86d7 | 563 | |
9a51593d DK |
564 | if (i != requestMap1.end()) |
565 | map = &requestMap1; | |
caca86d7 | 566 | else { |
9a51593d DK |
567 | i = requestMap2.find(requestId); |
568 | if (i != requestMap2.end()) | |
569 | map = &requestMap2; | |
caca86d7 AR |
570 | } |
571 | ||
572 | if (!map) // not found in both maps | |
573 | return NULL; | |
574 | ||
575 | IpcIoPendingRequest *pending = i->second; | |
576 | map->erase(i); | |
577 | return pending; | |
254912f3 AR |
578 | } |
579 | ||
580 | int | |
9a51593d | 581 | IpcIoFile::getFD() const |
254912f3 AR |
582 | { |
583 | assert(false); // not supported; TODO: remove this method from API | |
584 | return -1; | |
585 | } | |
586 | ||
9a51593d | 587 | /* IpcIoMsg */ |
254912f3 | 588 | |
9a51593d | 589 | IpcIoMsg::IpcIoMsg(): |
9199139f | 590 | requestId(0), offset(0), len(0), command(IpcIo::cmdNone), xerrno(0) |
254912f3 | 591 | { |
0a11e039 | 592 | start.tv_sec = 0; |
254912f3 AR |
593 | } |
594 | ||
caca86d7 | 595 | /* IpcIoPendingRequest */ |
254912f3 AR |
596 | |
597 | IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile): | |
9199139f | 598 | file(aFile), readRequest(NULL), writeRequest(NULL) |
254912f3 | 599 | { |
b2aa0934 | 600 | Must(file != NULL); |
9a51593d DK |
601 | if (++file->lastRequestId == 0) // don't use zero value as requestId |
602 | ++file->lastRequestId; | |
254912f3 AR |
603 | } |
604 | ||
caca86d7 | 605 | void |
8ed94021 | 606 | IpcIoPendingRequest::completeIo(IpcIoMsg *const response) |
caca86d7 | 607 | { |
caca86d7 AR |
608 | if (readRequest) |
609 | file->readCompleted(readRequest, response); | |
9199139f | 610 | else if (writeRequest) |
caca86d7 | 611 | file->writeCompleted(writeRequest, response); |
9a51593d DK |
612 | else { |
613 | Must(!response); // only timeouts are handled here | |
614 | file->openCompleted(NULL); | |
615 | } | |
caca86d7 AR |
616 | } |
617 | ||
/* XXX: disker code that should probably be moved elsewhere */

/// descriptor of the db file used by diskerRead()/diskerWrite(); -1 means not open
static int TheFile = -1;

9a51593d DK |
622 | static void |
623 | diskerRead(IpcIoMsg &ipcIo) | |
254912f3 | 624 | { |
551f8a18 | 625 | if (!Ipc::Mem::GetPage(Ipc::Mem::PageId::ioPage, ipcIo.page)) { |
8ed94021 | 626 | ipcIo.len = 0; |
c792f7fc | 627 | debugs(47,2, HERE << "run out of shared memory pages for IPC I/O"); |
8ed94021 DK |
628 | return; |
629 | } | |
630 | ||
631 | char *const buf = Ipc::Mem::PagePointer(ipcIo.page); | |
632 | const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset); | |
e4f1fdae | 633 | ++statCounter.syscalls.disk.reads; |
2d338731 AR |
634 | fd_bytes(TheFile, read, FD_READ); |
635 | ||
254912f3 | 636 | if (read >= 0) { |
9a51593d DK |
637 | ipcIo.xerrno = 0; |
638 | const size_t len = static_cast<size_t>(read); // safe because read > 0 | |
254912f3 | 639 | debugs(47,8, HERE << "disker" << KidIdentifier << " read " << |
9199139f | 640 | (len == ipcIo.len ? "all " : "just ") << read); |
9a51593d DK |
641 | ipcIo.len = len; |
642 | } else { | |
643 | ipcIo.xerrno = errno; | |
644 | ipcIo.len = 0; | |
254912f3 | 645 | debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " << |
9199139f | 646 | ipcIo.xerrno); |
9a51593d | 647 | } |
254912f3 AR |
648 | } |
649 | ||
9a51593d DK |
650 | static void |
651 | diskerWrite(IpcIoMsg &ipcIo) | |
254912f3 | 652 | { |
8ed94021 DK |
653 | const char *const buf = Ipc::Mem::PagePointer(ipcIo.page); |
654 | const ssize_t wrote = pwrite(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset); | |
e4f1fdae | 655 | ++statCounter.syscalls.disk.writes; |
2d338731 AR |
656 | fd_bytes(TheFile, wrote, FD_WRITE); |
657 | ||
254912f3 | 658 | if (wrote >= 0) { |
9a51593d DK |
659 | ipcIo.xerrno = 0; |
660 | const size_t len = static_cast<size_t>(wrote); // safe because wrote > 0 | |
254912f3 | 661 | debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " << |
9199139f | 662 | (len == ipcIo.len ? "all " : "just ") << wrote); |
9a51593d DK |
663 | ipcIo.len = len; |
664 | } else { | |
665 | ipcIo.xerrno = errno; | |
666 | ipcIo.len = 0; | |
254912f3 | 667 | debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " << |
9a51593d DK |
668 | ipcIo.xerrno); |
669 | } | |
8ed94021 DK |
670 | |
671 | Ipc::Mem::PutPage(ipcIo.page); | |
254912f3 AR |
672 | } |
673 | ||
c792f7fc | 674 | void |
df881a0f | 675 | IpcIoFile::DiskerHandleMoreRequests(void *source) |
c792f7fc | 676 | { |
df881a0f AR |
677 | debugs(47, 7, HERE << "resuming handling requests after " << |
678 | static_cast<const char *>(source)); | |
c792f7fc AR |
679 | DiskerHandleMoreRequestsScheduled = false; |
680 | IpcIoFile::DiskerHandleRequests(); | |
681 | } | |
682 | ||
df881a0f AR |
683 | bool |
684 | IpcIoFile::WaitBeforePop() | |
685 | { | |
686 | const Ipc::QueueReader::Rate::Value ioRate = queue->localRateLimit(); | |
687 | const double maxRate = ioRate/1e3; // req/ms | |
688 | ||
689 | // do we need to enforce configured I/O rate? | |
690 | if (maxRate <= 0) | |
691 | return false; | |
692 | ||
693 | // is there an I/O request we could potentially delay? | |
1e614370 DK |
694 | int processId; |
695 | IpcIoMsg ipcIo; | |
696 | if (!queue->peek(processId, ipcIo)) { | |
697 | // unlike pop(), peek() is not reliable and does not block reader | |
df881a0f AR |
698 | // so we must proceed with pop() even if it is likely to fail |
699 | return false; | |
700 | } | |
701 | ||
9a585a30 | 702 | static timeval LastIo = current_time; |
df881a0f AR |
703 | |
704 | const double ioDuration = 1.0 / maxRate; // ideal distance between two I/Os | |
705 | // do not accumulate more than 100ms or 100 I/Os, whichever is smaller | |
706 | const int64_t maxImbalance = min(static_cast<int64_t>(100), static_cast<int64_t>(100 * ioDuration)); | |
707 | ||
708 | const double credit = ioDuration; // what the last I/O should have cost us | |
709 | const double debit = tvSubMsec(LastIo, current_time); // actual distance from the last I/O | |
710 | LastIo = current_time; | |
711 | ||
712 | Ipc::QueueReader::Balance &balance = queue->localBalance(); | |
713 | balance += static_cast<int64_t>(credit - debit); | |
714 | ||
715 | debugs(47, 7, HERE << "rate limiting balance: " << balance << " after +" << credit << " -" << debit); | |
716 | ||
1e614370 DK |
717 | if (ipcIo.command == IpcIo::cmdWrite && balance > maxImbalance) { |
718 | // if the next request is (likely) write and we accumulated | |
719 | // too much time for future slow I/Os, then shed accumulated | |
720 | // time to keep just half of the excess | |
df881a0f | 721 | const int64_t toSpend = balance - maxImbalance/2; |
69321ae9 AR |
722 | |
723 | if (toSpend/1e3 > Timeout) | |
724 | debugs(47, DBG_IMPORTANT, "WARNING: Rock disker delays I/O " << | |
725 | "requests for " << (toSpend/1e3) << " seconds to obey " << | |
726 | ioRate << "/sec rate limit"); | |
727 | ||
df881a0f AR |
728 | debugs(47, 3, HERE << "rate limiting by " << toSpend << " ms to get" << |
729 | (1e3*maxRate) << "/sec rate"); | |
730 | eventAdd("IpcIoFile::DiskerHandleMoreRequests", | |
731 | &IpcIoFile::DiskerHandleMoreRequests, | |
732 | const_cast<char*>("rate limiting"), | |
733 | toSpend/1e3, 0, false); | |
734 | DiskerHandleMoreRequestsScheduled = true; | |
735 | return true; | |
e29ccb57 | 736 | } else if (balance < -maxImbalance) { |
df881a0f AR |
737 | // do not owe "too much" to avoid "too large" bursts of I/O |
738 | balance = -maxImbalance; | |
739 | } | |
740 | ||
741 | return false; | |
742 | } | |
743 | ||
254912f3 | 744 | void |
f5591061 | 745 | IpcIoFile::DiskerHandleRequests() |
254912f3 | 746 | { |
c792f7fc | 747 | // Balance our desire to maximize the number of concurrent I/O requests |
fdb3059b | 748 | // (reordred by OS to minimize seek time) with a requirement to |
c792f7fc | 749 | // send 1st-I/O notification messages, process Coordinator events, etc. |
fdb3059b AR |
750 | const int maxSpentMsec = 10; // keep small: most RAM I/Os are under 1ms |
751 | const timeval loopStart = current_time; | |
752 | ||
c792f7fc | 753 | int popped = 0; |
fa61cefe AR |
754 | int workerId = 0; |
755 | IpcIoMsg ipcIo; | |
df881a0f | 756 | while (!WaitBeforePop() && queue->pop(workerId, ipcIo)) { |
fdb3059b AR |
757 | ++popped; |
758 | ||
759 | // at least one I/O per call is guaranteed if the queue is not empty | |
fa61cefe AR |
760 | DiskerHandleRequest(workerId, ipcIo); |
761 | ||
fdb3059b AR |
762 | getCurrentTime(); |
763 | const double elapsedMsec = tvSubMsec(loopStart, current_time); | |
764 | if (elapsedMsec > maxSpentMsec || elapsedMsec < 0) { | |
765 | if (!DiskerHandleMoreRequestsScheduled) { | |
766 | // the gap must be positive for select(2) to be given a chance | |
767 | const double minBreakSecs = 0.001; | |
768 | eventAdd("IpcIoFile::DiskerHandleMoreRequests", | |
9199139f | 769 | &IpcIoFile::DiskerHandleMoreRequests, |
df881a0f AR |
770 | const_cast<char*>("long I/O loop"), |
771 | minBreakSecs, 0, false); | |
fdb3059b AR |
772 | DiskerHandleMoreRequestsScheduled = true; |
773 | } | |
774 | debugs(47, 3, HERE << "pausing after " << popped << " I/Os in " << | |
9199139f | 775 | elapsedMsec << "ms; " << (elapsedMsec/popped) << "ms per I/O"); |
fdb3059b AR |
776 | break; |
777 | } | |
c792f7fc AR |
778 | } |
779 | ||
fdb3059b | 780 | // TODO: consider using O_DIRECT with "elevator" optimization where we pop |
fa61cefe AR |
781 | // requests first, then reorder the popped requests to optimize seek time, |
782 | // then do I/O, then take a break, and come back for the next set of I/O | |
783 | // requests. | |
9a51593d | 784 | } |
254912f3 | 785 | |
9a51593d DK |
786 | /// called when disker receives an I/O request |
787 | void | |
788 | IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo) | |
789 | { | |
9a51593d | 790 | if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) { |
fa84c01d | 791 | debugs(0, DBG_CRITICAL, HERE << "disker" << KidIdentifier << |
9a51593d DK |
792 | " should not receive " << ipcIo.command << |
793 | " ipcIo" << workerId << '.' << ipcIo.requestId); | |
794 | return; | |
795 | } | |
796 | ||
797 | debugs(47,5, HERE << "disker" << KidIdentifier << | |
798 | (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") << | |
799 | ipcIo.len << " at " << ipcIo.offset << | |
800 | " ipcIo" << workerId << '.' << ipcIo.requestId); | |
801 | ||
802 | if (ipcIo.command == IpcIo::cmdRead) | |
803 | diskerRead(ipcIo); | |
804 | else // ipcIo.command == IpcIo::cmdWrite | |
805 | diskerWrite(ipcIo); | |
806 | ||
f5591061 | 807 | debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier)); |
fa61cefe | 808 | |
7a907247 | 809 | try { |
f5591061 | 810 | if (queue->push(workerId, ipcIo)) |
5e44782e | 811 | Notify(workerId); // must notify worker |
f5591061 | 812 | } catch (const Queue::Full &) { |
fa61cefe AR |
813 | // The worker queue should not overflow because the worker should pop() |
814 | // before push()ing and because if disker pops N requests at a time, | |
815 | // we should make sure the worker pop() queue length is the worker | |
816 | // push queue length plus N+1. XXX: implement the N+1 difference. | |
817 | debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue overflow: " << | |
5e44782e | 818 | SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len |
fa61cefe AR |
819 | |
820 | // the I/O request we could not push will timeout | |
7a907247 | 821 | } |
254912f3 AR |
822 | } |
823 | ||
824 | static bool | |
825 | DiskerOpen(const String &path, int flags, mode_t mode) | |
826 | { | |
827 | assert(TheFile < 0); | |
828 | ||
829 | TheFile = file_open(path.termedBuf(), flags); | |
830 | ||
831 | if (TheFile < 0) { | |
832 | const int xerrno = errno; | |
fa84c01d | 833 | debugs(47, DBG_CRITICAL, HERE << "rock db error opening " << path << ": " << |
254912f3 AR |
834 | xstrerr(xerrno)); |
835 | return false; | |
9a51593d | 836 | } |
254912f3 | 837 | |
cb4185f1 | 838 | ++store_open_disk_fd; |
254912f3 | 839 | debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile); |
9a51593d | 840 | return true; |
254912f3 AR |
841 | } |
842 | ||
843 | static void | |
844 | DiskerClose(const String &path) | |
845 | { | |
846 | if (TheFile >= 0) { | |
847 | file_close(TheFile); | |
848 | debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile); | |
849 | TheFile = -1; | |
5e263176 | 850 | --store_open_disk_fd; |
a1c98830 | 851 | } |
254912f3 | 852 | } |
f5591061 | 853 | |
ea2cdeb6 DK |
854 | /// reports our needs for shared memory pages to Ipc::Mem::Pages |
855 | class IpcIoClaimMemoryNeedsRr: public RegisteredRunner | |
856 | { | |
857 | public: | |
858 | /* RegisteredRunner API */ | |
859 | virtual void run(const RunnerRegistry &r); | |
860 | }; | |
861 | ||
862 | RunnerRegistrationEntry(rrClaimMemoryNeeds, IpcIoClaimMemoryNeedsRr); | |
863 | ||
ea2cdeb6 DK |
864 | void |
865 | IpcIoClaimMemoryNeedsRr::run(const RunnerRegistry &) | |
866 | { | |
867 | const int itemsCount = Ipc::FewToFewBiQueue::MaxItemsCount( | |
c11211d9 | 868 | ::Config.workers, ::Config.cacheSwap.n_strands, QueueCapacity); |
ea2cdeb6 DK |
869 | // the maximum number of shared I/O pages is approximately the |
870 | // number of queue slots, we add a fudge factor to that to account | |
871 | // for corner cases where I/O pages are created before queue | |
872 | // limits are checked or destroyed long after the I/O is dequeued | |
29a9ff4e DK |
873 | Ipc::Mem::NotePageNeed(Ipc::Mem::PageId::ioPage, |
874 | static_cast<int>(itemsCount * 1.1)); | |
ea2cdeb6 DK |
875 | } |
876 | ||
f5591061 | 877 | /// initializes shared memory segments used by IpcIoFile |
4404f1c5 | 878 | class IpcIoRr: public Ipc::Mem::RegisteredRunner |
f5591061 DK |
879 | { |
880 | public: | |
881 | /* RegisteredRunner API */ | |
882 | IpcIoRr(): owner(NULL) {} | |
f5591061 DK |
883 | virtual ~IpcIoRr(); |
884 | ||
4404f1c5 DK |
885 | protected: |
886 | virtual void create(const RunnerRegistry &); | |
887 | ||
f5591061 DK |
888 | private: |
889 | Ipc::FewToFewBiQueue::Owner *owner; | |
890 | }; | |
891 | ||
892 | RunnerRegistrationEntry(rrAfterConfig, IpcIoRr); | |
893 | ||
4404f1c5 | 894 | void IpcIoRr::create(const RunnerRegistry &) |
f5591061 | 895 | { |
e9f68413 | 896 | if (Config.cacheSwap.n_strands <= 0) |
f5591061 DK |
897 | return; |
898 | ||
4404f1c5 | 899 | Must(!owner); |
4404f1c5 | 900 | owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1, |
3b581957 | 901 | Config.cacheSwap.n_strands, |
4404f1c5 | 902 | 1 + Config.workers, sizeof(IpcIoMsg), |
ea2cdeb6 | 903 | QueueCapacity); |
f5591061 DK |
904 | } |
905 | ||
906 | IpcIoRr::~IpcIoRr() | |
907 | { | |
908 | delete owner; | |
909 | } |