/*
 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79    Disk IO Routines */
f7f3304a | 11 | #include "squid.h" |
50dc81ec | 12 | #include "base/TextException.h" |
dcd84f80 | 13 | #include "CollapsedForwarding.h" |
e2851fe7 AR |
14 | #include "DiskIO/DiskIOModule.h" |
15 | #include "DiskIO/DiskIOStrategy.h" | |
16 | #include "DiskIO/WriteRequest.h" | |
e2851fe7 | 17 | #include "fs/rock/RockIoRequests.h" |
602d9612 | 18 | #include "fs/rock/RockIoState.h" |
e2851fe7 | 19 | #include "fs/rock/RockSwapDir.h" |
582c2af2 | 20 | #include "globals.h" |
50dc81ec | 21 | #include "Mem.h" |
602d9612 | 22 | #include "MemObject.h" |
50dc81ec | 23 | #include "Parsing.h" |
e6d2c263 | 24 | #include "Transients.h" |
e2851fe7 | 25 | |
50dc81ec | 26 | Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir, |
9199139f AR |
27 | StoreEntry *anEntry, |
28 | StoreIOState::STFNCB *cbFile, | |
29 | StoreIOState::STIOCB *cbIo, | |
30 | void *data): | |
50dc81ec AR |
31 | readableAnchor_(NULL), |
32 | writeableAnchor_(NULL), | |
33 | sidCurrent(-1), | |
93910d5c | 34 | dir(aDir), |
50dc81ec AR |
35 | slotSize(dir->slotSize), |
36 | objOffset(0), | |
37 | theBuf(dir->slotSize) | |
e2851fe7 AR |
38 | { |
39 | e = anEntry; | |
50dc81ec AR |
40 | e->lock("rock I/O"); |
41 | // anchor, swap_filen, and swap_dirn are set by the caller | |
e2851fe7 AR |
42 | file_callback = cbFile; |
43 | callback = cbIo; | |
44 | callback_data = cbdataReference(data); | |
45 | ++store_open_disk_fd; // TODO: use a dedicated counter? | |
46 | //theFile is set by SwapDir because it depends on DiskIOStrategy | |
47 | } | |
48 | ||
49 | Rock::IoState::~IoState() | |
50 | { | |
51 | --store_open_disk_fd; | |
50dc81ec AR |
52 | |
53 | // The dir map entry may still be open for reading at the point because | |
54 | // the map entry lock is associated with StoreEntry, not IoState. | |
55 | // assert(!readableAnchor_); | |
7ec749c6 | 56 | assert(shutting_down || !writeableAnchor_); |
50dc81ec | 57 | |
e2851fe7 AR |
58 | if (callback_data) |
59 | cbdataReferenceDone(callback_data); | |
60 | theFile = NULL; | |
50dc81ec AR |
61 | |
62 | e->unlock("rock I/O"); | |
e2851fe7 AR |
63 | } |
64 | ||
65 | void | |
66 | Rock::IoState::file(const RefCount<DiskFile> &aFile) | |
67 | { | |
68 | assert(!theFile); | |
69 | assert(aFile != NULL); | |
70 | theFile = aFile; | |
71 | } | |
72 | ||
50dc81ec AR |
73 | const Ipc::StoreMapAnchor & |
74 | Rock::IoState::readAnchor() const | |
75 | { | |
76 | assert(readableAnchor_); | |
77 | return *readableAnchor_; | |
78 | } | |
79 | ||
80 | Ipc::StoreMapAnchor & | |
81 | Rock::IoState::writeAnchor() | |
82 | { | |
83 | assert(writeableAnchor_); | |
84 | return *writeableAnchor_; | |
85 | } | |
86 | ||
87 | /// convenience wrapper returning the map slot we are reading now | |
88 | const Ipc::StoreMapSlice & | |
89 | Rock::IoState::currentReadableSlice() const | |
90 | { | |
91 | return dir->map->readableSlice(swap_filen, sidCurrent); | |
92 | } | |
93 | ||
e2851fe7 | 94 | void |
c728b6f9 | 95 | Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data) |
e2851fe7 | 96 | { |
50dc81ec AR |
97 | debugs(79, 7, swap_filen << " reads from " << coreOff); |
98 | ||
e2851fe7 | 99 | assert(theFile != NULL); |
c728b6f9 | 100 | assert(coreOff >= 0); |
c728b6f9 | 101 | |
50dc81ec AR |
102 | // if we are dealing with the first read or |
103 | // if the offset went backwords, start searching from the beginning | |
104 | if (sidCurrent < 0 || coreOff < objOffset) { | |
105 | sidCurrent = readAnchor().start; | |
93910d5c AR |
106 | objOffset = 0; |
107 | } | |
108 | ||
5296bbd9 | 109 | while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) { |
50dc81ec AR |
110 | objOffset += currentReadableSlice().size; |
111 | sidCurrent = currentReadableSlice().next; | |
93910d5c | 112 | } |
e2851fe7 | 113 | |
e2851fe7 AR |
114 | assert(read.callback == NULL); |
115 | assert(read.callback_data == NULL); | |
116 | read.callback = cb; | |
117 | read.callback_data = cbdataReference(data); | |
118 | ||
5296bbd9 AR |
119 | // punt if read offset is too big (because of client bugs or collapsing) |
120 | if (sidCurrent < 0) { | |
121 | debugs(79, 5, "no " << coreOff << " in " << *e); | |
122 | callReaderBack(buf, 0); | |
123 | return; | |
124 | } | |
125 | ||
126 | offset_ = coreOff; | |
127 | len = min(len, | |
9d4e9cfb | 128 | static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff)); |
50dc81ec | 129 | const uint64_t diskOffset = dir->diskOffset(sidCurrent); |
93910d5c | 130 | theFile->read(new ReadRequest(::ReadRequest(buf, |
9d4e9cfb | 131 | diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this)); |
e2851fe7 AR |
132 | } |
133 | ||
5296bbd9 AR |
134 | void |
135 | Rock::IoState::callReaderBack(const char *buf, int rlen) | |
136 | { | |
137 | debugs(79, 5, rlen << " bytes for " << *e); | |
138 | StoreIOState::STRCB *callb = read.callback; | |
139 | assert(callb); | |
140 | read.callback = NULL; | |
141 | void *cbdata; | |
142 | if (cbdataReferenceValidDone(read.callback_data, &cbdata)) | |
143 | callb(cbdata, buf, rlen, this); | |
144 | } | |
145 | ||
50dc81ec AR |
146 | /// wraps tryWrite() to handle deep write failures centrally and safely |
147 | bool | |
c728b6f9 | 148 | Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor) |
e2851fe7 | 149 | { |
50dc81ec AR |
150 | bool success = false; |
151 | try { | |
152 | tryWrite(buf, size, coreOff); | |
153 | success = true; | |
a57a662c AR |
154 | } catch (const std::exception &ex) { // TODO: should we catch ... as well? |
155 | debugs(79, 2, "db write error: " << ex.what()); | |
4475555f | 156 | dir->writeError(*e); |
50dc81ec AR |
157 | finishedWriting(DISK_ERROR); |
158 | // 'this' might be gone beyond this point; fall through to free buf | |
e2851fe7 AR |
159 | } |
160 | ||
50dc81ec | 161 | // careful: 'this' might be gone here |
9d4e9cfb | 162 | |
50dc81ec AR |
163 | if (dtor) |
164 | (dtor)(const_cast<char*>(buf)); // cast due to a broken API? | |
93910d5c | 165 | |
50dc81ec AR |
166 | return success; |
167 | } | |
93910d5c | 168 | |
e4d13993 AR |
169 | /** |
170 | * Possibly send data to be written to disk: | |
171 | * We only write data when full slot is accumulated or when close() is called. | |
172 | * We buffer, in part, to avoid forcing OS to _read_ old unwritten portions of | |
9d4e9cfb | 173 | * the slot when the write does not end at the page or sector boundary. |
e4d13993 | 174 | */ |
50dc81ec AR |
175 | void |
176 | Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff) | |
177 | { | |
178 | debugs(79, 7, swap_filen << " writes " << size << " more"); | |
93910d5c | 179 | |
50dc81ec AR |
180 | // either this is the first write or append; we do not support write gaps |
181 | assert(!coreOff || coreOff == -1); | |
93910d5c | 182 | |
e4d13993 | 183 | // allocate the first slice during the first write |
50dc81ec AR |
184 | if (!coreOff) { |
185 | assert(sidCurrent < 0); | |
186 | sidCurrent = reserveSlotForWriting(); // throws on failures | |
187 | assert(sidCurrent >= 0); | |
188 | writeAnchor().start = sidCurrent; | |
93910d5c | 189 | } |
e2851fe7 | 190 | |
50dc81ec AR |
191 | // buffer incoming data in slot buffer and write overflowing or final slots |
192 | // quit when no data left or we stopped writing on reentrant error | |
193 | while (size > 0 && theFile != NULL) { | |
194 | assert(sidCurrent >= 0); | |
195 | const size_t processed = writeToBuffer(buf, size); | |
196 | buf += processed; | |
197 | size -= processed; | |
198 | const bool overflow = size > 0; | |
199 | ||
200 | // We do not write a full buffer without overflow because | |
201 | // we would not yet know what to set the nextSlot to. | |
202 | if (overflow) { | |
203 | const SlotId sidNext = reserveSlotForWriting(); // throws | |
204 | assert(sidNext >= 0); | |
205 | writeToDisk(sidNext); | |
5296bbd9 | 206 | } else if (Store::Root().transientReaders(*e)) { |
e6d2c263 | 207 | // write partial buffer for all remote hit readers to see |
5296bbd9 | 208 | writeBufToDisk(-1, false); |
50dc81ec AR |
209 | } |
210 | } | |
807feb1d | 211 | |
50dc81ec AR |
212 | } |
213 | ||
214 | /// Buffers incoming data for the current slot. | |
e4d13993 | 215 | /// \return the number of bytes buffered |
50dc81ec AR |
216 | size_t |
217 | Rock::IoState::writeToBuffer(char const *buf, size_t size) | |
218 | { | |
219 | // do not buffer a cell header for nothing | |
220 | if (!size) | |
221 | return 0; | |
222 | ||
223 | if (!theBuf.size) { | |
224 | // will fill the header in writeToDisk when the next slot is known | |
225 | theBuf.appended(sizeof(DbCellHeader)); | |
226 | } | |
227 | ||
228 | size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize())); | |
229 | theBuf.append(buf, forCurrentSlot); | |
230 | offset_ += forCurrentSlot; // so that Core thinks we wrote it | |
231 | return forCurrentSlot; | |
e2851fe7 AR |
232 | } |
233 | ||
50dc81ec AR |
234 | /// write what was buffered during write() calls |
235 | /// negative sidNext means this is the last write request for this entry | |
e2851fe7 | 236 | void |
50dc81ec | 237 | Rock::IoState::writeToDisk(const SlotId sidNext) |
e2851fe7 AR |
238 | { |
239 | assert(theFile != NULL); | |
50dc81ec AR |
240 | assert(theBuf.size >= sizeof(DbCellHeader)); |
241 | ||
fd23a2ca AR |
242 | // TODO: if DiskIO module is mmap-based, we should be writing whole pages |
243 | // to avoid triggering read-page;new_head+old_tail;write-page overheads | |
244 | ||
5296bbd9 AR |
245 | writeBufToDisk(sidNext, sidNext < 0); |
246 | theBuf.clear(); | |
247 | ||
248 | sidCurrent = sidNext; | |
249 | } | |
250 | ||
251 | /// creates and submits a request to write current slot buffer to disk | |
252 | /// eof is true if and only this is the last slot | |
253 | void | |
254 | Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof) | |
255 | { | |
256 | // no slots after the last/eof slot (but partial slots may have a nil next) | |
257 | assert(!eof || sidNext < 0); | |
258 | ||
50dc81ec AR |
259 | // finalize db cell header |
260 | DbCellHeader header; | |
261 | memcpy(header.key, e->key, sizeof(header.key)); | |
262 | header.firstSlot = writeAnchor().start; | |
263 | header.nextSlot = sidNext; | |
264 | header.payloadSize = theBuf.size - sizeof(DbCellHeader); | |
1ffb8602 | 265 | header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write |
50dc81ec AR |
266 | header.version = writeAnchor().basics.timestamp; |
267 | ||
268 | // copy finalized db cell header into buffer | |
269 | memcpy(theBuf.mem, &header, sizeof(DbCellHeader)); | |
270 | ||
271 | // and now allocate another buffer for the WriteRequest so that | |
272 | // we can support concurrent WriteRequests (and to ease cleaning) | |
273 | // TODO: should we limit the number of outstanding requests? | |
274 | size_t wBufCap = 0; | |
275 | void *wBuf = memAllocBuf(theBuf.size, &wBufCap); | |
276 | memcpy(wBuf, theBuf.mem, theBuf.size); | |
93910d5c | 277 | |
807feb1d DK |
278 | const uint64_t diskOffset = dir->diskOffset(sidCurrent); |
279 | debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' << | |
280 | theBuf.size); | |
281 | ||
93910d5c | 282 | WriteRequest *const r = new WriteRequest( |
50dc81ec | 283 | ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size, |
9d4e9cfb | 284 | memFreeBufFunc(wBufCap)), this); |
ce49546e AR |
285 | r->sidCurrent = sidCurrent; |
286 | r->sidNext = sidNext; | |
5296bbd9 | 287 | r->eof = eof; |
50dc81ec AR |
288 | |
289 | // theFile->write may call writeCompleted immediatelly | |
93910d5c | 290 | theFile->write(r); |
e2851fe7 AR |
291 | } |
292 | ||
50dc81ec AR |
293 | /// finds and returns a free db slot to fill or throws |
294 | Rock::SlotId | |
295 | Rock::IoState::reserveSlotForWriting() | |
296 | { | |
297 | Ipc::Mem::PageId pageId; | |
298 | if (dir->useFreeSlot(pageId)) | |
299 | return pageId.number-1; | |
300 | ||
301 | // This may happen when the number of available db slots is close to the | |
302 | // number of concurrent requests reading or writing those slots, which may | |
303 | // happen when the db is "small" compared to the request traffic OR when we | |
304 | // are rebuilding and have not loaded "many" entries or empty slots yet. | |
305 | throw TexcHere("ran out of free db slots"); | |
306 | } | |
307 | ||
e2851fe7 AR |
308 | void |
309 | Rock::IoState::finishedWriting(const int errFlag) | |
310 | { | |
c728b6f9 | 311 | // we incremented offset_ while accumulating data in write() |
49769258 | 312 | // we do not reset writeableAnchor_ here because we still keep the lock |
2912daee | 313 | CollapsedForwarding::Broadcast(*e); |
e2851fe7 AR |
314 | callBack(errFlag); |
315 | } | |
316 | ||
317 | void | |
c728b6f9 | 318 | Rock::IoState::close(int how) |
e2851fe7 | 319 | { |
50dc81ec AR |
320 | debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how << |
321 | " buf: " << theBuf.size << " callback: " << callback); | |
322 | ||
323 | if (!theFile) { | |
324 | debugs(79, 3, "I/O already canceled"); | |
325 | assert(!callback); | |
49769258 | 326 | // We keep writeableAnchor_ after callBack() on I/O errors. |
50dc81ec AR |
327 | assert(!readableAnchor_); |
328 | return; | |
329 | } | |
330 | ||
331 | switch (how) { | |
332 | case wroteAll: | |
333 | assert(theBuf.size > 0); // we never flush last bytes on our own | |
334 | writeToDisk(-1); // flush last, yet unwritten slot to disk | |
335 | return; // writeCompleted() will callBack() | |
336 | ||
337 | case writerGone: | |
338 | assert(writeableAnchor_); | |
4475555f | 339 | dir->writeError(*e); // abort a partially stored entry |
50dc81ec AR |
340 | finishedWriting(DISK_ERROR); |
341 | return; | |
342 | ||
343 | case readerDone: | |
344 | callBack(0); | |
345 | return; | |
346 | } | |
e2851fe7 AR |
347 | } |
348 | ||
9199139f | 349 | /// close callback (STIOCB) dialer: breaks dependencies and |
e2851fe7 | 350 | /// counts IOState concurrency level |
9199139f | 351 | class StoreIOStateCb: public CallDialer |
e2851fe7 AR |
352 | { |
353 | public: | |
354 | StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio): | |
9199139f AR |
355 | callback(NULL), |
356 | callback_data(NULL), | |
357 | errflag(err), | |
358 | sio(anSio) { | |
e2851fe7 AR |
359 | |
360 | callback = cb; | |
361 | callback_data = cbdataReference(data); | |
362 | } | |
363 | ||
364 | StoreIOStateCb(const StoreIOStateCb &cb): | |
9199139f AR |
365 | callback(NULL), |
366 | callback_data(NULL), | |
367 | errflag(cb.errflag), | |
368 | sio(cb.sio) { | |
e2851fe7 AR |
369 | |
370 | callback = cb.callback; | |
371 | callback_data = cbdataReference(cb.callback_data); | |
372 | } | |
373 | ||
374 | virtual ~StoreIOStateCb() { | |
375 | cbdataReferenceDone(callback_data); // may be nil already | |
376 | } | |
377 | ||
378 | void dial(AsyncCall &call) { | |
379 | void *cbd; | |
380 | if (cbdataReferenceValidDone(callback_data, &cbd) && callback) | |
381 | callback(cbd, errflag, sio.getRaw()); | |
382 | } | |
383 | ||
384 | bool canDial(AsyncCall &call) const { | |
385 | return cbdataReferenceValid(callback_data) && callback; | |
386 | } | |
387 | ||
388 | virtual void print(std::ostream &os) const { | |
389 | os << '(' << callback_data << ", err=" << errflag << ')'; | |
390 | } | |
391 | ||
392 | private: | |
393 | StoreIOStateCb &operator =(const StoreIOStateCb &cb); // not defined | |
394 | ||
395 | StoreIOState::STIOCB *callback; | |
396 | void *callback_data; | |
397 | int errflag; | |
398 | Rock::IoState::Pointer sio; | |
399 | }; | |
400 | ||
e2851fe7 AR |
401 | void |
402 | Rock::IoState::callBack(int errflag) | |
403 | { | |
404 | debugs(79,3, HERE << "errflag=" << errflag); | |
405 | theFile = NULL; | |
406 | ||
407 | AsyncCall::Pointer call = asyncCall(79,3, "SomeIoStateCloseCb", | |
9199139f | 408 | StoreIOStateCb(callback, callback_data, errflag, this)); |
e2851fe7 AR |
409 | ScheduleCallHere(call); |
410 | ||
411 | callback = NULL; | |
412 | cbdataReferenceDone(callback_data); | |
413 | } | |
414 |