/*
 * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79    Disk IO Routines */

#include "squid.h"
#include "base/TextException.h"
#include "CollapsedForwarding.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "MemObject.h"
#include "Parsing.h"
#include "Transients.h"

Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
                       StoreEntry *anEntry,
                       StoreIOState::STFNCB *cbFile,
                       StoreIOState::STIOCB *cbIo,
                       void *data) :
    StoreIOState(cbFile, cbIo, data),
    readableAnchor_(NULL),
    writeableAnchor_(NULL),
    splicingPoint(-1),
    staleSplicingPointNext(-1),
    dir(aDir),
    slotSize(dir->slotSize),
    objOffset(0),
    sidCurrent(-1)
{
    e = anEntry;
    e->lock("rock I/O");
    // anchor, swap_filen, and swap_dirn are set by the caller
    ++store_open_disk_fd; // TODO: use a dedicated counter?
    //theFile is set by SwapDir because it depends on DiskIOStrategy
}

Rock::IoState::~IoState()
{
    --store_open_disk_fd;

    // The dir map entry may still be open for reading at this point because
    // the map entry lock is associated with StoreEntry, not IoState.
    // assert(!readableAnchor_);
    assert(shutting_down || !writeableAnchor_);

    if (callback_data)
        cbdataReferenceDone(callback_data);
    theFile = NULL;

    e->unlock("rock I/O");
}

void
Rock::IoState::file(const RefCount<DiskFile> &aFile)
{
    assert(!theFile);
    assert(aFile != NULL);
    theFile = aFile;
}

const Ipc::StoreMapAnchor &
Rock::IoState::readAnchor() const
{
    assert(readableAnchor_);
    return *readableAnchor_;
}

Ipc::StoreMapAnchor &
Rock::IoState::writeAnchor()
{
    assert(writeableAnchor_);
    return *writeableAnchor_;
}

/// convenience wrapper returning the map slot we are reading now
const Ipc::StoreMapSlice &
Rock::IoState::currentReadableSlice() const
{
    return dir->map->readableSlice(swap_filen, sidCurrent);
}

void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
    debugs(79, 7, swap_filen << " reads from " << coreOff);

    assert(theFile != NULL);
    assert(coreOff >= 0);

    // if we are dealing with the first read or
    // if the offset went backwards, start searching from the beginning
    if (sidCurrent < 0 || coreOff < objOffset) {
        sidCurrent = readAnchor().start;
        objOffset = 0;
    }

    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
        objOffset += currentReadableSlice().size;
        sidCurrent = currentReadableSlice().next;
    }

    assert(read.callback == NULL);
    assert(read.callback_data == NULL);
    read.callback = cb;
    read.callback_data = cbdataReference(data);

    // punt if read offset is too big (because of client bugs or collapsing)
    if (sidCurrent < 0) {
        debugs(79, 5, "no " << coreOff << " in " << *e);
        callReaderBack(buf, 0);
        return;
    }

    offset_ = coreOff;
    len = min(len,
              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    theFile->read(new ReadRequest(::ReadRequest(buf,
                                  diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
}
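
// An illustration of the slice chain that read_() walks (the layout is implied
// by readAnchor().start, StoreMapSlice::next, and the sizeof(DbCellHeader)
// offset above; the slot IDs are made up):
//
//   anchor.start = 7:  slot 7 [header|payload] -> slot 2 [header|payload] -> -1
//
// objOffset counts the payload bytes that precede sidCurrent, so a coreOff
// pointing into an earlier slice forces a restart from anchor.start.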

void
Rock::IoState::callReaderBack(const char *buf, int rlen)
{
    debugs(79, 5, rlen << " bytes for " << *e);
    splicingPoint = rlen >= 0 ? sidCurrent : -1;
    if (splicingPoint < 0)
        staleSplicingPointNext = -1;
    else
        staleSplicingPointNext = currentReadableSlice().next;
    StoreIOState::STRCB *callb = read.callback;
    assert(callb);
    read.callback = NULL;
    void *cbdata;
    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
        callb(cbdata, buf, rlen, this);
}

/// wraps tryWrite() to handle deep write failures centrally and safely
bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
    bool success = false;
    try {
        tryWrite(buf, size, coreOff);
        success = true;
    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
        debugs(79, 2, "db write error: " << ex.what());
        dir->writeError(*this);
        finishedWriting(DISK_ERROR);
        // 'this' might be gone beyond this point; fall through to free buf
    }

    // careful: 'this' might be gone here

    if (dtor)
        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?

    return success;
}
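
// Note on the wrapper above: tryWrite() failures are absorbed here because the
// caller owns buf; the dtor-based cleanup must run even when finishedWriting()
// may have destroyed this IoState, and the caller learns of the failure from
// the return value and the eventual STIOCB callback.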

/**
 * Possibly send data to be written to disk:
 * We only write data when a full slot is accumulated or when close() is called.
 * We buffer, in part, to avoid forcing the OS to _read_ old unwritten portions
 * of the slot when the write does not end at a page or sector boundary.
 */
void
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
{
    debugs(79, 7, swap_filen << " writes " << size << " more");

    // either this is the first write or append; we do not support write gaps
    assert(!coreOff || coreOff == -1);

    // throw if an accepted unknown-size entry grew too big or max-size changed
    Must(static_cast<uint64_t>(offset_ + size) <= static_cast<uint64_t>(dir->maxObjectSize()));

    // allocate the first slice during the first write
    if (!coreOff) {
        assert(sidCurrent < 0);
        sidCurrent = reserveSlotForWriting(); // throws on failures
        assert(sidCurrent >= 0);
        writeAnchor().start = sidCurrent;
    }

    // buffer incoming data in slot buffer and write overflowing or final slots
    // quit when no data left or we stopped writing on reentrant error
    while (size > 0 && theFile != NULL) {
        assert(sidCurrent >= 0);
        const size_t processed = writeToBuffer(buf, size);
        buf += processed;
        size -= processed;
        const bool overflow = size > 0;

        // We do not write a full buffer without overflow because
        // we would not yet know what to set the nextSlot to.
        if (overflow) {
            const SlotId sidNext = reserveSlotForWriting(); // throws
            assert(sidNext >= 0);
            writeToDisk(sidNext);
        } else if (Store::Root().transientReaders(*e)) {
            // write partial buffer for all remote hit readers to see
            writeBufToDisk(-1, false, false);
        }
    }
}
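
// A sketch of one pass through the loop above, with illustrative sizes only:
// given ~4 KB slots and a 10 KB tryWrite(), iterations 1 and 2 each fill a
// slot, overflow, reserve the next slot, and call writeToDisk(); iteration 3
// buffers the remaining ~2 KB without overflow and flushes that partial slot
// via writeBufToDisk(-1, false, false) only when remote readers are waiting.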

/// Buffers incoming data for the current slot.
/// \return the number of bytes buffered
size_t
Rock::IoState::writeToBuffer(char const *buf, size_t size)
{
    // do not buffer a cell header for nothing
    if (!size)
        return 0;

    if (!theBuf.size) {
        // will fill the header in writeToDisk when the next slot is known
        theBuf.appended(sizeof(DbCellHeader));
    }

    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
    theBuf.append(buf, forCurrentSlot);
    offset_ += forCurrentSlot; // so that Core thinks we wrote it
    return forCurrentSlot;
}
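
// Buffer layout maintained above (the header area is reserved here and filled
// in later by writeToDisk(), once the next slot ID is known):
//
//   theBuf.mem: [sizeof(DbCellHeader) placeholder][payload bytes ...]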

/// write what was buffered during write() calls
/// negative sidNext means this is the last write request for this entry
void
Rock::IoState::writeToDisk(const SlotId sidNextProposal)
{
    assert(theFile != NULL);
    assert(theBuf.size >= sizeof(DbCellHeader));

    const bool lastWrite = sidNextProposal < 0;
    const bool eof = lastWrite &&
                     // either not updating or the updating reader has loaded everything
                     (touchingStoreEntry() || staleSplicingPointNext < 0);
    // approve sidNextProposal unless _updating_ the last slot
    const SlotId sidNext = (!touchingStoreEntry() && lastWrite) ?
                           staleSplicingPointNext : sidNextProposal;
    debugs(79, 5, "sidNext:" << sidNextProposal << "=>" << sidNext << " eof=" << eof);

    // TODO: if DiskIO module is mmap-based, we should be writing whole pages
    // to avoid triggering read-page;new_head+old_tail;write-page overheads

    writeBufToDisk(sidNext, eof, lastWrite);
    theBuf.clear();

    sidCurrent = sidNext;
}
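
// Reading of the sidNext logic above: when updating a cached entry (i.e., not
// touchingStoreEntry()), the last fresh slot is spliced into the stale chain
// at staleSplicingPointNext instead of being terminated, so the not-updated
// tail of the old chain remains reachable.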

/// creates and submits a request to write current slot buffer to disk
/// eof is true if and only if this is the last slot
void
Rock::IoState::writeBufToDisk(const SlotId sidNext, const bool eof, const bool lastWrite)
{
    // no slots after the last/eof slot (but partial slots may have a nil next)
    assert(!eof || sidNext < 0);

    // finalize db cell header
    DbCellHeader header;
    memcpy(header.key, e->key, sizeof(header.key));
    header.firstSlot = writeAnchor().start;
    header.nextSlot = sidNext;
    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
    header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
    header.version = writeAnchor().basics.timestamp;

    // copy finalized db cell header into buffer
    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));

    // and now allocate another buffer for the WriteRequest so that
    // we can support concurrent WriteRequests (and to ease cleaning)
    // TODO: should we limit the number of outstanding requests?
    size_t wBufCap = 0;
    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
    memcpy(wBuf, theBuf.mem, theBuf.size);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
           theBuf.size);

    WriteRequest *const r = new WriteRequest(
        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
                       memFreeBufFunc(wBufCap)), this);
    r->sidCurrent = sidCurrent;
    r->sidNext = sidNext;
    r->eof = lastWrite;

    // theFile->write may call writeCompleted immediately
    theFile->write(r);
}
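
// The wBuf copy above is what makes concurrent WriteRequests safe: theBuf can
// be reused for the next slot immediately, while the DiskIO layer frees each
// request buffer via the supplied memFreeBufFunc(wBufCap) on completion.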

/// finds and returns a free db slot to fill or throws
Rock::SlotId
Rock::IoState::reserveSlotForWriting()
{
    Ipc::Mem::PageId pageId;
    if (dir->useFreeSlot(pageId))
        return pageId.number-1;

    // This may happen when the number of available db slots is close to the
    // number of concurrent requests reading or writing those slots, which may
    // happen when the db is "small" compared to the request traffic OR when we
    // are rebuilding and have not loaded "many" entries or empty slots yet.
    throw TexcHere("ran out of free db slots");
}

void
Rock::IoState::finishedWriting(const int errFlag)
{
    // we incremented offset_ while accumulating data in write()
    // we do not reset writeableAnchor_ here because we still keep the lock
    if (touchingStoreEntry())
        CollapsedForwarding::Broadcast(*e);
    callBack(errFlag);
}

void
Rock::IoState::close(int how)
{
    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
           " buf: " << theBuf.size << " callback: " << callback);

    if (!theFile) {
        debugs(79, 3, "I/O already canceled");
        assert(!callback);
        // We keep writeableAnchor_ after callBack() on I/O errors.
        assert(!readableAnchor_);
        return;
    }

    switch (how) {
    case wroteAll:
        assert(theBuf.size > 0); // we never flush last bytes on our own
        writeToDisk(-1); // flush last, yet unwritten slot to disk
        return; // writeCompleted() will callBack()

    case writerGone:
        dir->writeError(*this); // abort a partially stored entry
        finishedWriting(DISK_ERROR);
        return;

    case readerDone:
        callBack(0);
        return;
    }
}

/// close callback (STIOCB) dialer: breaks dependencies and
/// counts IOState concurrency level
class StoreIOStateCb: public CallDialer
{
public:
    StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
        callback(NULL),
        callback_data(NULL),
        errflag(err),
        sio(anSio) {

        callback = cb;
        callback_data = cbdataReference(data);
    }

    StoreIOStateCb(const StoreIOStateCb &cb):
        callback(NULL),
        callback_data(NULL),
        errflag(cb.errflag),
        sio(cb.sio) {

        callback = cb.callback;
        callback_data = cbdataReference(cb.callback_data);
    }

    virtual ~StoreIOStateCb() {
        cbdataReferenceDone(callback_data); // may be nil already
    }

    void dial(AsyncCall &) {
        void *cbd;
        if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
            callback(cbd, errflag, sio.getRaw());
    }

    bool canDial(AsyncCall &) const {
        return cbdataReferenceValid(callback_data) && callback;
    }

    virtual void print(std::ostream &os) const {
        os << '(' << callback_data << ", err=" << errflag << ')';
    }

private:
    StoreIOStateCb &operator =(const StoreIOStateCb &); // not defined

    StoreIOState::STIOCB *callback;
    void *callback_data;
    int errflag;
    Rock::IoState::Pointer sio;
};
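
// callBack() below shows the intended use of this dialer: wrap the stored
// STIOCB and its cbdata in a StoreIOStateCb, schedule it asynchronously, and
// only then drop the IoState's own callback references.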

void
Rock::IoState::callBack(int errflag)
{
    debugs(79,3, HERE << "errflag=" << errflag);
    theFile = NULL;

    AsyncCall::Pointer call = asyncCall(79,3, "SomeIoStateCloseCb",
                                        StoreIOStateCb(callback, callback_data, errflag, this));
    ScheduleCallHere(call);

    callback = NULL;
    cbdataReferenceDone(callback_data);
    callback_data = NULL;
}