]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/fs/rock/RockHeaderUpdater.cc
2 * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
10 #include "base/AsyncJobCalls.h"
12 #include "fs/rock/RockHeaderUpdater.h"
13 #include "fs/rock/RockIoState.h"
14 #include "mime_header.h"
16 #include "StoreMetaUnpacker.h"
// Invokes CBDATA_NAMESPACED_CLASS_INIT for class HeaderUpdater in namespace Rock.
// NOTE(review): presumably this is the cbdata registration that the
// CbcPointer<HeaderUpdater> uses in the Note*() callbacks rely on -- confirm
// against the macro definition.
18 CBDATA_NAMESPACED_CLASS_INIT(Rock
, HeaderUpdater
);
// Constructs the header-updating job.
//
// aStore:   the Rock::SwapDir the job works on.
// anUpdate: the Ipc::StoreMapUpdate describing the pending update.
// NOTE(review): the initializers that store these two arguments are not
// visible here; presumably they go into the `store` and `update` members
// used throughout this file -- confirm.
//
// Visible initial state: the AsyncJob is named "Rock::HeaderUpdater",
// staleSwapHeaderSize starts at 0 (parseReadBytes() treats zero as "swap
// header not parsed yet"), and staleSplicingPointNext starts at -1.
20 Rock::HeaderUpdater::HeaderUpdater(const Rock::SwapDir::Pointer
&aStore
, const Ipc::StoreMapUpdate
&anUpdate
):
21 AsyncJob("Rock::HeaderUpdater"),
27 staleSwapHeaderSize(0),
28 staleSplicingPointNext(-1)
30 // TODO: Consider limiting the number of concurrent store updates.
// Reports whether the job has nothing left to do: true only when both
// `reader` and `writer` are unset and AsyncJob::doneAll() also returns true.
34 Rock::HeaderUpdater::doneAll() const
36 return !reader
&& !writer
&& AsyncJob::doneAll();
// Final cleanup for the job.
//
// If either side of the update (update.stale or update.fresh) is still set,
// the update is aborted in the store map. Any remaining reader is closed
// with readerDone and any remaining writer with writerGone.
// NOTE(review): the guards around the reader/writer closes and any call to
// the parent swanSong() are not visible in this view -- confirm.
40 Rock::HeaderUpdater::swanSong()
42 if (update
.stale
|| update
.fresh
)
43 store
->map
->abortUpdating(update
);
// Close the reading side.
46 reader
->close(StoreIOState::readerDone
);
// Close the writing side, then detach its anchor (see the comments below).
51 writer
->close(StoreIOState::writerGone
);
52 // Emulate SwapDir::disconnect() that writeCompleted(err) hopes for.
53 // Also required to avoid IoState destructor assertions.
54 // We can do this because we closed update earlier or aborted it above.
55 dynamic_cast<IoState
&>(*writer
).writeableAnchor_
= nullptr;
// Job start hook.
// NOTE(review): only the signature is visible in this view; presumably it
// begins the reading phase via startReading() -- confirm against the full file.
63 Rock::HeaderUpdater::start()
// Begins the reading phase: obtains a store I/O object from
// store->openStoreIO(), keeps it in `reader`, and requests the first chunk
// with readMore().
// NOTE(review): most openStoreIO() arguments are not visible here; only the
// unused file-callback argument (nullptr) can be seen.
72 Rock::HeaderUpdater::startReading()
74 reader
= store
->openStoreIO(
76 nullptr, // unused; see StoreIOState::file_callback
79 readMore("need swap entry metadata");
// Ends the reading phase.
//
// Copies two values out of the reader (viewed as a Rock IoState) before it
// is released: its splicingPoint becomes update.stale.splicingPoint, and its
// staleSplicingPointNext is saved in this job's member of the same name.
// Both are logged. The reader is then closed with readerDone and the
// pointer cleared.
//
// why: caller-supplied reason.
// NOTE(review): `why` is not used in the visible lines; presumably it is
// logged on a line not shown -- confirm.
83 Rock::HeaderUpdater::stopReading(const char *why
)
// dynamic_cast to a reference: a reader of another type fails the cast
// rather than yielding a null pointer.
88 const IoState
&rockReader
= dynamic_cast<IoState
&>(*reader
);
89 update
.stale
.splicingPoint
= rockReader
.splicingPoint
;
90 staleSplicingPointNext
= rockReader
.staleSplicingPointNext
;
91 debugs(47, 5, "stale chain ends at " << update
.stale
.splicingPoint
<<
92 " body continues at " << staleSplicingPointNext
);
94 reader
->close(StoreIOState::readerDone
); // calls noteDoneReading(0)
95 reader
= nullptr; // so that swanSong() does not try to close again
// Read-completion callback with an opaque `data` pointer.
//
// data:   cast back to HeaderUpdater* and wrapped in a CbcPointer.
// buf:    the bytes that were read.
// result: the read result; packaged with `buf` into an IoCbParams.
// The trailing StoreIOState::Pointer parameter is unnamed and unused.
// NOTE(review): the call that consumes the CbcPointer and `io` is not
// visible here; presumably it schedules noteRead(io) on the job -- confirm.
99 Rock::HeaderUpdater::NoteRead(void *data
, const char *buf
, ssize_t result
, StoreIOState::Pointer
)
101 IoCbParams
io(buf
, result
);
102 // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
104 CbcPointer
<HeaderUpdater
>(static_cast<HeaderUpdater
*>(data
)),
// Handles one completed read.
//
// result: the buffer pointer and size delivered by the read.
//
// A zero size is treated as EOF.
// NOTE(review): the EOF branch body is not visible in this view.
// Otherwise the size must be positive (enforced with Must()). The bytes are
// counted in bytesRead, committed to readerBuffer via rawAppendFinish(), and
// readerBuffer is appended to exchangeBuffer.
// NOTE(review): whatever follows the final debugs() (presumably parsing of
// the accumulated bytes) is not visible here -- confirm.
111 Rock::HeaderUpdater::noteRead(const Rock::HeaderUpdater::IoCbParams result
)
113 debugs(47, 7, result
.size
);
114 if (!result
.size
) { // EOF
117 Must(result
.size
> 0);
118 bytesRead
+= result
.size
;
119 readerBuffer
.rawAppendFinish(result
.buf
, result
.size
);
120 exchangeBuffer
.append(readerBuffer
);
121 debugs(47, 7, "accumulated " << exchangeBuffer
.length());
// Requests the next chunk of the stale entry.
//
// why: reason for the read; included in the debug line with bytesRead.
//
// readerBuffer is cleared first, then rawAppendStart() is asked for
// store->slotSize bytes of raw append space.
// NOTE(review): the rawAppendStart() result is an argument (note the
// trailing comma) to a call whose other lines are not visible; presumably
// the reader's read request at offset bytesRead -- confirm.
128 Rock::HeaderUpdater::readMore(const char *why
)
130 debugs(47, 7, "from " << bytesRead
<< " because " << why
);
132 readerBuffer
.clear();
134 readerBuffer
.rawAppendStart(store
->slotSize
),
// Reader-closure callback with an opaque `data` pointer.
//
// data:    cast back to HeaderUpdater* and wrapped in a CbcPointer.
// errflag: closure status reported by the I/O layer.
// The trailing StoreIOState::Pointer parameter is unnamed and unused.
// NOTE(review): the dispatch call is not visible here; presumably it
// schedules noteDoneReading(errflag) on the job -- confirm.
142 Rock::HeaderUpdater::NoteDoneReading(void *data
, int errflag
, StoreIOState::Pointer
)
144 // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
146 CbcPointer
<HeaderUpdater
>(static_cast<HeaderUpdater
*>(data
)),
// Reacts to the reader having been closed.
//
// errflag: closure status; logged together with `writer`.
//
// The visible statements form two mutually exclusive groups:
//   - Must(!errflag) and Must(writer): a closure this job started itself.
//   - reader = nullptr, Must(errflag), mustStop("read error"): a closure
//     started elsewhere, which must be an error and stops the job.
// NOTE(review): the condition choosing between the groups is not visible;
// presumably it tests whether `reader` was already cleared by stopReading()
// -- confirm.
153 Rock::HeaderUpdater::noteDoneReading(int errflag
)
155 debugs(47, 5, errflag
<< " writer=" << writer
);
157 Must(!errflag
); // we only initiate successful closures
158 Must(writer
); // otherwise we would be done() and would not be called
160 reader
= nullptr; // we are done reading
161 Must(errflag
); // any external closures ought to be errors
162 mustStop("read error");
// Writes the fresh entry prefix and adjusts the recorded entry size.
//
// Visible steps, in order:
//   1. Obtain a writer from store->createUpdateIO() and give it the
//      staleSplicingPointNext value saved during reading.
//   2. Serialize fresh swap metadata (with swap_file_sz temporarily zeroed)
//      and write it at offset 0.
//   3. Pack the freshest HTTP reply headers and write them.
//   4. Write the body bytes left in exchangeBuffer, then clear that buffer.
//   5. Replace the stale prefix size with the fresh prefix size in
//      update.fresh.anchor->basics.swap_file_sz.
//   6. Close the writer with wroteAll.
// stalePrefixSz and freshPrefixSz accumulate the old and new sizes of the
// swap header plus HTTP header; `offset` only feeds debugging output.
// NOTE(review): most createUpdateIO() arguments are not visible in this view.
167 Rock::HeaderUpdater::startWriting()
169 writer
= store
->createUpdateIO(
171 nullptr, // unused; see StoreIOState::file_callback
176 IoState
&rockWriter
= dynamic_cast<IoState
&>(*writer
);
177 rockWriter
.staleSplicingPointNext
= staleSplicingPointNext
;
179 // here, prefix is swap header plus HTTP reply header (i.e., updated bytes)
180 uint64_t stalePrefixSz
= 0;
181 uint64_t freshPrefixSz
= 0;
183 off_t offset
= 0; // current writing offset (for debugging)
185 const auto &mem
= update
.entry
->mem();
188 debugs(20, 7, "fresh store meta for " << *update
.entry
);
189 size_t freshSwapHeaderSize
= 0; // set by getSerialisedMetaData() below
191 // There is a circular dependency between the correct/fresh value of
192 // entry->swap_file_sz and freshSwapHeaderSize. We break that loop by
193 // serializing zero swap_file_sz, just like the regular first-time
194 // swapout code may do. De-serializing code will re-calculate it in
195 // storeRebuildParseEntry(). TODO: Stop serializing entry->swap_file_sz.
196 const auto savedEntrySwapFileSize
= update
.entry
->swap_file_sz
;
197 update
.entry
->swap_file_sz
= 0;
198 const auto freshSwapHeader
= update
.entry
->getSerialisedMetaData(freshSwapHeaderSize
);
// Restore the real value as soon as serialization is done.
199 update
.entry
->swap_file_sz
= savedEntrySwapFileSize
;
201 Must(freshSwapHeader
);
// The swap header is the only write with an explicit offset (0); the later
// writes pass -1.
// NOTE(review): -1 presumably means "continue at the current position".
202 writer
->write(freshSwapHeader
, freshSwapHeaderSize
, 0, nullptr);
203 stalePrefixSz
+= mem
.swap_hdr_sz
;
204 freshPrefixSz
+= freshSwapHeaderSize
;
205 offset
+= freshSwapHeaderSize
;
// The serialized header buffer is released here with xfree().
206 xfree(freshSwapHeader
);
210 debugs(20, 7, "fresh HTTP header @ " << offset
);
211 const auto httpHeader
= mem
.freshestReply().pack();
212 writer
->write(httpHeader
->content(), httpHeader
->contentSize(), -1, nullptr);
// The stale side of the accounting uses the base reply's header size.
213 const auto &staleReply
= mem
.baseReply();
214 Must(staleReply
.hdr_sz
>= 0); // for int-to-uint64_t conversion below
215 Must(staleReply
.hdr_sz
> 0); // already initialized
216 stalePrefixSz
+= staleReply
.hdr_sz
;
217 freshPrefixSz
+= httpHeader
->contentSize();
218 offset
+= httpHeader
->contentSize();
223 debugs(20, 7, "moved HTTP body prefix @ " << offset
);
224 writer
->write(exchangeBuffer
.rawContent(), exchangeBuffer
.length(), -1, nullptr);
225 offset
+= exchangeBuffer
.length();
226 exchangeBuffer
.clear();
229 debugs(20, 7, "wrote " << offset
<<
230 "; swap_file_sz delta: -" << stalePrefixSz
<< " +" << freshPrefixSz
);
232 // Optimistic early update OK: Our write lock blocks access to swap_file_sz.
233 auto &swap_file_sz
= update
.fresh
.anchor
->basics
.swap_file_sz
;
// Check before subtracting so the unsigned arithmetic cannot wrap.
234 Must(swap_file_sz
>= stalePrefixSz
);
235 swap_file_sz
-= stalePrefixSz
;
236 swap_file_sz
+= freshPrefixSz
;
238 writer
->close(StoreIOState::wroteAll
); // should call noteDoneWriting()
// Writer-closure callback with an opaque `data` pointer.
//
// data:    cast back to HeaderUpdater* and wrapped in a CbcPointer.
// errflag: closure status reported by the I/O layer.
// The trailing StoreIOState::Pointer parameter is unnamed and unused.
// NOTE(review): the dispatch call is not visible here; presumably it
// schedules noteDoneWriting(errflag) on the job -- confirm.
242 Rock::HeaderUpdater::NoteDoneWriting(void *data
, int errflag
, StoreIOState::Pointer
)
245 CbcPointer
<HeaderUpdater
>(static_cast<HeaderUpdater
*>(data
)),
// Finalizes the update after the writer has been closed.
//
// errflag: closure status; logged together with `reader`.
// NOTE(review): any checks on errflag are not visible in this view.
//
// Requires that reading already finished (`reader` unset). The writer's
// splicingPoint becomes update.fresh.splicingPoint, the update is committed
// with store->map->closeForUpdating(), and the writer is detached from its
// anchor and released.
252 Rock::HeaderUpdater::noteDoneWriting(int errflag
)
254 debugs(47, 5, errflag
<< " reader=" << reader
);
256 Must(!reader
); // if we wrote everything, then we must have read everything
259 IoState
&rockWriter
= dynamic_cast<IoState
&>(*writer
);
260 update
.fresh
.splicingPoint
= rockWriter
.splicingPoint
;
261 debugs(47, 5, "fresh chain ends at " << update
.fresh
.splicingPoint
);
262 store
->map
->closeForUpdating(update
);
// The same anchor reset is done in swanSong() for an abandoned writer.
263 rockWriter
.writeableAnchor_
= nullptr;
264 writer
= nullptr; // we are done writing
// Parses the stale bytes accumulated in exchangeBuffer.
//
// While staleSwapHeaderSize is still zero, the start of exchangeBuffer is
// handed to StoreMetaUnpacker, which fills in staleSwapHeaderSize;
// checkBuffer() validates the metadata, the size must be positive, and
// those bytes are consumed.
//
// Next, headersEnd() looks for the end of the stale HTTP reply headers.
// A zero result leads to readMore(). Otherwise the header bytes are
// consumed, leaving only the body prefix in exchangeBuffer (its length is
// logged as httpBodySizePrefix), and reading is stopped.
// NOTE(review): closing braces and any early return after readMore() are
// not visible in this view -- confirm the control flow against the full file.
270 Rock::HeaderUpdater::parseReadBytes()
272 if (!staleSwapHeaderSize
) {
273 StoreMetaUnpacker
aBuilder(
274 exchangeBuffer
.rawContent(),
275 exchangeBuffer
.length(),
276 &staleSwapHeaderSize
);
277 // Squid assumes that metadata always fits into a single db slot
278 aBuilder
.checkBuffer(); // cannot update an entry with invalid metadata
279 debugs(47, 7, "staleSwapHeaderSize=" << staleSwapHeaderSize
);
280 Must(staleSwapHeaderSize
> 0);
281 exchangeBuffer
.consume(staleSwapHeaderSize
);
284 const size_t staleHttpHeaderSize
= headersEnd(
285 exchangeBuffer
.rawContent(),
286 exchangeBuffer
.length());
287 debugs(47, 7, "staleHttpHeaderSize=" << staleHttpHeaderSize
);
288 if (!staleHttpHeaderSize
) {
289 readMore("need more stale HTTP reply header data");
293 exchangeBuffer
.consume(staleHttpHeaderSize
);
294 debugs(47, 7, "httpBodySizePrefix=" << exchangeBuffer
.length());
296 stopReading("read the last HTTP header slot");