/*
 * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79    Disk IO Routines */

#include "squid.h"
#include "base/TextException.h"
#include "CollapsedForwarding.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "MemObject.h"
#include "Parsing.h"
#include "Transients.h"

Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
                       StoreEntry *anEntry,
                       StoreIOState::STFNCB *cbFile,
                       StoreIOState::STIOCB *cbIo,
                       void *data) :
    StoreIOState(cbFile, cbIo, data),
    readableAnchor_(NULL),
    writeableAnchor_(NULL),
    splicingPoint(-1),
    staleSplicingPointNext(-1),
    dir(aDir),
    slotSize(dir->slotSize),
    objOffset(0),
    sidCurrent(-1),
    theBuf(dir->slotSize)
{
    e = anEntry;
    e->lock("rock I/O");
    // anchor, swap_filen, and swap_dirn are set by the caller
    ++store_open_disk_fd; // TODO: use a dedicated counter?
    // theFile is set by SwapDir because it depends on DiskIOStrategy
}

Rock::IoState::~IoState()
{
    --store_open_disk_fd;

    // The dir map entry may still be open for reading at this point because
    // the map entry lock is associated with StoreEntry, not IoState.
    // assert(!readableAnchor_);
    assert(shutting_down || !writeableAnchor_);

    if (callback_data)
        cbdataReferenceDone(callback_data);
    theFile = NULL;

    e->unlock("rock I/O");
}

void
Rock::IoState::file(const RefCount<DiskFile> &aFile)
{
    assert(!theFile);
    assert(aFile != NULL);
    theFile = aFile;
}

const Ipc::StoreMapAnchor &
Rock::IoState::readAnchor() const
{
    assert(readableAnchor_);
    return *readableAnchor_;
}

Ipc::StoreMapAnchor &
Rock::IoState::writeAnchor()
{
    assert(writeableAnchor_);
    return *writeableAnchor_;
}

/// convenience wrapper returning the map slot we are reading now
const Ipc::StoreMapSlice &
Rock::IoState::currentReadableSlice() const
{
    return dir->map->readableSlice(swap_filen, sidCurrent);
}

void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
    debugs(79, 7, swap_filen << " reads from " << coreOff);

    assert(theFile != NULL);
    assert(coreOff >= 0);

    // if we are dealing with the first read or
    // if the offset went backwards, start searching from the beginning
    if (sidCurrent < 0 || coreOff < objOffset) {
        sidCurrent = readAnchor().start;
        objOffset = 0;
    }

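    // Walk the slice chain until we reach the slice covering coreOff: each
    // readable slice spans [objOffset, objOffset + slice.size) bytes of the
    // entry, and slice.next links to the following slice (negative means
    // none). For example, with (hypothetical) 4096-byte slice payloads,
    // coreOff 10000 would land in the third slice, 1808 bytes past its start.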
    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
        objOffset += currentReadableSlice().size;
        sidCurrent = currentReadableSlice().next;
    }

    assert(read.callback == NULL);
    assert(read.callback_data == NULL);
    read.callback = cb;
    read.callback_data = cbdataReference(data);

    // punt if read offset is too big (because of client bugs or collapsing)
    if (sidCurrent < 0) {
        debugs(79, 5, "no " << coreOff << " in " << *e);
        callReaderBack(buf, 0);
        return;
    }

    offset_ = coreOff;
    len = min(len,
              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
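    // Compute the on-disk position: diskOffset(sidCurrent) is where this slot
    // starts in the db file; skip the per-slot DbCellHeader and then add the
    // intra-slice offset (coreOff - objOffset) to reach the requested byte.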
    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    theFile->read(new ReadRequest(::ReadRequest(buf,
                                  diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
}

void
Rock::IoState::callReaderBack(const char *buf, int rlen)
{
    debugs(79, 5, rlen << " bytes for " << *e);
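    // Remember how far this reader got: splicingPoint is the last slice read
    // successfully (none on error), and staleSplicingPointNext is the slice
    // that followed it in the stale chain. writeToDisk() later uses
    // staleSplicingPointNext when updating an entry, so the rewritten prefix
    // can be spliced to the rest of the not-yet-overwritten old chain.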
    splicingPoint = rlen >= 0 ? sidCurrent : -1;
    if (splicingPoint < 0)
        staleSplicingPointNext = -1;
    else
        staleSplicingPointNext = currentReadableSlice().next;
    StoreIOState::STRCB *callb = read.callback;
    assert(callb);
    read.callback = NULL;
    void *cbdata;
    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
        callb(cbdata, buf, rlen, this);
}

/// wraps tryWrite() to handle deep write failures centrally and safely
bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
    bool success = false;
    try {
        tryWrite(buf, size, coreOff);
        success = true;
    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
        debugs(79, 2, "db write error: " << ex.what());
        dir->writeError(*this);
        finishedWriting(DISK_ERROR);
        // 'this' might be gone beyond this point; fall through to free buf
    }

    // careful: 'this' might be gone here

    if (dtor)
        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?

    return success;
}

/**
 * Possibly send data to be written to disk:
 * We only write data when a full slot is accumulated or when close() is called.
 * We buffer, in part, to avoid forcing the OS to _read_ old unwritten portions
 * of the slot when the write does not end at a page or sector boundary.
 */
void
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
{
    debugs(79, 7, swap_filen << " writes " << size << " more");

    // either this is the first write or append; we do not support write gaps
    assert(!coreOff || coreOff == -1);

    // throw if an accepted unknown-size entry grew too big or max-size changed
    Must(static_cast<uint64_t>(offset_ + size) <= static_cast<uint64_t>(dir->maxObjectSize()));

    // allocate the first slice during the first write
    if (!coreOff) {
        assert(sidCurrent < 0);
        sidCurrent = reserveSlotForWriting(); // throws on failures
        assert(sidCurrent >= 0);
        writeAnchor().start = sidCurrent;
    }

    // buffer incoming data in slot buffer and write overflowing or final slots
    // quit when no data left or we stopped writing on reentrant error
    while (size > 0 && theFile != NULL) {
        assert(sidCurrent >= 0);
        const size_t processed = writeToBuffer(buf, size);
        buf += processed;
        size -= processed;
        const bool overflow = size > 0;

        // We do not write a full buffer without overflow because
        // we would not yet know what to set the nextSlot to.
        if (overflow) {
            const SlotId sidNext = reserveSlotForWriting(); // throws
            assert(sidNext >= 0);
            writeToDisk(sidNext);
        } else if (Store::Root().transientReaders(*e)) {
            // write partial buffer for all remote hit readers to see
            writeBufToDisk(-1, false, false);
        }
    }

}

/// Buffers incoming data for the current slot.
/// \return the number of bytes buffered
size_t
Rock::IoState::writeToBuffer(char const *buf, size_t size)
{
    // do not buffer a cell header for nothing
    if (!size)
        return 0;

    if (!theBuf.size) {
        // will fill the header in writeToDisk when the next slot is known
        theBuf.appended(sizeof(DbCellHeader));
    }

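    // Copy only as much as fits into the remaining space of the current slot
    // buffer; the tryWrite() loop starts a new slot for any leftover bytes.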
    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
    theBuf.append(buf, forCurrentSlot);
    offset_ += forCurrentSlot; // so that Core thinks we wrote it
    return forCurrentSlot;
}

/// write what was buffered during write() calls
/// negative sidNext means this is the last write request for this entry
void
Rock::IoState::writeToDisk(const SlotId sidNextProposal)
{
    assert(theFile != NULL);
    assert(theBuf.size >= sizeof(DbCellHeader));

    const bool lastWrite = sidNextProposal < 0;
    const bool eof = lastWrite &&
                     // either not updating or the updating reader has loaded everything
                     (touchingStoreEntry() || staleSplicingPointNext < 0);
    // approve sidNextProposal unless _updating_ the last slot
    const SlotId sidNext = (!touchingStoreEntry() && lastWrite) ?
                           staleSplicingPointNext : sidNextProposal;
    debugs(79, 5, "sidNext:" << sidNextProposal << "=>" << sidNext << " eof=" << eof);

    // TODO: if DiskIO module is mmap-based, we should be writing whole pages
    // to avoid triggering read-page;new_head+old_tail;write-page overheads

    writeBufToDisk(sidNext, eof, lastWrite);
    theBuf.clear();

    sidCurrent = sidNext;
}

/// creates and submits a request to write current slot buffer to disk
/// eof is true if and only if this is the last slot
void
Rock::IoState::writeBufToDisk(const SlotId sidNext, const bool eof, const bool lastWrite)
{
    // no slots after the last/eof slot (but partial slots may have a nil next)
    assert(!eof || sidNext < 0);

    // finalize db cell header
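    // Each slot carries a full header: the entry key (for validation), the
    // first and next slot IDs (forming the slot chain), this slot's payload
    // size, the total entry size (known only at eof), and a version stamp
    // taken from the anchor timestamp (presumably so that readers can detect
    // slots left over from an older generation of the same entry).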
    DbCellHeader header;
    memcpy(header.key, e->key, sizeof(header.key));
    header.firstSlot = writeAnchor().start;
    header.nextSlot = sidNext;
    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
    header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
    header.version = writeAnchor().basics.timestamp;

    // copy finalized db cell header into buffer
    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));

    // and now allocate another buffer for the WriteRequest so that
    // we can support concurrent WriteRequests (and to ease cleaning)
    // TODO: should we limit the number of outstanding requests?
    size_t wBufCap = 0;
    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
    memcpy(wBuf, theBuf.mem, theBuf.size);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
           theBuf.size);

    WriteRequest *const r = new WriteRequest(
        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
                       memFreeBufFunc(wBufCap)), this);
    r->sidCurrent = sidCurrent;
    r->sidNext = sidNext;
    r->eof = lastWrite;

    // theFile->write may call writeCompleted immediately
    theFile->write(r);
}

/// finds and returns a free db slot to fill or throws
Rock::SlotId
Rock::IoState::reserveSlotForWriting()
{
    Ipc::Mem::PageId pageId;
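    // Free slots are tracked with Ipc::Mem::PageId; page numbers appear to be
    // 1-based (zero meaning "unset") while slot IDs are 0-based, hence the -1
    // conversion below.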
    if (dir->useFreeSlot(pageId))
        return pageId.number-1;

    // This may happen when the number of available db slots is close to the
    // number of concurrent requests reading or writing those slots, which may
    // happen when the db is "small" compared to the request traffic OR when we
    // are rebuilding and have not loaded "many" entries or empty slots yet.
    throw TexcHere("ran out of free db slots");
}

void
Rock::IoState::finishedWriting(const int errFlag)
{
    // we incremented offset_ while accumulating data in write()
    // we do not reset writeableAnchor_ here because we still keep the lock
    if (touchingStoreEntry())
        CollapsedForwarding::Broadcast(*e);
    callBack(errFlag);
}

void
Rock::IoState::close(int how)
{
    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
           " buf: " << theBuf.size << " callback: " << callback);

    if (!theFile) {
        debugs(79, 3, "I/O already canceled");
        assert(!callback);
        // We keep writeableAnchor_ after callBack() on I/O errors.
        assert(!readableAnchor_);
        return;
    }

    switch (how) {
    case wroteAll:
        assert(theBuf.size > 0); // we never flush last bytes on our own
        writeToDisk(-1); // flush last, yet unwritten slot to disk
        return; // writeCompleted() will callBack()

    case writerGone:
        dir->writeError(*this); // abort a partially stored entry
        finishedWriting(DISK_ERROR);
        return;

    case readerDone:
        callBack(0);
        return;
    }
}

/// close callback (STIOCB) dialer: breaks dependencies and
/// counts IOState concurrency level
class StoreIOStateCb: public CallDialer
{
public:
    StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
        callback(NULL),
        callback_data(NULL),
        errflag(err),
        sio(anSio) {

        callback = cb;
        callback_data = cbdataReference(data);
    }

    StoreIOStateCb(const StoreIOStateCb &cb):
        callback(NULL),
        callback_data(NULL),
        errflag(cb.errflag),
        sio(cb.sio) {

        callback = cb.callback;
        callback_data = cbdataReference(cb.callback_data);
    }

    virtual ~StoreIOStateCb() {
        cbdataReferenceDone(callback_data); // may be nil already
    }

    void dial(AsyncCall &) {
        void *cbd;
        if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
            callback(cbd, errflag, sio.getRaw());
    }

    bool canDial(AsyncCall &) const {
        return cbdataReferenceValid(callback_data) && callback;
    }

    virtual void print(std::ostream &os) const {
        os << '(' << callback_data << ", err=" << errflag << ')';
    }

private:
    StoreIOStateCb &operator =(const StoreIOStateCb &); // not defined

    StoreIOState::STIOCB *callback;
    void *callback_data;
    int errflag;
    Rock::IoState::Pointer sio;
};

void
Rock::IoState::callBack(int errflag)
{
    debugs(79,3, HERE << "errflag=" << errflag);
    theFile = NULL;

    AsyncCall::Pointer call = asyncCall(79,3, "SomeIoStateCloseCb",
                                        StoreIOStateCb(callback, callback_data, errflag, this));
    ScheduleCallHere(call);

    callback = NULL;
    cbdataReferenceDone(callback_data);
}