/*
 * DEBUG: section 79    Disk IO Routines
 */

#include "squid.h"
#include "base/TextException.h"
#include "CollapsedForwarding.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "Mem.h"
#include "MemObject.h"
#include "Parsing.h"
#include "Transients.h"

Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
                       StoreEntry *anEntry,
                       StoreIOState::STFNCB *cbFile,
                       StoreIOState::STIOCB *cbIo,
                       void *data):
    readableAnchor_(NULL),
    writeableAnchor_(NULL),
    sidCurrent(-1),
    dir(aDir),
    slotSize(dir->slotSize),
    objOffset(0),
    theBuf(dir->slotSize)
{
    e = anEntry;
    e->lock("rock I/O");
    // anchor, swap_filen, and swap_dirn are set by the caller
    file_callback = cbFile;
    callback = cbIo;
    callback_data = cbdataReference(data);
    ++store_open_disk_fd; // TODO: use a dedicated counter?
    // theFile is set by SwapDir because it depends on DiskIOStrategy
}

Rock::IoState::~IoState()
{
    --store_open_disk_fd;

    // The dir map entry may still be open for reading at this point because
    // the map entry lock is associated with StoreEntry, not IoState.
    // assert(!readableAnchor_);
    assert(shutting_down || !writeableAnchor_);

    if (callback_data)
        cbdataReferenceDone(callback_data);
    theFile = NULL;

    e->unlock("rock I/O");
}

void
Rock::IoState::file(const RefCount<DiskFile> &aFile)
{
    assert(!theFile);
    assert(aFile != NULL);
    theFile = aFile;
}

const Ipc::StoreMapAnchor &
Rock::IoState::readAnchor() const
{
    assert(readableAnchor_);
    return *readableAnchor_;
}

Ipc::StoreMapAnchor &
Rock::IoState::writeAnchor()
{
    assert(writeableAnchor_);
    return *writeableAnchor_;
}

/// convenience wrapper returning the map slot we are reading now
const Ipc::StoreMapSlice &
Rock::IoState::currentReadableSlice() const
{
    return dir->map->readableSlice(swap_filen, sidCurrent);
}

void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
    debugs(79, 7, swap_filen << " reads from " << coreOff);

    assert(theFile != NULL);
    assert(coreOff >= 0);

    // if this is the first read or
    // if the offset went backwards, start searching from the beginning
    if (sidCurrent < 0 || coreOff < objOffset) {
        sidCurrent = readAnchor().start;
        objOffset = 0;
    }
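    // Walk the forward-only slot chain until we reach the slice that
    // contains coreOff, accumulating the object offset as we go. This is
    // also why a backwards seek (above) must restart from the first slot.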
    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
        objOffset += currentReadableSlice().size;
        sidCurrent = currentReadableSlice().next;
    }

    assert(read.callback == NULL);
    assert(read.callback_data == NULL);
    read.callback = cb;
    read.callback_data = cbdataReference(data);

    // punt if read offset is too big (because of client bugs or collapsing)
    if (sidCurrent < 0) {
        debugs(79, 5, "no " << coreOff << " in " << *e);
        callReaderBack(buf, 0);
        return;
    }

    offset_ = coreOff;
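    // read at most to the end of the current slice; the caller is expected
    // to issue another read_() call for any remaining bytes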
    len = min(len,
              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    theFile->read(new ReadRequest(::ReadRequest(buf,
                                  diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
}

void
Rock::IoState::callReaderBack(const char *buf, int rlen)
{
    debugs(79, 5, rlen << " bytes for " << *e);
    StoreIOState::STRCB *callb = read.callback;
    assert(callb);
    read.callback = NULL;
    void *cbdata;
    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
        callb(cbdata, buf, rlen, this);
}

/// wraps tryWrite() to handle deep write failures centrally and safely
bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
    bool success = false;
    try {
        tryWrite(buf, size, coreOff);
        success = true;
    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
        debugs(79, 2, "db write error: " << ex.what());
        dir->writeError(*e);
        finishedWriting(DISK_ERROR);
        // 'this' might be gone beyond this point; fall through to free buf
    }

    // careful: 'this' might be gone here

    if (dtor)
        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?

    return success;
}

/**
 * Possibly sends data to be written to disk:
 * We only write data when a full slot is accumulated or when close() is called.
 * We buffer, in part, to avoid forcing the OS to _read_ old unwritten portions
 * of the slot when the write does not end at a page or sector boundary.
 */
void
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
{
    debugs(79, 7, swap_filen << " writes " << size << " more");

    // either this is the first write (coreOff == 0) or an append
    // (coreOff == -1); we do not support write gaps
    assert(!coreOff || coreOff == -1);

    // allocate the first slice during the first write
    if (!coreOff) {
        assert(sidCurrent < 0);
        sidCurrent = reserveSlotForWriting(); // throws on failures
        assert(sidCurrent >= 0);
        writeAnchor().start = sidCurrent;
    }

    // buffer incoming data in the slot buffer and write overflowing or final
    // slots; quit when no data is left or we stopped writing due to a
    // reentrant error
    while (size > 0 && theFile != NULL) {
        assert(sidCurrent >= 0);
        const size_t processed = writeToBuffer(buf, size);
        buf += processed;
        size -= processed;
        const bool overflow = size > 0;

        // We do not write a full buffer without overflow because
        // we would not yet know what to set the nextSlot to.
        if (overflow) {
            const SlotId sidNext = reserveSlotForWriting(); // throws
            assert(sidNext >= 0);
            writeToDisk(sidNext);
        } else if (Store::Root().transientReaders(*e)) {
            // write a partial buffer for all remote hit readers to see
            writeBufToDisk(-1, false);
        }
    }
}

/// Buffers incoming data for the current slot.
/// \return the number of bytes buffered
size_t
Rock::IoState::writeToBuffer(char const *buf, size_t size)
{
    // do not buffer a cell header for nothing
    if (!size)
        return 0;

    if (!theBuf.size) {
        // will fill the header in writeToDisk when the next slot is known
        theBuf.appended(sizeof(DbCellHeader));
    }

    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
    theBuf.append(buf, forCurrentSlot);
    offset_ += forCurrentSlot; // so that Core thinks we wrote it
    return forCurrentSlot;
}

/// write what was buffered during write() calls
/// negative sidNext means this is the last write request for this entry
void
Rock::IoState::writeToDisk(const SlotId sidNext)
{
    assert(theFile != NULL);
    assert(theBuf.size >= sizeof(DbCellHeader));

    // TODO: if the DiskIO module is mmap-based, we should be writing whole
    // pages to avoid triggering read-page;new_head+old_tail;write-page overheads

    writeBufToDisk(sidNext, sidNext < 0);
    theBuf.clear();
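    // subsequent writeToBuffer() calls will fill the newly reserved slot
    // (a negative sidNext means there will be no more slots)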
    sidCurrent = sidNext;
}

/// creates and submits a request to write the current slot buffer to disk
/// eof is true if and only if this is the last slot
void
Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof)
{
    // no slots after the last/eof slot (but partial slots may have a nil next)
    assert(!eof || sidNext < 0);
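    // on-disk slot layout: a fixed-size DbCellHeader immediately followed by
    // up to (slotSize - sizeof(DbCellHeader)) bytes of entry payload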
    // finalize the db cell header
    DbCellHeader header;
    memcpy(header.key, e->key, sizeof(header.key));
    header.firstSlot = writeAnchor().start;
    header.nextSlot = sidNext;
    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
    header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after the write
    header.version = writeAnchor().basics.timestamp;

    // copy the finalized db cell header into the buffer
    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));

    // and now allocate another buffer for the WriteRequest so that
    // we can support concurrent WriteRequests (and to ease cleaning)
    // TODO: should we limit the number of outstanding requests?
    size_t wBufCap = 0;
    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
    memcpy(wBuf, theBuf.mem, theBuf.size);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
           theBuf.size);

    WriteRequest *const r = new WriteRequest(
        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
                       memFreeBufFunc(wBufCap)), this);
    r->sidCurrent = sidCurrent;
    r->sidNext = sidNext;
    r->eof = eof;

    // theFile->write may call writeCompleted immediately
    theFile->write(r);
}

/// finds and returns a free db slot to fill or throws
Rock::SlotId
Rock::IoState::reserveSlotForWriting()
{
    Ipc::Mem::PageId pageId;
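    // shared memory page numbers are 1-based (zero means "no page") while
    // slot IDs are 0-based, hence the pageId.number-1 conversion below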
    if (dir->useFreeSlot(pageId))
        return pageId.number - 1;

    // This may happen when the number of available db slots is close to the
    // number of concurrent requests reading or writing those slots, which may
    // happen when the db is "small" compared to the request traffic OR when we
    // are rebuilding and have not loaded "many" entries or empty slots yet.
    throw TexcHere("ran out of free db slots");
}

void
Rock::IoState::finishedWriting(const int errFlag)
{
    // we incremented offset_ while accumulating data in write()
    // we do not reset writeableAnchor_ here because we still keep the lock
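    // wake up any collapsed-forwarding readers waiting for more of this entry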
    CollapsedForwarding::Broadcast(*e);
    callBack(errFlag);
}

void
Rock::IoState::close(int how)
{
    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
           " buf: " << theBuf.size << " callback: " << callback);

    if (!theFile) {
        debugs(79, 3, "I/O already canceled");
        assert(!callback);
        // We keep writeableAnchor_ after callBack() on I/O errors.
        assert(!readableAnchor_);
        return;
    }

    switch (how) {
    case wroteAll:
        assert(theBuf.size > 0); // we never flush the last bytes on our own
        writeToDisk(-1); // flush the last, still-unwritten slot to disk
        return; // writeCompleted() will callBack()

    case writerGone:
        assert(writeableAnchor_);
        dir->writeError(*e); // abort a partially stored entry
        finishedWriting(DISK_ERROR);
        return;

    case readerDone:
        callBack(0);
        return;
    }
}

/// close callback (STIOCB) dialer: breaks dependencies and
/// counts IOState concurrency level
class StoreIOStateCb: public CallDialer
{
public:
    StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
        callback(NULL),
        callback_data(NULL),
        errflag(err),
        sio(anSio) {

        callback = cb;
        callback_data = cbdataReference(data);
    }

    StoreIOStateCb(const StoreIOStateCb &cb):
        callback(NULL),
        callback_data(NULL),
        errflag(cb.errflag),
        sio(cb.sio) {

        callback = cb.callback;
        callback_data = cbdataReference(cb.callback_data);
    }

    virtual ~StoreIOStateCb() {
        cbdataReferenceDone(callback_data); // may be nil already
    }

    void dial(AsyncCall &call) {
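        // deliver the callback only if the cbdata-protected recipient is alive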
        void *cbd;
        if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
            callback(cbd, errflag, sio.getRaw());
    }

    bool canDial(AsyncCall &call) const {
        return cbdataReferenceValid(callback_data) && callback;
    }

    virtual void print(std::ostream &os) const {
        os << '(' << callback_data << ", err=" << errflag << ')';
    }

private:
    StoreIOStateCb &operator =(const StoreIOStateCb &cb); // not defined

    StoreIOState::STIOCB *callback;
    void *callback_data;
    int errflag;
    Rock::IoState::Pointer sio;
};

void
Rock::IoState::callBack(int errflag)
{
    debugs(79, 3, HERE << "errflag=" << errflag);
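    // close() interprets a nil theFile as an already-canceled/completed I/O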
    theFile = NULL;

    AsyncCall::Pointer call = asyncCall(79, 3, "SomeIoStateCloseCb",
                                        StoreIOStateCb(callback, callback_data, errflag, this));
    ScheduleCallHere(call);

    callback = NULL;
    cbdataReferenceDone(callback_data);
}
409