/*
 * DEBUG: section 79    Disk IO Routines
 */

#include "squid.h"
#include "base/TextException.h"
#include "DiskIO/DiskIOModule.h"
#include "DiskIO/DiskIOStrategy.h"
#include "DiskIO/WriteRequest.h"
#include "fs/rock/RockIoState.h"
#include "fs/rock/RockIoRequests.h"
#include "fs/rock/RockSwapDir.h"
#include "globals.h"
#include "MemObject.h"
#include "Mem.h"
#include "Parsing.h"

Rock::IoState::IoState(Rock::SwapDir::Pointer &aDir,
                       StoreEntry *anEntry,
                       StoreIOState::STFNCB *cbFile,
                       StoreIOState::STIOCB *cbIo,
                       void *data):
        readableAnchor_(NULL),
        writeableAnchor_(NULL),
        sidCurrent(-1),
        dir(aDir),
        slotSize(dir->slotSize),
        objOffset(0),
        theBuf(dir->slotSize)
{
    e = anEntry;
    e->lock("rock I/O");
    // anchor, swap_filen, and swap_dirn are set by the caller
    file_callback = cbFile;
    callback = cbIo;
    callback_data = cbdataReference(data);
    ++store_open_disk_fd; // TODO: use a dedicated counter?
    // theFile is set by SwapDir because it depends on DiskIOStrategy
}

Rock::IoState::~IoState()
{
    --store_open_disk_fd;

    // The dir map entry may still be open for reading at this point because
    // the map entry lock is associated with StoreEntry, not IoState.
    // assert(!readableAnchor_);
    assert(!writeableAnchor_);

    if (callback_data)
        cbdataReferenceDone(callback_data);
    theFile = NULL;

    e->unlock("rock I/O");
}

void
Rock::IoState::file(const RefCount<DiskFile> &aFile)
{
    assert(!theFile);
    assert(aFile != NULL);
    theFile = aFile;
}

const Ipc::StoreMapAnchor &
Rock::IoState::readAnchor() const
{
    assert(readableAnchor_);
    return *readableAnchor_;
}

Ipc::StoreMapAnchor &
Rock::IoState::writeAnchor()
{
    assert(writeableAnchor_);
    return *writeableAnchor_;
}

/// convenience wrapper returning the map slot we are reading now
const Ipc::StoreMapSlice &
Rock::IoState::currentReadableSlice() const
{
    return dir->map->readableSlice(swap_filen, sidCurrent);
}

void
Rock::IoState::read_(char *buf, size_t len, off_t coreOff, STRCB *cb, void *data)
{
    debugs(79, 7, swap_filen << " reads from " << coreOff);

    assert(theFile != NULL);
    assert(coreOff >= 0);

    // if we are dealing with the first read or
    // if the offset went backwards, start searching from the beginning
    if (sidCurrent < 0 || coreOff < objOffset) {
        sidCurrent = readAnchor().start;
        objOffset = 0;
    }

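    // follow the slot chain forward until we reach the slot containing coreOff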
    while (coreOff >= objOffset + currentReadableSlice().size) {
        objOffset += currentReadableSlice().size;
        sidCurrent = currentReadableSlice().next;
        assert(sidCurrent >= 0); // XXX: handle "read offset too big" error
    }

    offset_ = coreOff;
    len = min(len,
              static_cast<size_t>(objOffset + currentReadableSlice().size - coreOff));

    assert(read.callback == NULL);
    assert(read.callback_data == NULL);
    read.callback = cb;
    read.callback_data = cbdataReference(data);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    theFile->read(new ReadRequest(::ReadRequest(buf,
                                  diskOffset + sizeof(DbCellHeader) + coreOff - objOffset, len), this));
}

/// wraps tryWrite() to handle deep write failures centrally and safely
bool
Rock::IoState::write(char const *buf, size_t size, off_t coreOff, FREE *dtor)
{
    bool success = false;
    try {
        tryWrite(buf, size, coreOff);
        success = true;
    } catch (const std::exception &ex) { // TODO: should we catch ... as well?
        debugs(79, 2, "db write error: " << ex.what());
        dir->writeError(swap_filen);
        finishedWriting(DISK_ERROR);
        // 'this' might be gone beyond this point; fall through to free buf
    }

    // careful: 'this' might be gone here

    if (dtor)
        (dtor)(const_cast<char*>(buf)); // cast due to a broken API?

    return success;
}

/** We only write data when a full slot is accumulated or when close() is called.
    We buffer, in part, to avoid forcing the OS to _read_ old unwritten portions of
    the slot when the write does not end at the page or sector boundary. */
void
Rock::IoState::tryWrite(char const *buf, size_t size, off_t coreOff)
{
    debugs(79, 7, swap_filen << " writes " << size << " more");

    // either this is the first write or an append; we do not support write gaps
    assert(!coreOff || coreOff == -1);

    // allocate the first slice during the first write
    if (!coreOff) {
        assert(sidCurrent < 0);
        sidCurrent = reserveSlotForWriting(); // throws on failures
        assert(sidCurrent >= 0);
        writeAnchor().start = sidCurrent;
    }

    // buffer incoming data in the slot buffer and write overflowing or final slots;
    // quit when no data is left or we stopped writing due to a reentrant error
    while (size > 0 && theFile != NULL) {
        assert(sidCurrent >= 0);
        const size_t processed = writeToBuffer(buf, size);
        buf += processed;
        size -= processed;
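        // leftover data means the current slot buffer is full and must be written out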
        const bool overflow = size > 0;

        // We do not write a full buffer without overflow because
        // we would not yet know what to set the nextSlot to.
        if (overflow) {
            const SlotId sidNext = reserveSlotForWriting(); // throws
            assert(sidNext >= 0);
            writeToDisk(sidNext);
        } else if (Config.onoff.collapsed_forwarding) {
            // write partial buffer for all collapsed hit readers to see
            // XXX: can we check that this is needed w/o stalling readers
            // that appear right after our check?
            writeBufToDisk(false);
        }
    }

}

/// Buffers incoming data for the current slot.
/// Returns the number of bytes buffered.
size_t
Rock::IoState::writeToBuffer(char const *buf, size_t size)
{
    // do not buffer a cell header for nothing
    if (!size)
        return 0;

    if (!theBuf.size) {
        // will fill the header in writeToDisk when the next slot is known
        theBuf.appended(sizeof(DbCellHeader));
    }

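    // buffer no more than the space remaining in the current slot buffer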
    size_t forCurrentSlot = min(size, static_cast<size_t>(theBuf.spaceSize()));
    theBuf.append(buf, forCurrentSlot);
    offset_ += forCurrentSlot; // so that Core thinks we wrote it
    return forCurrentSlot;
}

/// write what was buffered during write() calls
/// negative sidNext means this is the last write request for this entry
void
Rock::IoState::writeToDisk(const SlotId sidNext)
{
    assert(theFile != NULL);
    assert(theBuf.size >= sizeof(DbCellHeader));

    if (sidNext < 0) { // we are writing the last slot
        e->swap_file_sz = offset_;
        writeAnchor().basics.swap_file_sz = offset_; // would not hurt, right?
    }

    // TODO: if DiskIO module is mmap-based, we should be writing whole pages
    // to avoid triggering read-page;new_head+old_tail;write-page overheads

    // finalize map slice
    Ipc::StoreMap::Slice &slice =
        dir->map->writeableSlice(swap_filen, sidCurrent);
    slice.next = sidNext;
    slice.size = theBuf.size - sizeof(DbCellHeader);

    // finalize db cell header
    DbCellHeader header;
    memcpy(header.key, e->key, sizeof(header.key));
    header.firstSlot = writeAnchor().start;
    header.nextSlot = sidNext;
    header.payloadSize = theBuf.size - sizeof(DbCellHeader);
    header.entrySize = e->swap_file_sz; // may still be zero unless sidNext < 0
    header.version = writeAnchor().basics.timestamp;

    // copy finalized db cell header into buffer
    memcpy(theBuf.mem, &header, sizeof(DbCellHeader));

    writeBufToDisk(sidNext < 0);
    theBuf.clear();

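    // move on to the slot reserved for the next write (or none if this was the last slot)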
    sidCurrent = sidNext;
}

/// Write header-less (XXX) or complete buffer to disk.
void
Rock::IoState::writeBufToDisk(const bool last)
{
    // and now allocate another buffer for the WriteRequest so that
    // we can support concurrent WriteRequests (and to ease cleaning)
    // TODO: should we limit the number of outstanding requests?
    size_t wBufCap = 0;
    void *wBuf = memAllocBuf(theBuf.size, &wBufCap);
    memcpy(wBuf, theBuf.mem, theBuf.size);

    const uint64_t diskOffset = dir->diskOffset(sidCurrent);
    debugs(79, 5, HERE << swap_filen << " at " << diskOffset << '+' <<
           theBuf.size);

    WriteRequest *const r = new WriteRequest(
        ::WriteRequest(static_cast<char*>(wBuf), diskOffset, theBuf.size,
                       memFreeBufFunc(wBufCap)), this, last);

    // theFile->write may call writeCompleted immediately
    theFile->write(r);
}

/// finds and returns a free db slot to fill or throws
Rock::SlotId
Rock::IoState::reserveSlotForWriting()
{
    Ipc::Mem::PageId pageId;
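    // a free page provides a free slot; page numbers are 1-based, slot IDs are 0-based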
    if (dir->useFreeSlot(pageId))
        return pageId.number-1;

    // This may happen when the number of available db slots is close to the
    // number of concurrent requests reading or writing those slots, which may
    // happen when the db is "small" compared to the request traffic OR when we
    // are rebuilding and have not loaded "many" entries or empty slots yet.
    throw TexcHere("ran out of free db slots");
}

void
Rock::IoState::finishedWriting(const int errFlag)
{
    // we incremented offset_ while accumulating data in write()
    writeableAnchor_ = NULL;
    callBack(errFlag);
}

void
Rock::IoState::close(int how)
{
    debugs(79, 3, swap_filen << " offset: " << offset_ << " how: " << how <<
           " buf: " << theBuf.size << " callback: " << callback);

    if (!theFile) {
        debugs(79, 3, "I/O already canceled");
        assert(!callback);
        assert(!writeableAnchor_);
        assert(!readableAnchor_);
        return;
    }

    switch (how) {
    case wroteAll:
        assert(theBuf.size > 0); // we never flush the last bytes on our own
        writeToDisk(-1); // flush the last, still unwritten slot to disk
        return; // writeCompleted() will callBack()

    case writerGone:
        assert(writeableAnchor_);
        dir->writeError(swap_filen); // abort a partially stored entry
        finishedWriting(DISK_ERROR);
        return;

    case readerDone:
        callBack(0);
        return;
    }
}

/// close callback (STIOCB) dialer: breaks dependencies and
/// counts IOState concurrency level
class StoreIOStateCb: public CallDialer
{
public:
    StoreIOStateCb(StoreIOState::STIOCB *cb, void *data, int err, const Rock::IoState::Pointer &anSio):
            callback(NULL),
            callback_data(NULL),
            errflag(err),
            sio(anSio) {

        callback = cb;
        callback_data = cbdataReference(data);
    }

    StoreIOStateCb(const StoreIOStateCb &cb):
            callback(NULL),
            callback_data(NULL),
            errflag(cb.errflag),
            sio(cb.sio) {

        callback = cb.callback;
        callback_data = cbdataReference(cb.callback_data);
    }

    virtual ~StoreIOStateCb() {
        cbdataReferenceDone(callback_data); // may be nil already
    }

    void dial(AsyncCall &call) {
        void *cbd;
        if (cbdataReferenceValidDone(callback_data, &cbd) && callback)
            callback(cbd, errflag, sio.getRaw());
    }

    bool canDial(AsyncCall &call) const {
        return cbdataReferenceValid(callback_data) && callback;
    }

    virtual void print(std::ostream &os) const {
        os << '(' << callback_data << ", err=" << errflag << ')';
    }

private:
    StoreIOStateCb &operator =(const StoreIOStateCb &cb); // not defined

    StoreIOState::STIOCB *callback;
    void *callback_data;
    int errflag;
    Rock::IoState::Pointer sio;
};

void
Rock::IoState::callBack(int errflag)
{
    debugs(79, 3, HERE << "errflag=" << errflag);
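    // forget theFile; close() and write() treat a nil theFile as finished/canceled I/O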
    theFile = NULL;

    AsyncCall::Pointer call = asyncCall(79, 3, "SomeIoStateCloseCb",
                                        StoreIOStateCb(callback, callback_data, errflag, this));
    ScheduleCallHere(call);

    callback = NULL;
    cbdataReferenceDone(callback_data);
}
391