/*
 * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79 Disk IO Routines */

#include "squid.h"
#include "base/TextException.h"
#include "CollapsedForwarding.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "MemObject.h"
#include "Parsing.h"
#include "Transients.h"

Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
                       StoreEntry *anEntry,
                       StoreIOState::STFNCB *cbFile,
                       StoreIOState::STIOCB *cbIo,
                       void *data) :
    StoreIOState(cbFile, cbIo, data),
    readableAnchor_(NULL),
    writeableAnchor_(NULL),
    splicingPoint(-1),
    staleSplicingPointNext(-1),
    dir(aDir),
    slotSize(dir->slotSize),
    objOffset(0),
    sidFirst(-1),
    sidPrevious(-1),
    sidCurrent(-1),
    sidNext(-1),
    requestsSent(0),
    repliesReceived(0),
    theBuf(dir->slotSize)
{
    e = anEntry;
    e->lock("rock I/O");
    // anchor, swap_filen, and swap_dirn are set by the caller
    ++store_open_disk_fd; // TODO: use a dedicated counter?
    //theFile is set by SwapDir because it depends on DiskIOStrategy
}

Rock::IoState::~IoState()
{
    --store_open_disk_fd;

    // The dir map entry may still be open for reading at this point because
    // the map entry lock is associated with StoreEntry, not IoState.
    // assert(!readableAnchor_);
    assert(shutting_down || !writeableAnchor_);

    if (callback_data)
        cbdataReferenceDone(callback_data);
    theFile = NULL;

    e->unlock("rock I/O");
}

void
Rock::IoState::file(const RefCount<DiskFile> &aFile)
{
    assert(!theFile);
    assert(aFile != NULL);
    theFile = aFile;
}

const Ipc::StoreMapAnchor &
Rock::IoState::readAnchor() const
{
    assert(readableAnchor_);
    return *readableAnchor_;
}

Ipc::StoreMapAnchor &
Rock::IoState::writeAnchor()
{
    assert(writeableAnchor_);
    return *writeableAnchor_;
}

/// convenience wrapper returning the map slot we are reading now
const Ipc::StoreMapSlice &
Rock::IoState::currentReadableSlice() const
{
    return dir->map->readableSlice(swap_filen, sidCurrent);
}

void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
    debugs(79, 7, swap_filen << " reads from " << coreOff);

    assert(theFile != NULL);
    assert(coreOff >= 0);

    // if we are dealing with the first read or
    // if the offset went backwards, start searching from the beginning
    if (sidCurrent < 0 || coreOff < objOffset) {
        // readers do not need sidFirst but set it for consistency/triage purposes
        sidCurrent = sidFirst = readAnchor().start;
        objOffset = 0;
    }

    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
        objOffset += currentReadableSlice().size;
        sidCurrent = currentReadableSlice().next;
    }

    assert(read.callback == NULL);
    assert(read.callback_data == NULL);
    read.callback = cb;
    read.callback_data = cbdataReference(data);

    // punt if read offset is too big (because of client bugs or collapsing)
    if (sidCurrent < 0) {
        debugs(79, 5, "no " << coreOff << " in " << *e);
        callReaderBack(buf, 0);
        return;
    }

    offset_ = coreOff;
    len = min(len,
              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    const auto start = diskOffset + sizeof(DbCellHeader) + coreOff - objOffset;
    const auto id = ++requestsSent;
    const auto request = new ReadRequest(::ReadRequest(buf, start, len), this, id);
    theFile->read(request);
}
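
// Illustration of the slot search above (the numbers are hypothetical, not
// Squid defaults): with three already-written slices of 100 payload bytes
// each, a first read_() at coreOff=250 starts at readAnchor().start, skips
// two slices (objOffset becomes 200), clips len to the 50 bytes left in the
// third slice, and reads them from
// dir->diskOffset(sidCurrent) + sizeof(DbCellHeader) + 50.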

void
Rock::IoState::handleReadCompletion(Rock::ReadRequest &request, const int rlen, const int errFlag)
{
    if (errFlag != DISK_OK || rlen < 0) {
        debugs(79, 3, errFlag << " failure for " << *e);
        return callReaderBack(request.buf, -1);
    }

    if (!expectedReply(request.id))
        return callReaderBack(request.buf, -1);

    debugs(79, 5, '#' << request.id << " read " << rlen << " bytes at " << offset_ << " for " << *e);
    offset_ += rlen;
    callReaderBack(request.buf, rlen);
}

/// report (already sanitized/checked) I/O results to the read initiator
void
Rock::IoState::callReaderBack(const char *buf, int rlen)
{
    splicingPoint = rlen >= 0 ? sidCurrent : -1;
    if (splicingPoint < 0)
        staleSplicingPointNext = -1;
    else
        staleSplicingPointNext = currentReadableSlice().next;
    StoreIOState::STRCB *callb = read.callback;
    assert(callb);
    read.callback = NULL;
    void *cbdata;
    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
        callb(cbdata, buf, rlen, this);
}
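
// Note on the bookkeeping above (an interpretation, not upstream
// documentation): splicingPoint remembers the slice that satisfied the last
// successful read, and staleSplicingPointNext remembers the slice that would
// have followed it. writeToDisk() later uses staleSplicingPointNext when an
// in-place entry update writes a fresh prefix that must splice back into the
// remaining stale slices.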

/// wraps tryWrite() to handle deep write failures centrally and safely
bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
    bool success = false;
    try {
        tryWrite(buf, size, coreOff);
        success = true;
    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
        debugs(79, 2, "db write error: " << ex.what());
        dir->writeError(*this);
        finishedWriting(DISK_ERROR);
        // 'this' might be gone beyond this point; fall through to free buf
    }

    // careful: 'this' might be gone here

    if (dtor)
        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?

    return success;
}

/**
 * Possibly send data to be written to disk:
 * We only write data when a full slot is accumulated or when close() is called.
 * We buffer, in part, to avoid forcing the OS to _read_ old unwritten portions
 * of the slot when the write does not end at a page or sector boundary.
 */
void
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
{
    debugs(79, 7, swap_filen << " writes " << size << " more");

    // either this is the first write or an append;
    // we do not support write gaps or rewrites
    assert(!coreOff || coreOff == -1);

    // throw if an accepted unknown-size entry grew too big or max-size changed
    Must(static_cast<uint64_t>(offset_ + size) <= static_cast<uint64_t>(dir->maxObjectSize()));

    // buffer incoming data in slot buffer and write overflowing or final slots
    // quit when no data left or we stopped writing on reentrant error
    while (size > 0 && theFile != NULL) {
        const size_t processed = writeToBuffer(buf, size);
        buf += processed;
        size -= processed;
        const bool overflow = size > 0;

        // We do not write a full buffer without overflow because
        // we do not want to risk writing a payload-free slot on EOF.
        if (overflow) {
            Must(sidNext < 0);
            sidNext = dir->reserveSlotForWriting();
            assert(sidNext >= 0);
            writeToDisk();
            Must(sidNext < 0); // short sidNext lifetime simplifies code logic
        }
    }
}
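
// Worked example for tryWrite()/writeToBuffer() (a sketch assuming a
// hypothetical 4 KB slot size; the real value comes from dir->slotSize):
// a single write() of 10000 bytes buffers 4096 - sizeof(DbCellHeader) bytes,
// overflows, reserves sidNext, and flushes that full slot via writeToDisk();
// it does the same for a second slot and leaves the remaining tail (just
// under 2 KB here) buffered in theBuf until a later write() or
// close(wroteAll) flushes it.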

/// Buffers incoming data for the current slot.
/// \return the number of bytes buffered
size_t
Rock::IoState::writeToBuffer(char const *buf, size_t size)
{
    // do not buffer a cell header for nothing
    if (!size)
        return 0;

    if (!theBuf.size) {
        // eventually, writeToDisk() will fill this header space
        theBuf.appended(sizeof(DbCellHeader));
    }

    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
    theBuf.append(buf, forCurrentSlot);
    offset_ += forCurrentSlot; // so that Core thinks we wrote it
    return forCurrentSlot;
}

/// write what was buffered during write() calls
void
Rock::IoState::writeToDisk()
{
    assert(theFile != NULL);
    assert(theBuf.size >= sizeof(DbCellHeader));

    assert((sidFirst < 0) == (sidCurrent < 0));
    if (sidFirst < 0) // this is the first disk write
        sidCurrent = sidFirst = dir->reserveSlotForWriting();

    // negative sidNext means this is the last write request for this entry
    const bool lastWrite = sidNext < 0;
    // here, eof means that we are writing the right-most entry slot
    const bool eof = lastWrite &&
                     // either not updating or the updating reader has loaded everything
                     (touchingStoreEntry() || staleSplicingPointNext < 0);
    debugs(79, 5, "sidCurrent=" << sidCurrent << " sidNext=" << sidNext << " eof=" << eof);

    // TODO: if DiskIO module is mmap-based, we should be writing whole pages
    // to avoid triggering read-page;new_head+old_tail;write-page overheads

    assert(!eof || sidNext < 0); // no slots after eof

    // finalize db cell header
    DbCellHeader header;
    memcpy(header.key, e->key, sizeof(header.key));
    header.firstSlot = sidFirst;

    const auto lastUpdatingWrite = lastWrite && !touchingStoreEntry();
    assert(!lastUpdatingWrite || sidNext < 0);
    header.nextSlot = lastUpdatingWrite ? staleSplicingPointNext : sidNext;

    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
    header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
    header.version = writeAnchor().basics.timestamp;

    // copy finalized db cell header into buffer
    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));

    // and now allocate another buffer for the WriteRequest so that
    // we can support concurrent WriteRequests (and to ease cleaning)
    // TODO: should we limit the number of outstanding requests?
    size_t wBufCap = 0;
    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
    memcpy(wBuf, theBuf.mem, theBuf.size);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
           theBuf.size);
    const auto id = ++requestsSent;
    WriteRequest *const r = new WriteRequest(
        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
                       memFreeBufFunc(wBufCap)), this, id);
    r->sidCurrent = sidCurrent;
    r->sidPrevious = sidPrevious;
    r->eof = lastWrite;

    sidPrevious = sidCurrent;
    sidCurrent = sidNext; // sidNext may be cleared/negative already
    sidNext = -1;

    theBuf.clear();

    // theFile->write may call writeCompleted immediately
    theFile->write(r);
}
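
// On-disk sketch of the resulting slot chain (slot numbers are invented for
// illustration): every slot written above starts with a DbCellHeader that
// records, among other fields, the entry key, firstSlot (always sidFirst),
// nextSlot (the following slot, or negative at the end of a regular write),
// payloadSize for this slot, and entrySize, which stays 0 except in the final
// (eof) slot. A three-slot entry thus forms a linked list like
//   slot 7 {firstSlot=7, nextSlot=12} ->
//   slot 12 {firstSlot=7, nextSlot=3} ->
//   slot 3 {firstSlot=7, nextSlot=-1, entrySize=offset_}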

bool
Rock::IoState::expectedReply(const IoXactionId receivedId)
{
    Must(requestsSent); // paranoid: we sent some requests
    Must(receivedId); // paranoid: the request was sent by some sio
    Must(receivedId <= requestsSent); // paranoid: within our range
    ++repliesReceived;
    const auto expectedId = repliesReceived;
    if (receivedId == expectedId)
        return true;

    debugs(79, 3, "no; expected reply #" << expectedId <<
           ", but got #" << receivedId);
    return false;
}
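
// Reading of the check above (an interpretation, not extra upstream
// documentation): request and reply IDs grow in lockstep for a healthy sio,
// so a reply whose ID does not match the next expected one is a late or
// out-of-order disk reply; callers such as handleReadCompletion() treat it
// as a failed transaction rather than trusting its payload.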

void
Rock::IoState::finishedWriting(const int errFlag)
{
    if (sidCurrent >= 0) {
        dir->noteFreeMapSlice(sidCurrent);
        sidCurrent = -1;
    }
    if (sidNext >= 0) {
        dir->noteFreeMapSlice(sidNext);
        sidNext = -1;
    }

    // we incremented offset_ while accumulating data in write()
    // we do not reset writeableAnchor_ here because we still keep the lock
    if (touchingStoreEntry())
        CollapsedForwarding::Broadcast(*e);
    callBack(errFlag);
}

void
Rock::IoState::close(int how)
{
    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
           " leftovers: " << theBuf.size <<
           " after " << requestsSent << '/' << repliesReceived <<
           " callback: " << callback);

    if (!theFile) {
        debugs(79, 3, "I/O already canceled");
        assert(!callback);
        // We keep writeableAnchor_ after callBack() on I/O errors.
        assert(!readableAnchor_);
        return;
    }

    switch (how) {
    case wroteAll:
        assert(theBuf.size > 0); // we never flush last bytes on our own
        try {
            writeToDisk(); // flush last, yet unwritten slot to disk
            return; // writeCompleted() will callBack()
        }
        catch (...) {
            debugs(79, 2, "db flush error: " << CurrentException);
            // TODO: Move finishedWriting() into SwapDir::writeError().
            dir->writeError(*this);
            finishedWriting(DISK_ERROR);
        }
        return;

    case writerGone:
        dir->writeError(*this); // abort a partially stored entry
        finishedWriting(DISK_ERROR);
        return;

    case readerDone:
        callBack(0);
        return;
    }
}

/// close callback (STIOCB) dialer: breaks dependencies and
/// counts IOState concurrency level
class StoreIOStateCb: public CallDialer
{
public:
    StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
        callback(NULL),
        callback_data(NULL),
        errflag(err),
        sio(anSio) {

        callback = cb;
        callback_data = cbdataReference(data);
    }

    StoreIOStateCb(const StoreIOStateCb &cb):
        callback(NULL),
        callback_data(NULL),
        errflag(cb.errflag),
        sio(cb.sio) {

        callback = cb.callback;
        callback_data = cbdataReference(cb.callback_data);
    }

    virtual ~StoreIOStateCb() {
        cbdataReferenceDone(callback_data); // may be nil already
    }

    void dial(AsyncCall &) {
        void *cbd;
        if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
            callback(cbd, errflag, sio.getRaw());
    }

    bool canDial(AsyncCall &) const {
        return cbdataReferenceValid(callback_data) && callback;
    }

    virtual void print(std::ostream &os) const {
        os << '(' << callback_data << ", err=" << errflag << ')';
    }

private:
    StoreIOStateCb &operator =(const StoreIOStateCb &); // not defined

    StoreIOState::STIOCB *callback;
    void *callback_data;
    int errflag;
    Rock::IoState::Pointer sio;
};

void
Rock::IoState::callBack(int errflag)
{
    debugs(79, 3, HERE << "errflag=" << errflag);
    theFile = NULL;

    AsyncCall::Pointer call = asyncCall(79, 3, "SomeIoStateCloseCb",
                                        StoreIOStateCb(callback, callback_data, errflag, this));
    ScheduleCallHere(call);

    callback = NULL;
    cbdataReferenceDone(callback_data);
}