/*
 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 79    Disk IO Routines */

#include "squid.h"
#include "base/TextException.h"
#include "CollapsedForwarding.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "Mem.h"
#include "MemObject.h"
#include "Parsing.h"
#include "Transients.h"

Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
                       StoreEntry *anEntry,
                       StoreIOState::STFNCB *cbFile,
                       StoreIOState::STIOCB *cbIo,
                       void *data):
    readableAnchor_(NULL),
    writeableAnchor_(NULL),
    sidCurrent(-1),
    dir(aDir),
    slotSize(dir->slotSize),
    objOffset(0),
    theBuf(dir->slotSize)
{
    e = anEntry;
    e->lock("rock I/O");
    // anchor, swap_filen, and swap_dirn are set by the caller
    file_callback = cbFile;
    callback = cbIo;
    callback_data = cbdataReference(data);
    ++store_open_disk_fd; // TODO: use a dedicated counter?
    // theFile is set by SwapDir because it depends on DiskIOStrategy
}

Rock::IoState::~IoState()
{
    --store_open_disk_fd;

    // The dir map entry may still be open for reading at this point because
    // the map entry lock is associated with StoreEntry, not IoState.
    // assert(!readableAnchor_);
    assert(shutting_down || !writeableAnchor_);

    if (callback_data)
        cbdataReferenceDone(callback_data);
    theFile = NULL;

    e->unlock("rock I/O");
}

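/// attaches the DiskFile that will perform our disk I/O; the file is created
/// by SwapDir (because it depends on DiskIOStrategy) and is set exactly once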
void
Rock::IoState::file(const RefCount<DiskFile> &aFile)
{
    assert(!theFile);
    assert(aFile != NULL);
    theFile = aFile;
}

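/// the shared entry metadata (anchor) we are reading from;
/// readableAnchor_ is set by the caller (see the constructor note)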
const Ipc::StoreMapAnchor &
Rock::IoState::readAnchor() const
{
    assert(readableAnchor_);
    return *readableAnchor_;
}

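/// the shared entry metadata (anchor) we are writing to;
/// writeableAnchor_ is set by the caller (see the constructor note)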
Ipc::StoreMapAnchor &
Rock::IoState::writeAnchor()
{
    assert(writeableAnchor_);
    return *writeableAnchor_;
}

/// convenience wrapper returning the map slot we are reading now
const Ipc::StoreMapSlice &
Rock::IoState::currentReadableSlice() const
{
    return dir->map->readableSlice(swap_filen, sidCurrent);
}

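/// reads at most len bytes of the entry, starting at logical offset coreOff:
/// walks the slot chain to the slice containing coreOff and then schedules an
/// asynchronous ReadRequest (or punts via callReaderBack() when coreOff lies
/// beyond the stored data)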
void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
    debugs(79, 7, swap_filen << " reads from " << coreOff);

    assert(theFile != NULL);
    assert(coreOff >= 0);

    // if we are dealing with the first read or
    // if the offset went backwards, start searching from the beginning
    if (sidCurrent < 0 || coreOff < objOffset) {
        sidCurrent = readAnchor().start;
        objOffset = 0;
    }

    while (sidCurrent >= 0 && coreOff >= objOffset + currentReadableSlice().size) {
        objOffset += currentReadableSlice().size;
        sidCurrent = currentReadableSlice().next;
    }

    assert(read.callback == NULL);
    assert(read.callback_data == NULL);
    read.callback = cb;
    read.callback_data = cbdataReference(data);

    // punt if read offset is too big (because of client bugs or collapsing)
    if (sidCurrent < 0) {
        debugs(79, 5, "no " << coreOff << " in " << *e);
        callReaderBack(buf, 0);
        return;
    }

    offset_ = coreOff;
    len = min(len,
              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));
    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    theFile->read(new ReadRequest(::ReadRequest(buf,
                                  diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
}

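/// hands the (possibly empty) read result in buf/rlen to the STRCB callback
/// registered by read_()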
void
Rock::IoState::callReaderBack(const char *buf, int rlen)
{
    debugs(79, 5, rlen << " bytes for " << *e);
    StoreIOState::STRCB *callb = read.callback;
    assert(callb);
    read.callback = NULL;
    void *cbdata;
    if (cbdataReferenceValidDone(read.callback_data, &cbdata))
        callb(cbdata, buf, rlen, this);
}

/// wraps tryWrite() to handle deep write failures centrally and safely
bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
    bool success = false;
    try {
        tryWrite(buf, size, coreOff);
        success = true;
    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
        debugs(79, 2, "db write error: " << ex.what());
        dir->writeError(*e);
        finishedWriting(DISK_ERROR);
        // 'this' might be gone beyond this point; fall through to free buf
    }

    // careful: 'this' might be gone here

    if (dtor)
        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?

    return success;
}

/**
 * Possibly send data to be written to disk:
 * We only write data when a full slot is accumulated or when close() is called.
 * We buffer, in part, to avoid forcing the OS to _read_ old unwritten portions
 * of the slot when the write does not end at a page or sector boundary.
 */
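// A sketch of the resulting behavior, assuming no remote readers: appended
// bytes accumulate in theBuf; whenever incoming data would not fit into the
// current slot (slotSize minus the DbCellHeader), the next slot is reserved
// and the full current slot is written to disk; the final, partial slot is
// written only when close(wroteAll) calls writeToDisk(-1).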
void
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
{
    debugs(79, 7, swap_filen << " writes " << size << " more");

    // either this is the first write or append; we do not support write gaps
    assert(!coreOff || coreOff == -1);

    // allocate the first slice during the first write
    if (!coreOff) {
        assert(sidCurrent < 0);
        sidCurrent = reserveSlotForWriting(); // throws on failures
        assert(sidCurrent >= 0);
        writeAnchor().start = sidCurrent;
    }

    // buffer incoming data in slot buffer and write overflowing or final slots
    // quit when no data left or we stopped writing on reentrant error
    while (size > 0 && theFile != NULL) {
        assert(sidCurrent >= 0);
        const size_t processed = writeToBuffer(buf, size);
        buf += processed;
        size -= processed;
        const bool overflow = size > 0;

        // We do not write a full buffer without overflow because
        // we would not yet know what to set the nextSlot to.
        if (overflow) {
            const SlotId sidNext = reserveSlotForWriting(); // throws
            assert(sidNext >= 0);
            writeToDisk(sidNext);
        } else if (Store::Root().transientReaders(*e)) {
            // write partial buffer for all remote hit readers to see
            writeBufToDisk(-1, false);
        }
    }
}

/// Buffers incoming data for the current slot.
/// \return the number of bytes buffered
size_t
Rock::IoState::writeToBuffer(char const *buf, size_t size)
{
    // do not buffer a cell header for nothing
    if (!size)
        return 0;

    if (!theBuf.size) {
        // will fill the header in writeToDisk when the next slot is known
        theBuf.appended(sizeof(DbCellHeader));
    }

    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
    theBuf.append(buf, forCurrentSlot);
    offset_ += forCurrentSlot; // so that Core thinks we wrote it
    return forCurrentSlot;
}

/// write what was buffered during write() calls
/// negative sidNext means this is the last write request for this entry
void
Rock::IoState::writeToDisk(const SlotId sidNext)
{
    assert(theFile != NULL);
    assert(theBuf.size >= sizeof(DbCellHeader));

    // TODO: if DiskIO module is mmap-based, we should be writing whole pages
    // to avoid triggering read-page;new_head+old_tail;write-page overheads

    writeBufToDisk(sidNext, sidNext < 0);
    theBuf.clear();

    sidCurrent = sidNext;
}

/// creates and submits a request to write the current slot buffer to disk
/// eof is true if and only if this is the last slot
void
Rock::IoState::writeBufToDisk(const SlotId sidNext, bool eof)
{
    // no slots after the last/eof slot (but partial slots may have a nil next)
    assert(!eof || sidNext < 0);

    // finalize db cell header
    DbCellHeader header;
    memcpy(header.key, e->key, sizeof(header.key));
    header.firstSlot = writeAnchor().start;
    header.nextSlot = sidNext;
    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
    header.entrySize = eof ? offset_ : 0; // storeSwapOutFileClosed sets swap_file_sz after write
    header.version = writeAnchor().basics.timestamp;

    // copy finalized db cell header into buffer
    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));

    // and now allocate another buffer for the WriteRequest so that
    // we can support concurrent WriteRequests (and to ease cleaning)
    // TODO: should we limit the number of outstanding requests?
    size_t wBufCap = 0;
    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
    memcpy(wBuf, theBuf.mem, theBuf.size);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
           theBuf.size);

    WriteRequest *const r = new WriteRequest(
        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
                       memFreeBufFunc(wBufCap)), this);
    r->sidCurrent = sidCurrent;
    r->sidNext = sidNext;
    r->eof = eof;

    // theFile->write may call writeCompleted immediately
    theFile->write(r);
}

/// finds and returns a free db slot to fill or throws
Rock::SlotId
Rock::IoState::reserveSlotForWriting()
{
    Ipc::Mem::PageId pageId;
    if (dir->useFreeSlot(pageId))
        return pageId.number-1;

    // This may happen when the number of available db slots is close to the
    // number of concurrent requests reading or writing those slots, which may
    // happen when the db is "small" compared to the request traffic OR when we
    // are rebuilding and have not loaded "many" entries or empty slots yet.
    throw TexcHere("ran out of free db slots");
}

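/// called when writing ends, successfully or not: broadcasts the entry update
/// to collapsed-forwarding readers and schedules the close callback with the
/// given error flag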
void
Rock::IoState::finishedWriting(const int errFlag)
{
    // we incremented offset_ while accumulating data in write()
    // we do not reset writeableAnchor_ here because we still keep the lock
    CollapsedForwarding::Broadcast(*e);
    callBack(errFlag);
}

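/// ends this I/O: flushes the last buffered slot (wroteAll), aborts a
/// partially stored entry (writerGone), or just reports completion to the
/// reader (readerDone); does nothing if the I/O has already been canceled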
void
Rock::IoState::close(int how)
{
    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
           " buf: " << theBuf.size << " callback: " << callback);

    if (!theFile) {
        debugs(79, 3, "I/O already canceled");
        assert(!callback);
        // We keep writeableAnchor_ after callBack() on I/O errors.
        assert(!readableAnchor_);
        return;
    }

    switch (how) {
    case wroteAll:
        assert(theBuf.size > 0); // we never flush last bytes on our own
        writeToDisk(-1); // flush last, yet unwritten slot to disk
        return; // writeCompleted() will callBack()

    case writerGone:
        assert(writeableAnchor_);
        dir->writeError(*e); // abort a partially stored entry
        finishedWriting(DISK_ERROR);
        return;

    case readerDone:
        callBack(0);
        return;
    }
}

/// close callback (STIOCB) dialer: breaks dependencies and
/// counts IOState concurrency level
class StoreIOStateCb: public CallDialer
{
public:
    StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
        callback(NULL),
        callback_data(NULL),
        errflag(err),
        sio(anSio) {

        callback = cb;
        callback_data = cbdataReference(data);
    }

    StoreIOStateCb(const StoreIOStateCb &cb):
        callback(NULL),
        callback_data(NULL),
        errflag(cb.errflag),
        sio(cb.sio) {

        callback = cb.callback;
        callback_data = cbdataReference(cb.callback_data);
    }

    virtual ~StoreIOStateCb() {
        cbdataReferenceDone(callback_data); // may be nil already
    }

    void dial(AsyncCall &call) {
        void *cbd;
        if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
            callback(cbd, errflag, sio.getRaw());
    }

    bool canDial(AsyncCall &call) const {
        return cbdataReferenceValid(callback_data) && callback;
    }

    virtual void print(std::ostream &os) const {
        os << '(' << callback_data << ", err=" << errflag << ')';
    }

private:
    StoreIOStateCb &operator =(const StoreIOStateCb &cb); // not defined

    StoreIOState::STIOCB *callback;
    void *callback_data;
    int errflag;
    Rock::IoState::Pointer sio;
};

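/// schedules the registered STIOCB completion callback (via StoreIOStateCb
/// above) and severs our ties with the DiskFile and the callback data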
void
Rock::IoState::callBack(int errflag)
{
    debugs(79, 3, HERE << "errflag=" << errflag);
    theFile = NULL;

    AsyncCall::Pointer call = asyncCall(79, 3, "SomeIoStateCloseCb",
                                        StoreIOStateCb(callback, callback_data, errflag, this));
    ScheduleCallHere(call);

    callback = NULL;
    cbdataReferenceDone(callback_data);
}