]>
Commit | Line | Data |
---|---|---|
abf396ec | 1 | /* |
77b1029d | 2 | * Copyright (C) 1996-2020 The Squid Software Foundation and contributors |
abf396ec AR |
3 | * |
4 | * Squid software is distributed under GPLv2+ license and includes | |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
7 | */ | |
8 | ||
9 | #include "squid.h" | |
10 | #include "base/AsyncJobCalls.h" | |
11 | #include "Debug.h" | |
12 | #include "fs/rock/RockHeaderUpdater.h" | |
13 | #include "fs/rock/RockIoState.h" | |
14 | #include "mime_header.h" | |
15 | #include "Store.h" | |
16 | #include "StoreMetaUnpacker.h" | |
17 | ||
CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater);

/// Constructs (but does not start) a job that rewrites the swap meta and
/// HTTP headers of a cached Rock entry in place: it reads the stale entry
/// prefix and then writes a fresh prefix followed by the preserved body start.
/// \param aStore the cache_dir that holds the entry being updated
/// \param anUpdate the already-opened map update transaction (stale + fresh anchors)
Rock::HeaderUpdater::HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &anUpdate):
    AsyncJob("Rock::HeaderUpdater"),
    store(aStore),
    update(anUpdate),
    reader(), // opened by startReading()
    writer(), // opened by startWriting()
    bytesRead(0),
    staleSwapHeaderSize(0), // unknown until parseReadBytes() unpacks metadata
    staleSplicingPointNext(-1) // unknown until stopReading()
{
    // TODO: Consider limiting the number of concurrent store updates.
}
32 | ||
33 | bool | |
34 | Rock::HeaderUpdater::doneAll() const | |
35 | { | |
36 | return !reader && !writer && AsyncJob::doneAll(); | |
37 | } | |
38 | ||
/// Last-resort cleanup, called when the job ends for any reason: aborts an
/// unfinished map update and closes any I/O channel still open.
void
Rock::HeaderUpdater::swanSong()
{
    // a still-locked stale or fresh anchor means the update did not complete
    if (update.stale || update.fresh)
        store->map->abortUpdating(update);

    if (reader) {
        reader->close(StoreIOState::readerDone);
        reader = nullptr;
    }

    if (writer) {
        writer->close(StoreIOState::writerGone);
        // Emulate SwapDir::disconnect() that writeCompleted(err) hopes for.
        // Also required to avoid IoState destructor assertions.
        // We can do this because we closed update earlier or aborted it above.
        dynamic_cast<IoState&>(*writer).writeableAnchor_ = nullptr;
        writer = nullptr;
    }

    AsyncJob::swanSong();
}
61 | ||
/// AsyncJob API entry point: validates the update transaction and begins
/// by reading the stale entry prefix.
void
Rock::HeaderUpdater::start()
{
    Must(update.entry);
    Must(update.stale); // openForUpdating() must have locked the stale anchor
    Must(update.fresh); // ... and allocated the fresh one
    startReading();
}
70 | ||
/// opens the reading I/O channel for the stale entry and requests the
/// first chunk of its on-disk prefix
void
Rock::HeaderUpdater::startReading()
{
    reader = store->openStoreIO(
                 *update.entry,
                 nullptr, // unused; see StoreIOState::file_callback
                 &NoteDoneReading, // called when the channel closes
                 this);
    readMore("need swap entry metadata");
}
81 | ||
/// Gracefully ends the reading phase, remembering where the stale slot
/// chain ends and where the (unchanged) HTTP body continues so that
/// startWriting() can splice the fresh prefix onto the old body.
/// \param why brief reason, for debugging only
void
Rock::HeaderUpdater::stopReading(const char *why)
{
    debugs(47, 7, why);

    Must(reader);
    const IoState &rockReader = dynamic_cast<IoState&>(*reader);
    // remember splice coordinates before we lose the reader object
    update.stale.splicingPoint = rockReader.splicingPoint;
    staleSplicingPointNext = rockReader.staleSplicingPointNext;
    debugs(47, 5, "stale chain ends at " << update.stale.splicingPoint <<
           " body continues at " << staleSplicingPointNext);

    reader->close(StoreIOState::readerDone); // calls noteDoneReading(0)
    reader = nullptr; // so that swanSong() does not try to close again
}
97 | ||
/// C-style storeRead() callback: packages the raw read result and posts
/// it to the job as an async call so it is handled in job-protected context
void
Rock::HeaderUpdater::NoteRead(void *data, const char *buf, ssize_t result, StoreIOState::Pointer)
{
    IoCbParams io(buf, result);
    // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
    CallJobHere1(47, 7,
                 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
                 Rock::HeaderUpdater,
                 noteRead,
                 io);
}
109 | ||
110 | void | |
672337d1 | 111 | Rock::HeaderUpdater::noteRead(const Rock::HeaderUpdater::IoCbParams result) |
abf396ec | 112 | { |
672337d1 CT |
113 | debugs(47, 7, result.size); |
114 | if (!result.size) { // EOF | |
abf396ec AR |
115 | stopReading("eof"); |
116 | } else { | |
672337d1 CT |
117 | Must(result.size > 0); |
118 | bytesRead += result.size; | |
119 | readerBuffer.rawAppendFinish(result.buf, result.size); | |
abf396ec AR |
120 | exchangeBuffer.append(readerBuffer); |
121 | debugs(47, 7, "accumulated " << exchangeBuffer.length()); | |
122 | } | |
123 | ||
124 | parseReadBytes(); | |
125 | } | |
126 | ||
/// Initiates the next storeRead() request, continuing from the current
/// bytesRead offset; the result arrives via NoteRead()/noteRead().
/// \param why brief reason, for debugging only
void
Rock::HeaderUpdater::readMore(const char *why)
{
    debugs(47, 7, "from " << bytesRead << " because " << why);
    Must(reader);
    readerBuffer.clear();
    storeRead(reader,
              readerBuffer.rawAppendStart(store->slotSize),
              store->slotSize, // read at most one db slot at a time
              bytesRead,
              &NoteRead,
              this);
}
140 | ||
/// C-style reader-closure callback: forwards the closure status to the job
/// as an async call so it is handled in job-protected context
void
Rock::HeaderUpdater::NoteDoneReading(void *data, int errflag, StoreIOState::Pointer)
{
    // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free".
    CallJobHere1(47, 7,
                 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
                 Rock::HeaderUpdater,
                 noteDoneReading,
                 errflag);
}
151 | ||
152 | void | |
153 | Rock::HeaderUpdater::noteDoneReading(int errflag) | |
154 | { | |
155 | debugs(47, 5, errflag << " writer=" << writer); | |
ee64423f | 156 | if (!reader) { |
abf396ec AR |
157 | Must(!errflag); // we only initiate successful closures |
158 | Must(writer); // otherwise we would be done() and would not be called | |
159 | } else { | |
160 | reader = nullptr; // we are done reading | |
161 | Must(errflag); // any external closures ought to be errors | |
162 | mustStop("read error"); | |
163 | } | |
164 | } | |
165 | ||
/// Writes the fresh entry prefix — swap meta headers, fresh HTTP reply
/// headers, and the already-read start of the preserved HTTP body — and
/// adjusts the entry size to account for the prefix length change.
void
Rock::HeaderUpdater::startWriting()
{
    writer = store->createUpdateIO(
                 update,
                 nullptr, // unused; see StoreIOState::file_callback
                 &NoteDoneWriting, // called when the channel closes
                 this);
    Must(writer);

    IoState &rockWriter = dynamic_cast<IoState&>(*writer);
    // tell the writer where the stale body continues so it can splice onto it
    rockWriter.staleSplicingPointNext = staleSplicingPointNext;

    // here, prefix is swap header plus HTTP reply header (i.e., updated bytes)
    uint64_t stalePrefixSz = 0;
    uint64_t freshPrefixSz = 0;

    off_t offset = 0; // current writing offset (for debugging)

    const auto &mem = update.entry->mem();

    {
        debugs(20, 7, "fresh store meta for " << *update.entry);
        size_t freshSwapHeaderSize = 0; // set by getSerialisedMetaData() below

        // There is a circular dependency between the correct/fresh value of
        // entry->swap_file_sz and freshSwapHeaderSize. We break that loop by
        // serializing zero swap_file_sz, just like the regular first-time
        // swapout code may do. De-serializing code will re-calculate it in
        // storeRebuildParseEntry(). TODO: Stop serializing entry->swap_file_sz.
        const auto savedEntrySwapFileSize = update.entry->swap_file_sz;
        update.entry->swap_file_sz = 0;
        const auto freshSwapHeader = update.entry->getSerialisedMetaData(freshSwapHeaderSize);
        update.entry->swap_file_sz = savedEntrySwapFileSize;

        Must(freshSwapHeader);
        writer->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr);
        stalePrefixSz += mem.swap_hdr_sz;
        freshPrefixSz += freshSwapHeaderSize;
        offset += freshSwapHeaderSize;
        xfree(freshSwapHeader); // getSerialisedMetaData() result is malloc()ed
    }

    {
        debugs(20, 7, "fresh HTTP header @ " << offset);
        const auto httpHeader = mem.freshestReply().pack();
        writer->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr);
        const auto &staleReply = mem.baseReply();
        Must(staleReply.hdr_sz >= 0); // for int-to-uint64_t conversion below
        Must(staleReply.hdr_sz > 0); // already initialized
        stalePrefixSz += staleReply.hdr_sz;
        freshPrefixSz += httpHeader->contentSize();
        offset += httpHeader->contentSize();
        delete httpHeader;
    }

    {
        debugs(20, 7, "moved HTTP body prefix @ " << offset);
        // the body bytes that happened to share slots with the stale headers
        writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1, nullptr);
        offset += exchangeBuffer.length();
        exchangeBuffer.clear();
    }

    debugs(20, 7, "wrote " << offset <<
           "; swap_file_sz delta: -" << stalePrefixSz << " +" << freshPrefixSz);

    // Optimistic early update OK: Our write lock blocks access to swap_file_sz.
    auto &swap_file_sz = update.fresh.anchor->basics.swap_file_sz;
    Must(swap_file_sz >= stalePrefixSz);
    swap_file_sz -= stalePrefixSz;
    swap_file_sz += freshPrefixSz;

    writer->close(StoreIOState::wroteAll); // should call noteDoneWriting()
}
240 | ||
/// C-style writer-closure callback: forwards the closure status to the job
/// as an async call so it is handled in job-protected context
void
Rock::HeaderUpdater::NoteDoneWriting(void *data, int errflag, StoreIOState::Pointer)
{
    CallJobHere1(47, 7,
                 CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)),
                 Rock::HeaderUpdater,
                 noteDoneWriting,
                 errflag);
}
250 | ||
/// Handles the closure of the writing I/O channel: records where the fresh
/// slot chain ends and commits the map update, completing the job.
void
Rock::HeaderUpdater::noteDoneWriting(int errflag)
{
    debugs(47, 5, errflag << " reader=" << reader);
    Must(!errflag); // we close via wroteAll; any error closure is unexpected
    Must(!reader); // if we wrote everything, then we must have read everything

    Must(writer);
    IoState &rockWriter = dynamic_cast<IoState&>(*writer);
    update.fresh.splicingPoint = rockWriter.splicingPoint;
    debugs(47, 5, "fresh chain ends at " << update.fresh.splicingPoint);
    store->map->closeForUpdating(update); // publish the fresh entry prefix
    rockWriter.writeableAnchor_ = nullptr; // emulate SwapDir::disconnect()
    writer = nullptr; // we are done writing

    Must(doneAll());
}
268 | ||
/// Parses the bytes accumulated in exchangeBuffer: first the stale swap
/// meta headers, then the stale HTTP reply headers. Keeps reading until
/// the complete HTTP header is seen; then whatever remains in the buffer
/// is the start of the (preserved) HTTP body, and writing begins.
void
Rock::HeaderUpdater::parseReadBytes()
{
    // step 1: strip the swap meta headers (once)
    if (!staleSwapHeaderSize) {
        StoreMetaUnpacker aBuilder(
            exchangeBuffer.rawContent(),
            exchangeBuffer.length(),
            &staleSwapHeaderSize);
        // Squid assumes that metadata always fits into a single db slot
        aBuilder.checkBuffer(); // cannot update an entry with invalid metadata
        debugs(47, 7, "staleSwapHeaderSize=" << staleSwapHeaderSize);
        Must(staleSwapHeaderSize > 0);
        exchangeBuffer.consume(staleSwapHeaderSize);
    }

    // step 2: wait for (and then strip) the complete stale HTTP header
    const size_t staleHttpHeaderSize = headersEnd(
                                           exchangeBuffer.rawContent(),
                                           exchangeBuffer.length());
    debugs(47, 7, "staleHttpHeaderSize=" << staleHttpHeaderSize);
    if (!staleHttpHeaderSize) {
        readMore("need more stale HTTP reply header data");
        return;
    }

    exchangeBuffer.consume(staleHttpHeaderSize);
    // leftover bytes are the body prefix that shared slots with the headers
    debugs(47, 7, "httpBodySizePrefix=" << exchangeBuffer.length());

    stopReading("read the last HTTP header slot");
    startWriting();
}
299 |