]> git.ipfire.org Git - thirdparty/squid.git/blame - src/fs/rock/RockHeaderUpdater.cc
Source Format Enforcement (#532)
[thirdparty/squid.git] / src / fs / rock / RockHeaderUpdater.cc
CommitLineData
abf396ec 1/*
77b1029d 2 * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
abf396ec
AR
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9#include "squid.h"
10#include "base/AsyncJobCalls.h"
11#include "Debug.h"
12#include "fs/rock/RockHeaderUpdater.h"
13#include "fs/rock/RockIoState.h"
14#include "mime_header.h"
15#include "Store.h"
16#include "StoreMetaUnpacker.h"
17
// NOTE(review): presumably supplies the cbdata support that the
// CbcPointer<HeaderUpdater> callbacks in this file depend on -- confirm
// against the macro's definition.
18CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater);
19
20Rock::HeaderUpdater::HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &anUpdate):
21 AsyncJob("Rock::HeaderUpdater"),
22 store(aStore),
23 update(anUpdate),
24 reader(),
25 writer(),
26 bytesRead(0),
27 staleSwapHeaderSize(0),
28 staleSplicingPointNext(-1)
29{
30 // TODO: Consider limiting the number of concurrent store updates.
31}
32
33bool
34Rock::HeaderUpdater::doneAll() const
35{
36 return !reader && !writer && AsyncJob::doneAll();
37}
38
39void
40Rock::HeaderUpdater::swanSong()
41{
42 if (update.stale || update.fresh)
43 store->map->abortUpdating(update);
44
45 if (reader) {
46 reader->close(StoreIOState::readerDone);
47 reader = nullptr;
48 }
49
50 if (writer) {
51 writer->close(StoreIOState::writerGone);
52 // Emulate SwapDir::disconnect() that writeCompleted(err) hopes for.
53 // Also required to avoid IoState destructor assertions.
54 // We can do this because we closed update earlier or aborted it above.
55 dynamic_cast<IoState&>(*writer).writeableAnchor_ = nullptr;
56 writer = nullptr;
57 }
58
59 AsyncJob::swanSong();
60}
61
62void
63Rock::HeaderUpdater::start()
64{
65 Must(update.entry);
66 Must(update.stale);
67 Must(update.fresh);
68 startReading();
69}
70
71void
72Rock::HeaderUpdater::startReading()
73{
74 reader = store->openStoreIO(
75 *update.entry,
76 nullptr, // unused; see StoreIOState::file_callback
77 &NoteDoneReading,
78 this);
79 readMore("need swap entry metadata");
80}
81
82void
83Rock::HeaderUpdater::stopReading(const char *why)
84{
85 debugs(47, 7, why);
86
87 Must(reader);
88 const IoState &rockReader = dynamic_cast<IoState&>(*reader);
89 update.stale.splicingPoint = rockReader.splicingPoint;
90 staleSplicingPointNext = rockReader.staleSplicingPointNext;
91 debugs(47, 5, "stale chain ends at " << update.stale.splicingPoint <<
92 " body continues at " << staleSplicingPointNext);
93
94 reader->close(StoreIOState::readerDone); // calls noteDoneReading(0)
95 reader = nullptr; // so that swanSong() does not try to close again
96}
97
98void
99Rock::HeaderUpdater::NoteRead(void *data, const char *buf, ssize_t result, StoreIOState::Pointer)
100{
672337d1 101 IoCbParams io(buf, result);
abf396ec
AR
102 // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
103 CallJobHere1(47, 7,
104 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
105 Rock::HeaderUpdater,
106 noteRead,
672337d1 107 io);
abf396ec
AR
108}
109
110void
672337d1 111Rock::HeaderUpdater::noteRead(const Rock::HeaderUpdater::IoCbParams result)
abf396ec 112{
672337d1
CT
113 debugs(47, 7, result.size);
114 if (!result.size) { // EOF
abf396ec
AR
115 stopReading("eof");
116 } else {
672337d1
CT
117 Must(result.size > 0);
118 bytesRead += result.size;
119 readerBuffer.rawAppendFinish(result.buf, result.size);
abf396ec
AR
120 exchangeBuffer.append(readerBuffer);
121 debugs(47, 7, "accumulated " << exchangeBuffer.length());
122 }
123
124 parseReadBytes();
125}
126
127void
128Rock::HeaderUpdater::readMore(const char *why)
129{
130 debugs(47, 7, "from " << bytesRead << " because " << why);
131 Must(reader);
132 readerBuffer.clear();
133 storeRead(reader,
672337d1 134 readerBuffer.rawAppendStart(store->slotSize),
abf396ec
AR
135 store->slotSize,
136 bytesRead,
137 &NoteRead,
138 this);
139}
140
141void
142Rock::HeaderUpdater::NoteDoneReading(void *data, int errflag, StoreIOState::Pointer)
143{
144 // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
145 CallJobHere1(47, 7,
146 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
147 Rock::HeaderUpdater,
148 noteDoneReading,
149 errflag);
150}
151
152void
153Rock::HeaderUpdater::noteDoneReading(int errflag)
154{
155 debugs(47, 5, errflag << " writer=" << writer);
ee64423f 156 if (!reader) {
abf396ec
AR
157 Must(!errflag); // we only initiate successful closures
158 Must(writer); // otherwise we would be done() and would not be called
159 } else {
160 reader = nullptr; // we are done reading
161 Must(errflag); // any external closures ought to be errors
162 mustStop("read error");
163 }
164}
165
166void
167Rock::HeaderUpdater::startWriting()
168{
169 writer = store->createUpdateIO(
170 update,
171 nullptr, // unused; see StoreIOState::file_callback
172 &NoteDoneWriting,
173 this);
174 Must(writer);
175
176 IoState &rockWriter = dynamic_cast<IoState&>(*writer);
177 rockWriter.staleSplicingPointNext = staleSplicingPointNext;
178
66d51f4f
AR
179 // here, prefix is swap header plus HTTP reply header (i.e., updated bytes)
180 uint64_t stalePrefixSz = 0;
181 uint64_t freshPrefixSz = 0;
182
abf396ec
AR
183 off_t offset = 0; // current writing offset (for debugging)
184
66d51f4f
AR
185 const auto &mem = update.entry->mem();
186
abf396ec
AR
187 {
188 debugs(20, 7, "fresh store meta for " << *update.entry);
66d51f4f
AR
189 size_t freshSwapHeaderSize = 0; // set by getSerialisedMetaData() below
190
191 // There is a circular dependency between the correct/fresh value of
192 // entry->swap_file_sz and freshSwapHeaderSize. We break that loop by
193 // serializing zero swap_file_sz, just like the regular first-time
194 // swapout code may do. De-serializing code will re-calculate it in
195 // storeRebuildParseEntry(). TODO: Stop serializing entry->swap_file_sz.
196 const auto savedEntrySwapFileSize = update.entry->swap_file_sz;
197 update.entry->swap_file_sz = 0;
198 const auto freshSwapHeader = update.entry->getSerialisedMetaData(freshSwapHeaderSize);
199 update.entry->swap_file_sz = savedEntrySwapFileSize;
200
abf396ec
AR
201 Must(freshSwapHeader);
202 writer->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr);
66d51f4f
AR
203 stalePrefixSz += mem.swap_hdr_sz;
204 freshPrefixSz += freshSwapHeaderSize;
abf396ec 205 offset += freshSwapHeaderSize;
27534aa0 206 xfree(freshSwapHeader);
abf396ec
AR
207 }
208
209 {
210 debugs(20, 7, "fresh HTTP header @ " << offset);
66d51f4f 211 const auto httpHeader = mem.freshestReply().pack();
abf396ec 212 writer->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr);
66d51f4f
AR
213 const auto &staleReply = mem.baseReply();
214 Must(staleReply.hdr_sz >= 0); // for int-to-uint64_t conversion below
215 Must(staleReply.hdr_sz > 0); // already initialized
216 stalePrefixSz += staleReply.hdr_sz;
217 freshPrefixSz += httpHeader->contentSize();
abf396ec
AR
218 offset += httpHeader->contentSize();
219 delete httpHeader;
220 }
221
222 {
223 debugs(20, 7, "moved HTTP body prefix @ " << offset);
224 writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1, nullptr);
225 offset += exchangeBuffer.length();
226 exchangeBuffer.clear();
227 }
228
66d51f4f
AR
229 debugs(20, 7, "wrote " << offset <<
230 "; swap_file_sz delta: -" << stalePrefixSz << " +" << freshPrefixSz);
231
232 // Optimistic early update OK: Our write lock blocks access to swap_file_sz.
233 auto &swap_file_sz = update.fresh.anchor->basics.swap_file_sz;
234 Must(swap_file_sz >= stalePrefixSz);
235 swap_file_sz -= stalePrefixSz;
236 swap_file_sz += freshPrefixSz;
abf396ec
AR
237
238 writer->close(StoreIOState::wroteAll); // should call noteDoneWriting()
239}
240
241void
242Rock::HeaderUpdater::NoteDoneWriting(void *data, int errflag, StoreIOState::Pointer)
243{
244 CallJobHere1(47, 7,
245 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
246 Rock::HeaderUpdater,
247 noteDoneWriting,
248 errflag);
249}
250
251void
252Rock::HeaderUpdater::noteDoneWriting(int errflag)
253{
254 debugs(47, 5, errflag << " reader=" << reader);
255 Must(!errflag);
256 Must(!reader); // if we wrote everything, then we must have read everything
257
258 Must(writer);
259 IoState &rockWriter = dynamic_cast<IoState&>(*writer);
260 update.fresh.splicingPoint = rockWriter.splicingPoint;
261 debugs(47, 5, "fresh chain ends at " << update.fresh.splicingPoint);
262 store->map->closeForUpdating(update);
263 rockWriter.writeableAnchor_ = nullptr;
264 writer = nullptr; // we are done writing
265
266 Must(doneAll());
267}
268
269void
270Rock::HeaderUpdater::parseReadBytes()
271{
272 if (!staleSwapHeaderSize) {
273 StoreMetaUnpacker aBuilder(
274 exchangeBuffer.rawContent(),
275 exchangeBuffer.length(),
276 &staleSwapHeaderSize);
277 // Squid assumes that metadata always fits into a single db slot
4c2f8b72 278 aBuilder.checkBuffer(); // cannot update an entry with invalid metadata
abf396ec
AR
279 debugs(47, 7, "staleSwapHeaderSize=" << staleSwapHeaderSize);
280 Must(staleSwapHeaderSize > 0);
281 exchangeBuffer.consume(staleSwapHeaderSize);
282 }
283
284 const size_t staleHttpHeaderSize = headersEnd(
285 exchangeBuffer.rawContent(),
286 exchangeBuffer.length());
287 debugs(47, 7, "staleHttpHeaderSize=" << staleHttpHeaderSize);
288 if (!staleHttpHeaderSize) {
289 readMore("need more stale HTTP reply header data");
290 return;
291 }
292
293 exchangeBuffer.consume(staleHttpHeaderSize);
294 debugs(47, 7, "httpBodySizePrefix=" << exchangeBuffer.length());
295
296 stopReading("read the last HTTP header slot");
297 startWriting();
298}
299