/* src/fs/rock/RockIoState.cc (extracted copy; scraper residue removed) */
/*
 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79    Disk IO Routines */
12 #include "base/TextException.h"
13 #include "CollapsedForwarding.h"
14 #include "DiskIO/DiskIOModule.h"
15 #include "DiskIO/DiskIOStrategy.h"
16 #include "DiskIO/WriteRequest.h"
17 #include "fs/rock/RockIoRequests.h"
18 #include "fs/rock/RockIoState.h"
19 #include "fs/rock/RockSwapDir.h"
21 #include "MemObject.h"
23 #include "Transients.h"
// Constructs the I/O state for one rock cache_dir swap-in/swap-out transaction.
// NOTE(review): this copy is a fragmentary web extraction -- the embedded
// line numbers jump, so other constructor parameters (including the `data`
// cbdata used below) and several member initializers appear to be missing;
// recover them from the upstream file before relying on this copy.
25 Rock::IoState::IoState(Rock::SwapDir::Pointer
&aDir
,
27 StoreIOState::STFNCB
*cbFile
,
28 StoreIOState::STIOCB
*cbIo
,
// the base class records the file/IO callbacks and their cbdata
30 StoreIOState(cbFile
, cbIo
, data
),
// no map anchors until the caller attaches this sio to a map entry
31 readableAnchor_(nullptr),
32 writeableAnchor_(nullptr),
// negative means "no stale splicing point"; only entry-update reads set it
34 staleSplicingPointNext(-1),
// cache the cache_dir slot size; it does not change during our lifetime
36 slotSize(dir
->slotSize
),
48 // anchor, swap_filen, and swap_dirn are set by the caller
49 ++store_open_disk_fd
; // TODO: use a dedicated counter?
50 //theFile is set by SwapDir because it depends on DiskIOStrategy
// Releases per-transaction resources and the StoreEntry reference taken at
// construction time. NOTE(review): fragmentary extraction -- statements such
// as the store_open_disk_fd decrement appear to be missing between fragments.
53 Rock::IoState::~IoState()
57 // The dir map entry may still be open for reading at the point because
58 // the map entry lock is associated with StoreEntry, not IoState.
59 // assert(!readableAnchor_);
// writing must have been finished (or never started) unless we are shutting down
60 assert(shutting_down
|| !writeableAnchor_
);
// drop our reference to the close-callback cbdata, if any
63 cbdataReferenceDone(callback_data
);
// balance the entry lock taken when this I/O transaction started
66 e
->unlock("rock I/O");
70 Rock::IoState::file(const RefCount
<DiskFile
> &aFile
)
73 assert(aFile
!= nullptr);
77 const Ipc::StoreMapAnchor
&
78 Rock::IoState::readAnchor() const
80 assert(readableAnchor_
);
81 return *readableAnchor_
;
85 Rock::IoState::writeAnchor()
87 assert(writeableAnchor_
);
88 return *writeableAnchor_
;
91 /// convenience wrapper returning the map slot we are reading now
92 const Ipc::StoreMapSlice
&
93 Rock::IoState::currentReadableSlice() const
95 return dir
->map
->readableSlice(swap_filen
, sidCurrent
);
// Starts reading len bytes at entry offset coreOff into buf; results are
// delivered asynchronously via the STRCB callback (cb with cbdata `data`).
// NOTE(review): fragmentary extraction -- the embedded line numbers jump, so
// some original lines (the objOffset reset, loop/brace closers, the
// read.callback assignment, an early return after the punt branch, and the
// `len = min(len, ...` head of the clamping expression) appear to be missing.
99 Rock::IoState::read_(char *buf
, size_t len
, off_t coreOff
, STRCB
*cb
, void *data
)
101 debugs(79, 7, swap_filen
<< " reads from " << coreOff
);
// reading requires an open disk file and a sane (non-negative) offset
103 assert(theFile
!= nullptr);
104 assert(coreOff
>= 0);
106 // if we are dealing with the first read or
107 // if the offset went backwords, start searching from the beginning
108 if (sidCurrent
< 0 || coreOff
< objOffset
) {
109 // readers do not need sidFirst but set it for consistency/triage sake
110 sidCurrent
= sidFirst
= readAnchor().start
;
// walk the slice chain until we reach the slice containing coreOff
114 while (sidCurrent
>= 0 && coreOff
>= objOffset
+ currentReadableSlice().size
) {
115 objOffset
+= currentReadableSlice().size
;
116 sidCurrent
= currentReadableSlice().next
;
// there must be no other read pending on this sio
119 assert(read
.callback
== nullptr);
120 assert(read
.callback_data
== nullptr);
// remember the reader's cbdata for the asynchronous reply
122 read
.callback_data
= cbdataReference(data
);
124 // punt if read offset is too big (because of client bugs or collapsing)
125 if (sidCurrent
< 0) {
126 debugs(79, 5, "no " << coreOff
<< " in " << *e
);
127 callReaderBack(buf
, 0);
// clamp the read length to the bytes remaining in the current slice
133 static_cast<size_t>(objOffset
+ currentReadableSlice().size
- coreOff
));
// convert the slice-relative position into an absolute disk offset
134 const uint64_t diskOffset
= dir
->diskOffset(sidCurrent
);
135 const auto start
= diskOffset
+ sizeof(DbCellHeader
) + coreOff
- objOffset
;
// the id lets handleReadCompletion() discard stale replies
136 const auto id
= ++requestsSent
;
137 const auto request
= new ReadRequest(::ReadRequest(buf
, start
, len
), this, id
);
138 theFile
->read(request
);
142 Rock::IoState::handleReadCompletion(Rock::ReadRequest
&request
, const int rlen
, const int errFlag
)
144 if (errFlag
!= DISK_OK
|| rlen
< 0) {
145 debugs(79, 3, errFlag
<< " failure for " << *e
);
146 return callReaderBack(request
.buf
, -1);
149 if (!expectedReply(request
.id
))
150 return callReaderBack(request
.buf
, -1);
152 debugs(79, 5, '#' << request
.id
<< " read " << rlen
<< " bytes at " << offset_
<< " for " << *e
);
154 callReaderBack(request
.buf
, rlen
);
157 /// report (already sanitized/checked) I/O results to the read initiator
159 Rock::IoState::callReaderBack(const char *buf
, int rlen
)
161 splicingPoint
= rlen
>= 0 ? sidCurrent
: -1;
162 if (splicingPoint
< 0)
163 staleSplicingPointNext
= -1;
165 staleSplicingPointNext
= currentReadableSlice().next
;
166 StoreIOState::STRCB
*callb
= read
.callback
;
168 read
.callback
= nullptr;
170 if (cbdataReferenceValidDone(read
.callback_data
, &cbdata
))
171 callb(cbdata
, buf
, rlen
, this);
174 /// wraps tryWrite() to handle deep write failures centrally and safely
176 Rock::IoState::write(char const *buf
, size_t size
, off_t coreOff
, FREE
*dtor
)
178 bool success
= false;
180 tryWrite(buf
, size
, coreOff
);
182 } catch (const std::exception
&ex
) { // TODO: should we catch ... as well?
183 debugs(79, 2, "db write error: " << ex
.what());
184 dir
->writeError(*this);
185 finishedWriting(DISK_ERROR
);
186 // 'this' might be gone beyond this point; fall through to free buf
189 // careful: 'this' might be gone here
192 (dtor
)(const_cast<char*>(buf
)); // cast due to a broken API?
198 * Possibly send data to be written to disk:
199 * We only write data when full slot is accumulated or when close() is called.
200 * We buffer, in part, to avoid forcing OS to _read_ old unwritten portions of
201 * the slot when the write does not end at the page or sector boundary.
204 Rock::IoState::tryWrite(char const *buf
, size_t size
, off_t coreOff
)
206 debugs(79, 7, swap_filen
<< " writes " << size
<< " more");
208 // either this is the first write or append;
209 // we do not support write gaps or rewrites
210 assert(!coreOff
|| coreOff
== -1);
212 // throw if an accepted unknown-size entry grew too big or max-size changed
213 Must(static_cast<uint64_t>(offset_
+ size
) <= static_cast<uint64_t>(dir
->maxObjectSize()));
215 // buffer incoming data in slot buffer and write overflowing or final slots
216 // quit when no data left or we stopped writing on reentrant error
217 while (size
> 0 && theFile
!= nullptr) {
218 const size_t processed
= writeToBuffer(buf
, size
);
221 const bool overflow
= size
> 0;
223 // We do not write a full buffer without overflow because
224 // we do not want to risk writing a payload-free slot on EOF.
227 sidNext
= dir
->reserveSlotForWriting();
228 assert(sidNext
>= 0);
230 Must(sidNext
< 0); // short sidNext lifetime simplifies code logic
// NOTE(review): fragmentary extraction -- the embedded line numbers jump
// between 241 and 246, so the branch that allocates/initializes theBuf (and
// reserves the DbCellHeader space only once per slot) appears to be missing.
236 /// Buffers incoming data for the current slot.
237 /// \return the number of bytes buffered
239 Rock::IoState::writeToBuffer(char const *buf
, size_t size
)
241 // do not buffer a cell header for nothing
246 // eventually, writeToDisk() will fill this header space
247 theBuf
.appended(sizeof(DbCellHeader
));
// copy as much of the caller's data as fits into the current slot buffer
250 size_t forCurrentSlot
= min(size
, static_cast<size_t>(theBuf
.spaceSize()));
251 theBuf
.append(buf
, forCurrentSlot
);
252 offset_
+= forCurrentSlot
; // so that Core thinks we wrote it
253 return forCurrentSlot
;
// NOTE(review): fragmentary extraction -- the DbCellHeader `header` local
// declaration, the buffer reset after copying, and the final theFile->write()
// call fragments appear to be missing between the numbered fragments below.
256 /// write what was buffered during write() calls
258 Rock::IoState::writeToDisk()
// writing requires an open disk file and at least a header worth of data
260 assert(theFile
!= nullptr);
261 assert(theBuf
.size
>= sizeof(DbCellHeader
));
// sidFirst and sidCurrent are assigned together; both or neither are set
263 assert((sidFirst
< 0) == (sidCurrent
< 0));
264 if (sidFirst
< 0) // this is the first disk write
265 sidCurrent
= sidFirst
= dir
->reserveSlotForWriting();
267 // negative sidNext means this is the last write request for this entry
268 const bool lastWrite
= sidNext
< 0;
269 // here, eof means that we are writing the right-most entry slot
270 const bool eof
= lastWrite
&&
271 // either not updating or the updating reader has loaded everything
272 (touchingStoreEntry() || staleSplicingPointNext
< 0);
273 debugs(79, 5, "sidCurrent=" << sidCurrent
<< " sidNext=" << sidNext
<< " eof=" << eof
);
275 // TODO: if DiskIO module is mmap-based, we should be writing whole pages
276 // to avoid triggering read-page;new_head+old_tail;write-page overheads
278 assert(!eof
|| sidNext
< 0); // no slots after eof
280 // finalize db cell header
// the header carries the entry key so readers can validate the slot chain
282 memcpy(header
.key
, e
->key
, sizeof(header
.key
));
283 header
.firstSlot
= sidFirst
;
// when updating a cached entry, the last fresh slot splices into the stale
// suffix (staleSplicingPointNext) instead of terminating the chain
285 const auto lastUpdatingWrite
= lastWrite
&& !touchingStoreEntry();
286 assert(!lastUpdatingWrite
|| sidNext
< 0);
287 header
.nextSlot
= lastUpdatingWrite
? staleSplicingPointNext
: sidNext
;
289 header
.payloadSize
= theBuf
.size
- sizeof(DbCellHeader
);
290 header
.entrySize
= eof
? offset_
: 0; // storeSwapOutFileClosed sets swap_file_sz after write
// version ties the slot to this particular anchor generation
291 header
.version
= writeAnchor().basics
.timestamp
;
293 // copy finalized db cell header into buffer
294 memcpy(theBuf
.mem
, &header
, sizeof(DbCellHeader
));
296 // and now allocate another buffer for the WriteRequest so that
297 // we can support concurrent WriteRequests (and to ease cleaning)
298 // TODO: should we limit the number of outstanding requests?
300 void *wBuf
= memAllocBuf(theBuf
.size
, &wBufCap
);
301 memcpy(wBuf
, theBuf
.mem
, theBuf
.size
);
// absolute on-disk position of the slot we are about to write
303 const uint64_t diskOffset
= dir
->diskOffset(sidCurrent
);
304 debugs(79, 5, swap_filen
<< " at " << diskOffset
<< '+' <<
// the id lets handleWriteCompletion() discard stale replies
306 const auto id
= ++requestsSent
;
307 WriteRequest
*const r
= new WriteRequest(
308 ::WriteRequest(static_cast<char*>(wBuf
), diskOffset
, theBuf
.size
,
// the request owns wBuf and frees it with the matching deallocator
309 memFreeBufFunc(wBufCap
)), this, id
);
310 r
->sidCurrent
= sidCurrent
;
311 r
->sidPrevious
= sidPrevious
;
// advance the slot chain position for the next write
314 sidPrevious
= sidCurrent
;
315 sidCurrent
= sidNext
; // sidNext may be cleared/negative already
320 // theFile->write may call writeCompleted immediately
325 Rock::IoState::expectedReply(const IoXactionId receivedId
)
327 Must(requestsSent
); // paranoid: we sent some requests
328 Must(receivedId
); // paranoid: the request was sent by some sio
329 Must(receivedId
<= requestsSent
); // paranoid: within our range
331 const auto expectedId
= repliesReceived
;
332 if (receivedId
== expectedId
)
335 debugs(79, 3, "no; expected reply #" << expectedId
<<
336 ", but got #" << receivedId
);
341 Rock::IoState::finishedWriting(const int errFlag
)
343 if (sidCurrent
>= 0) {
344 dir
->noteFreeMapSlice(sidCurrent
);
348 dir
->noteFreeMapSlice(sidNext
);
352 // we incremented offset_ while accumulating data in write()
353 // we do not reset writeableAnchor_ here because we still keep the lock
354 if (touchingStoreEntry())
355 CollapsedForwarding::Broadcast(*e
);
// Ends this I/O transaction; `how` selects the closing mode (e.g., flushing
// the final slot versus aborting). NOTE(review): fragmentary extraction --
// the embedded line numbers jump, so the early-return branch body, the
// `switch (how)` skeleton, the try/catch around the flush, and the
// reader-side branch appear to be missing between the fragments below.
360 Rock::IoState::close(int how
)
362 debugs(79, 3, swap_filen
<< " offset: " << offset_
<< " how: " << how
<<
363 " leftovers: " << theBuf
.size
<<
364 " after " << requestsSent
<< '/' << repliesReceived
<<
365 " callback: " << callback
);
// nothing to do when the I/O was canceled earlier
368 debugs(79, 3, "I/O already canceled");
370 // We keep writeableAnchor_ after callBack() on I/O errors.
371 assert(!readableAnchor_
);
// flush the final, partially filled slot (writers always leave leftovers)
377 assert(theBuf
.size
> 0); // we never flush last bytes on our own
379 writeToDisk(); // flush last, yet unwritten slot to disk
380 return; // writeCompleted() will callBack()
// flushing failed; abort the partially stored entry
383 debugs(79, 2, "db flush error: " << CurrentException
);
384 // TODO: Move finishedWriting() into SwapDir::writeError().
385 dir
->writeError(*this);
386 finishedWriting(DISK_ERROR
);
// the writer disappeared before finishing the entry
391 dir
->writeError(*this); // abort a partially stored entry
392 finishedWriting(DISK_ERROR
);
// NOTE(review): fragmentary extraction -- access specifiers, the errflag/sio
// member initializers, and some member declarations appear to be missing
// between the numbered fragments below.
401 /// close callback (STIOCB) dialer: breaks dependencies and
402 /// counts IOState concurrency level
403 class StoreIOStateCb
: public CallDialer
// builds the dialer, taking its own cbdata reference on `data`
406 StoreIOStateCb(StoreIOState::STIOCB
*cb
, void *data
, int err
, const Rock::IoState::Pointer
&anSio
):
408 callback_data(nullptr),
413 callback_data
= cbdataReference(data
);
// copy constructor must take a fresh cbdata reference of its own
416 StoreIOStateCb(const StoreIOStateCb
&cb
):
418 callback_data(nullptr),
422 callback
= cb
.callback
;
423 callback_data
= cbdataReference(cb
.callback_data
);
426 virtual ~StoreIOStateCb() {
427 cbdataReferenceDone(callback_data
); // may be nil already
// fires the stored STIOCB if its cbdata is still valid
430 void dial(AsyncCall
&) {
432 if (cbdataReferenceValidDone(callback_data
, &cbd
) && callback
)
433 callback(cbd
, errflag
, sio
.getRaw());
// the call is worth dialing only while the recipient cbdata is valid
436 bool canDial(AsyncCall
&) const {
437 return cbdataReferenceValid(callback_data
) && callback
;
440 virtual void print(std::ostream
&os
) const {
441 os
<< '(' << callback_data
<< ", err=" << errflag
<< ')';
// copy assignment is intentionally unavailable (reference bookkeeping)
445 StoreIOStateCb
&operator =(const StoreIOStateCb
&); // not defined
// the user callback and its cbdata, dialed exactly once
447 StoreIOState::STIOCB
*callback
;
// keeps the IoState alive until the callback fires
450 Rock::IoState::Pointer sio
;
454 Rock::IoState::callBack(int errflag
)
456 debugs(79,3, "errflag=" << errflag
);
459 AsyncCall::Pointer call
= asyncCall(79,3, "SomeIoStateCloseCb",
460 StoreIOStateCb(callback
, callback_data
, errflag
, this));
461 ScheduleCallHere(call
);
464 cbdataReferenceDone(callback_data
);