]>
Commit | Line | Data |
---|---|---|
abf396ec AR |
1 | /* |
2 | * Copyright (C) 1996-2016 The Squid Software Foundation and contributors | |
3 | * | |
4 | * Squid software is distributed under GPLv2+ license and includes | |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
7 | */ | |
8 | ||
9 | #include "squid.h" | |
10 | #include "base/AsyncJobCalls.h" | |
11 | #include "Debug.h" | |
12 | #include "fs/rock/RockHeaderUpdater.h" | |
13 | #include "fs/rock/RockIoState.h" | |
14 | #include "mime_header.h" | |
15 | #include "Store.h" | |
16 | #include "StoreMetaUnpacker.h" | |
17 | ||
18 | CBDATA_NAMESPACED_CLASS_INIT(Rock, HeaderUpdater); | |
19 | ||
20 | Rock::HeaderUpdater::HeaderUpdater(const Rock::SwapDir::Pointer &aStore, const Ipc::StoreMapUpdate &anUpdate): | |
21 | AsyncJob("Rock::HeaderUpdater"), | |
22 | store(aStore), | |
23 | update(anUpdate), | |
24 | reader(), | |
25 | writer(), | |
26 | bytesRead(0), | |
27 | staleSwapHeaderSize(0), | |
28 | staleSplicingPointNext(-1) | |
29 | { | |
30 | // TODO: Consider limiting the number of concurrent store updates. | |
31 | } | |
32 | ||
33 | bool | |
34 | Rock::HeaderUpdater::doneAll() const | |
35 | { | |
36 | return !reader && !writer && AsyncJob::doneAll(); | |
37 | } | |
38 | ||
39 | void | |
40 | Rock::HeaderUpdater::swanSong() | |
41 | { | |
42 | if (update.stale || update.fresh) | |
43 | store->map->abortUpdating(update); | |
44 | ||
45 | if (reader) { | |
46 | reader->close(StoreIOState::readerDone); | |
47 | reader = nullptr; | |
48 | } | |
49 | ||
50 | if (writer) { | |
51 | writer->close(StoreIOState::writerGone); | |
52 | // Emulate SwapDir::disconnect() that writeCompleted(err) hopes for. | |
53 | // Also required to avoid IoState destructor assertions. | |
54 | // We can do this because we closed update earlier or aborted it above. | |
55 | dynamic_cast<IoState&>(*writer).writeableAnchor_ = nullptr; | |
56 | writer = nullptr; | |
57 | } | |
58 | ||
59 | AsyncJob::swanSong(); | |
60 | } | |
61 | ||
62 | void | |
63 | Rock::HeaderUpdater::start() | |
64 | { | |
65 | Must(update.entry); | |
66 | Must(update.stale); | |
67 | Must(update.fresh); | |
68 | startReading(); | |
69 | } | |
70 | ||
71 | void | |
72 | Rock::HeaderUpdater::startReading() | |
73 | { | |
74 | reader = store->openStoreIO( | |
75 | *update.entry, | |
76 | nullptr, // unused; see StoreIOState::file_callback | |
77 | &NoteDoneReading, | |
78 | this); | |
79 | readMore("need swap entry metadata"); | |
80 | } | |
81 | ||
82 | void | |
83 | Rock::HeaderUpdater::stopReading(const char *why) | |
84 | { | |
85 | debugs(47, 7, why); | |
86 | ||
87 | Must(reader); | |
88 | const IoState &rockReader = dynamic_cast<IoState&>(*reader); | |
89 | update.stale.splicingPoint = rockReader.splicingPoint; | |
90 | staleSplicingPointNext = rockReader.staleSplicingPointNext; | |
91 | debugs(47, 5, "stale chain ends at " << update.stale.splicingPoint << | |
92 | " body continues at " << staleSplicingPointNext); | |
93 | ||
94 | reader->close(StoreIOState::readerDone); // calls noteDoneReading(0) | |
95 | reader = nullptr; // so that swanSong() does not try to close again | |
96 | } | |
97 | ||
98 | void | |
99 | Rock::HeaderUpdater::NoteRead(void *data, const char *buf, ssize_t result, StoreIOState::Pointer) | |
100 | { | |
101 | // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free". | |
102 | CallJobHere1(47, 7, | |
103 | CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)), | |
104 | Rock::HeaderUpdater, | |
105 | noteRead, | |
106 | result); | |
107 | } | |
108 | ||
109 | void | |
110 | Rock::HeaderUpdater::noteRead(ssize_t result) | |
111 | { | |
112 | debugs(47, 7, result); | |
113 | if (!result) { // EOF | |
114 | stopReading("eof"); | |
115 | } else { | |
116 | Must(result > 0); | |
117 | bytesRead += result; | |
118 | readerBuffer.forceSize(readerBuffer.length() + result); | |
119 | exchangeBuffer.append(readerBuffer); | |
120 | debugs(47, 7, "accumulated " << exchangeBuffer.length()); | |
121 | } | |
122 | ||
123 | parseReadBytes(); | |
124 | } | |
125 | ||
126 | void | |
127 | Rock::HeaderUpdater::readMore(const char *why) | |
128 | { | |
129 | debugs(47, 7, "from " << bytesRead << " because " << why); | |
130 | Must(reader); | |
131 | readerBuffer.clear(); | |
132 | storeRead(reader, | |
133 | readerBuffer.rawSpace(store->slotSize), | |
134 | store->slotSize, | |
135 | bytesRead, | |
136 | &NoteRead, | |
137 | this); | |
138 | } | |
139 | ||
140 | void | |
141 | Rock::HeaderUpdater::NoteDoneReading(void *data, int errflag, StoreIOState::Pointer) | |
142 | { | |
143 | // TODO: Avoid Rock::StoreIOStateCb for jobs to protect jobs for "free". | |
144 | CallJobHere1(47, 7, | |
145 | CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)), | |
146 | Rock::HeaderUpdater, | |
147 | noteDoneReading, | |
148 | errflag); | |
149 | } | |
150 | ||
151 | void | |
152 | Rock::HeaderUpdater::noteDoneReading(int errflag) | |
153 | { | |
154 | debugs(47, 5, errflag << " writer=" << writer); | |
155 | if (const bool weInitiatedThisClosure = !reader) { | |
156 | Must(!errflag); // we only initiate successful closures | |
157 | Must(writer); // otherwise we would be done() and would not be called | |
158 | } else { | |
159 | reader = nullptr; // we are done reading | |
160 | Must(errflag); // any external closures ought to be errors | |
161 | mustStop("read error"); | |
162 | } | |
163 | } | |
164 | ||
165 | void | |
166 | Rock::HeaderUpdater::startWriting() | |
167 | { | |
168 | writer = store->createUpdateIO( | |
169 | update, | |
170 | nullptr, // unused; see StoreIOState::file_callback | |
171 | &NoteDoneWriting, | |
172 | this); | |
173 | Must(writer); | |
174 | ||
175 | IoState &rockWriter = dynamic_cast<IoState&>(*writer); | |
176 | rockWriter.staleSplicingPointNext = staleSplicingPointNext; | |
177 | ||
178 | off_t offset = 0; // current writing offset (for debugging) | |
179 | ||
180 | { | |
181 | debugs(20, 7, "fresh store meta for " << *update.entry); | |
182 | const char *freshSwapHeader = update.entry->getSerialisedMetaData(); | |
183 | const auto freshSwapHeaderSize = update.entry->mem_obj->swap_hdr_sz; | |
184 | Must(freshSwapHeader); | |
185 | writer->write(freshSwapHeader, freshSwapHeaderSize, 0, nullptr); | |
186 | offset += freshSwapHeaderSize; | |
187 | } | |
188 | ||
189 | { | |
190 | debugs(20, 7, "fresh HTTP header @ " << offset); | |
191 | MemBuf *httpHeader = update.entry->mem_obj->getReply()->pack(); | |
192 | writer->write(httpHeader->content(), httpHeader->contentSize(), -1, nullptr); | |
193 | offset += httpHeader->contentSize(); | |
194 | delete httpHeader; | |
195 | } | |
196 | ||
197 | { | |
198 | debugs(20, 7, "moved HTTP body prefix @ " << offset); | |
199 | writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1, nullptr); | |
200 | offset += exchangeBuffer.length(); | |
201 | exchangeBuffer.clear(); | |
202 | } | |
203 | ||
204 | debugs(20, 7, "wrote " << offset); | |
205 | ||
206 | writer->close(StoreIOState::wroteAll); // should call noteDoneWriting() | |
207 | } | |
208 | ||
209 | void | |
210 | Rock::HeaderUpdater::NoteDoneWriting(void *data, int errflag, StoreIOState::Pointer) | |
211 | { | |
212 | CallJobHere1(47, 7, | |
213 | CbcPointer<HeaderUpdater>(static_cast<HeaderUpdater*>(data)), | |
214 | Rock::HeaderUpdater, | |
215 | noteDoneWriting, | |
216 | errflag); | |
217 | } | |
218 | ||
219 | void | |
220 | Rock::HeaderUpdater::noteDoneWriting(int errflag) | |
221 | { | |
222 | debugs(47, 5, errflag << " reader=" << reader); | |
223 | Must(!errflag); | |
224 | Must(!reader); // if we wrote everything, then we must have read everything | |
225 | ||
226 | Must(writer); | |
227 | IoState &rockWriter = dynamic_cast<IoState&>(*writer); | |
228 | update.fresh.splicingPoint = rockWriter.splicingPoint; | |
229 | debugs(47, 5, "fresh chain ends at " << update.fresh.splicingPoint); | |
230 | store->map->closeForUpdating(update); | |
231 | rockWriter.writeableAnchor_ = nullptr; | |
232 | writer = nullptr; // we are done writing | |
233 | ||
234 | Must(doneAll()); | |
235 | } | |
236 | ||
237 | void | |
238 | Rock::HeaderUpdater::parseReadBytes() | |
239 | { | |
240 | if (!staleSwapHeaderSize) { | |
241 | StoreMetaUnpacker aBuilder( | |
242 | exchangeBuffer.rawContent(), | |
243 | exchangeBuffer.length(), | |
244 | &staleSwapHeaderSize); | |
245 | // Squid assumes that metadata always fits into a single db slot | |
246 | Must(aBuilder.isBufferSane()); // cannot update what we cannot parse | |
247 | debugs(47, 7, "staleSwapHeaderSize=" << staleSwapHeaderSize); | |
248 | Must(staleSwapHeaderSize > 0); | |
249 | exchangeBuffer.consume(staleSwapHeaderSize); | |
250 | } | |
251 | ||
252 | const size_t staleHttpHeaderSize = headersEnd( | |
253 | exchangeBuffer.rawContent(), | |
254 | exchangeBuffer.length()); | |
255 | debugs(47, 7, "staleHttpHeaderSize=" << staleHttpHeaderSize); | |
256 | if (!staleHttpHeaderSize) { | |
257 | readMore("need more stale HTTP reply header data"); | |
258 | return; | |
259 | } | |
260 | ||
261 | exchangeBuffer.consume(staleHttpHeaderSize); | |
262 | debugs(47, 7, "httpBodySizePrefix=" << exchangeBuffer.length()); | |
263 | ||
264 | stopReading("read the last HTTP header slot"); | |
265 | startWriting(); | |
266 | } | |
267 |