/*
 * DEBUG: section 79    Disk IO Routines
 */

#include "squid.h"
#include "base/TextException.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "MemObject.h"

Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
                       StoreEntry *anEntry,
                       StoreIOState::STFNCB *cbFile,
                       StoreIOState::STIOCB *cbIo,
                       void *data):
    readableAnchor_(NULL),
    writeableAnchor_(NULL),
    sidCurrent(-1),
    dir(aDir),
    slotSize(dir->slotSize),
    objOffset(0),
    theBuf(dir->slotSize)
{
    e = anEntry;
    e->lock("rock I/O");
    // anchor, swap_filen, and swap_dirn are set by the caller
    file_callback = cbFile;
    callback = cbIo;
    callback_data = cbdataReference(data);
    ++store_open_disk_fd; // TODO: use a dedicated counter?
    // theFile is set by SwapDir because it depends on DiskIOStrategy
}

Rock::IoState::~IoState()
{
    --store_open_disk_fd;

    // The dir map entry may still be open for reading at this point because
    // the map entry lock is associated with StoreEntry, not IoState.
    // assert(!readableAnchor_);
    assert(!writeableAnchor_);

    if (callback_data)
        cbdataReferenceDone(callback_data);
    theFile = NULL;

    e->unlock("rock I/O");
}

void
Rock::IoState::file(const RefCount<DiskFile> &aFile)
{
    assert(!theFile);
    assert(aFile != NULL);
    theFile = aFile;
}

const Ipc::StoreMapAnchor &
Rock::IoState::readAnchor() const
{
    assert(readableAnchor_);
    return *readableAnchor_;
}

Ipc::StoreMapAnchor &
Rock::IoState::writeAnchor()
{
    assert(writeableAnchor_);
    return *writeableAnchor_;
}

/// convenience wrapper returning the map slot we are reading now
const Ipc::StoreMapSlice &
Rock::IoState::currentReadableSlice() const
{
    return dir->map->readableSlice(swap_filen, sidCurrent);
}
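
// A cache entry occupies a chain of equal-sized db slots. While reading,
// sidCurrent names the slot we are positioned at and objOffset is that
// slot's starting offset within the stored object; read_() below walks
// the chain via each slice's .next link until it reaches the slice that
// covers the requested core offset.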

void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
    debugs(79, 7, swap_filen << " reads from " << coreOff);

    assert(theFile != NULL);
    assert(coreOff >= 0);

    // if we are dealing with the first read or
    // if the offset went backwards, start searching from the beginning
    if (sidCurrent < 0 || coreOff < objOffset) {
        sidCurrent = readAnchor().start;
        objOffset = 0;
    }

    while (coreOff >= objOffset + currentReadableSlice().size) {
        objOffset += currentReadableSlice().size;
        sidCurrent = currentReadableSlice().next;
        assert(sidCurrent >= 0); // XXX: handle "read offset too big" error
    }

    offset_ = coreOff;
    len = min(len,
              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));

    assert(read.callback == NULL);
    assert(read.callback_data == NULL);
    read.callback = cb;
    read.callback_data = cbdataReference(data);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    theFile->read(new ReadRequest(::ReadRequest(buf,
                                  diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
}

/// wraps tryWrite() to handle deep write failures centrally and safely
bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
    bool success = false;
    try {
        tryWrite(buf, size, coreOff);
        success = true;
    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
        debugs(79, 2, "db write error: " << ex.what());
        dir->writeError(swap_filen);
        finishedWriting(DISK_ERROR);
        // 'this' might be gone beyond this point; fall through to free buf
    }

    // careful: 'this' might be gone here

    if (dtor)
        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?

    return success;
}
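
// Design note: write() deliberately swallows tryWrite() exceptions so that a
// deep I/O failure cannot propagate into the store core; on error it marks
// the entry broken via writeError() and still frees the caller's buffer,
// keeping buffer ownership rules uniform for both outcomes.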

/** We only write data when a full slot is accumulated or when close() is
 called. We buffer, in part, to avoid forcing the OS to _read_ old unwritten
 portions of the slot when the write does not end at a page or sector
 boundary. */
void
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
{
    debugs(79, 7, swap_filen << " writes " << size << " more");

    // either this is the first write or an append; we do not support write gaps
    assert(!coreOff || coreOff == -1);

    // allocate the first slice during the first write
    if (coreOff == 0) {
        assert(sidCurrent < 0);
        sidCurrent = reserveSlotForWriting(); // throws on failures
        assert(sidCurrent >= 0);
        writeAnchor().start = sidCurrent;
    }

    // buffer incoming data in the slot buffer and write overflowing or final
    // slots; quit when no data is left or we stopped writing on a reentrant error
    while (size > 0 && theFile != NULL) {
        assert(sidCurrent >= 0);
        const size_t processed = writeToBuffer(buf, size);
        buf += processed;
        size -= processed;
        const bool overflow = size > 0;

        // We do not write a full buffer without overflow because
        // we would not yet know what to set the nextSlot to.
        if (overflow) {
            const SlotId sidNext = reserveSlotForWriting(); // throws
            assert(sidNext >= 0);
            writeToDisk(sidNext);
        } else if (Config.onoff.collapsed_forwarding) {
            // write the partial buffer for all collapsed hit readers to see
            // XXX: can we check that this is needed w/o stalling readers
            // that appear right after our check?
            writeBufToDisk(false);
        }
    }
}
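
// The write path in brief: write() guards tryWrite(), which buffers bytes
// via writeToBuffer() and, whenever a slot buffer overflows (or a partial
// buffer must become visible to collapsed readers), pushes it out through
// writeToDisk()/writeBufToDisk(). The final, possibly short, slot is
// flushed when close() calls writeToDisk(-1).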

/// Buffers incoming data for the current slot.
/// Returns the number of bytes buffered.
size_t
Rock::IoState::writeToBuffer(char const *buf, size_t size)
{
    // do not buffer a cell header for nothing
    if (!size)
        return 0;

    if (!theBuf.size) {
        // will fill the header in writeToDisk() when the next slot is known
        theBuf.appended(sizeof(DbCellHeader));
    }

    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
    theBuf.append(buf, forCurrentSlot);
    offset_ += forCurrentSlot; // so that Core thinks we wrote it
    return forCurrentSlot;
}

/// write what was buffered during write() calls
/// negative sidNext means this is the last write request for this entry
void
Rock::IoState::writeToDisk(const SlotId sidNext)
{
    assert(theFile != NULL);
    assert(theBuf.size >= sizeof(DbCellHeader));

    if (sidNext < 0) { // we are writing the last slot
        e->swap_file_sz = offset_;
        writeAnchor().basics.swap_file_sz = offset_; // would not hurt, right?
    }

    // TODO: if the DiskIO module is mmap-based, we should be writing whole
    // pages to avoid triggering read-page;new_head+old_tail;write-page overheads

    // finalize map slice
    Ipc::StoreMap::Slice &slice =
        dir->map->writeableSlice(swap_filen, sidCurrent);
    slice.next = sidNext;
    slice.size = theBuf.size - sizeof(DbCellHeader);

    // finalize db cell header
    DbCellHeader header;
    memcpy(header.key, e->key, sizeof(header.key));
    header.firstSlot = writeAnchor().start;
    header.nextSlot = sidNext;
    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
    header.entrySize = e->swap_file_sz; // may still be zero unless sidNext < 0
    header.version = writeAnchor().basics.timestamp;

    // copy the finalized db cell header into the buffer
    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));

    writeBufToDisk(sidNext < 0);
    theBuf.clear();

    sidCurrent = sidNext;
}
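
// Each slot thus begins with a DbCellHeader naming the entry key, the first
// and next slots of the chain, and a version stamp. Presumably this
// redundancy is what allows the cache_dir rebuild procedure to re-link slot
// chains from the disk contents alone, without a surviving shared map.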

/// Write a header-less (XXX) or complete buffer to disk.
void
Rock::IoState::writeBufToDisk(const bool last)
{
    // allocate another buffer for the WriteRequest so that
    // we can support concurrent WriteRequests (and to ease cleaning)
    // TODO: should we limit the number of outstanding requests?
    size_t wBufCap = 0;
    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
    memcpy(wBuf, theBuf.mem, theBuf.size);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
           theBuf.size);

    WriteRequest *const r = new WriteRequest(
        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
                       memFreeBufFunc(wBufCap)), this, last);

    // theFile->write() may call writeCompleted() immediately
    theFile->write(r);
}

/// finds and returns a free db slot to fill or throws
Rock::SlotId
Rock::IoState::reserveSlotForWriting()
{
    Ipc::Mem::PageId pageId;
    if (dir->useFreeSlot(pageId))
        return pageId.number - 1;

    // This may happen when the number of available db slots is close to the
    // number of concurrent requests reading or writing those slots, which may
    // happen when the db is "small" compared to the request traffic OR when we
    // are rebuilding and have not loaded "many" entries or empty slots yet.
    throw TexcHere("ran out of free db slots");
}
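
// Note the "- 1" above: free slots are tracked as Ipc::Mem pages, and page
// numbers are 1-based (zero means "no page") while slot IDs are 0-based, so
// the page number is shifted down by one to form a SlotId.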

void
Rock::IoState::finishedWriting(const int errFlag)
{
    // we incremented offset_ while accumulating data in write()
    writeableAnchor_ = NULL;
    callBack(errFlag);
}

void
Rock::IoState::close(int how)
{
    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
           " buf: " << theBuf.size << " callback: " << callback);

    if (!theFile) { // already closed by an earlier error
        debugs(79, 3, "I/O already canceled");
        assert(!callback);
        assert(!writeableAnchor_);
        assert(!readableAnchor_);
        return;
    }

    switch (how) {
    case wroteAll:
        assert(theBuf.size > 0); // we never flush last bytes on our own
        writeToDisk(-1); // flush last, yet unwritten slot to disk
        return; // writeCompleted() will callBack()

    case writerGone:
        assert(writeableAnchor_);
        dir->writeError(swap_filen); // abort a partially stored entry
        finishedWriting(DISK_ERROR);
        return;

    case readerDone:
        callBack(0);
        return;
    }
}

/// close callback (STIOCB) dialer: breaks dependencies and
/// counts IOState concurrency level
class StoreIOStateCb: public CallDialer
{
public:
    StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
        callback(NULL),
        callback_data(NULL),
        errflag(err),
        sio(anSio) {

        callback = cb;
        callback_data = cbdataReference(data);
    }

    StoreIOStateCb(const StoreIOStateCb &cb):
        callback(NULL),
        callback_data(NULL),
        errflag(cb.errflag),
        sio(cb.sio) {

        callback = cb.callback;
        callback_data = cbdataReference(cb.callback_data);
    }

    virtual ~StoreIOStateCb() {
        cbdataReferenceDone(callback_data); // may be nil already
    }

    void dial(AsyncCall &call) {
        void *cbd;
        if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
            callback(cbd, errflag, sio.getRaw());
    }

    bool canDial(AsyncCall &call) const {
        return cbdataReferenceValid(callback_data) && callback;
    }

    virtual void print(std::ostream &os) const {
        os << '(' << callback_data << ", err=" << errflag << ')';
    }

private:
    StoreIOStateCb &operator =(const StoreIOStateCb &cb); // not defined

    StoreIOState::STIOCB *callback;
    void *callback_data;
    int errflag;
    Rock::IoState::Pointer sio;
};
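
// StoreIOStateCb snapshots the callback pointer plus its cbdata-protected
// argument so that the close notification is safe to schedule asynchronously:
// if the callback_data owner is gone by dial() time,
// cbdataReferenceValidDone() reports that and the stale callback is skipped.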

void
Rock::IoState::callBack(int errflag)
{
    debugs(79, 3, HERE << "errflag=" << errflag);
    theFile = NULL;

    AsyncCall::Pointer call = asyncCall(79, 3, "SomeIoStateCloseCb",
                                        StoreIOStateCb(callback, callback_data, errflag, this));
    ScheduleCallHere(call);

    callback = NULL;
    cbdataReferenceDone(callback_data);
}