]>
Commit | Line | Data |
---|---|---|
1b39caaa | 1 | |
2 | #include "squid.h" | |
3 | #include "BodyPipe.h" | |
4 | ||
5 | CBDATA_CLASS_INIT(BodyPipe); | |
6 | ||
7 | // inform the pipe that we are done and clear the Pointer | |
8 | void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof) | |
9 | { | |
10 | debugs(91,7, HERE << this << " will not produce for " << pipe << | |
11 | "; atEof: " << atEof); | |
12 | assert(pipe != NULL); // be strict: the caller state may depend on this | |
13 | pipe->clearProducer(atEof); | |
14 | pipe = NULL; | |
15 | } | |
16 | ||
17 | // inform the pipe that we are done and clear the Pointer | |
18 | void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe) | |
19 | { | |
20 | debugs(91,7, HERE << this << " will not consume from " << pipe); | |
21 | assert(pipe != NULL); // be strict: the caller state may depend on this | |
22 | pipe->clearConsumer(); | |
23 | pipe = NULL; | |
24 | } | |
25 | ||
26 | /* BodyPipe */ | |
27 | ||
28 | BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1), | |
29 | theProducer(aProducer), theConsumer(0), | |
6c56baf6 | 30 | thePutSize(0), theGetSize(0), theCCallsPending(0), theCCallsToSkip(0), |
31 | mustAutoConsume(false), isCheckedOut(false) | |
1b39caaa | 32 | { |
33 | // TODO: teach MemBuf to start with zero minSize | |
34 | // TODO: limit maxSize by theBodySize, when known? | |
35 | theBuf.init(2*1024, MaxCapacity); | |
36 | debugs(91,7, HERE << "created BodyPipe" << status()); | |
37 | } | |
38 | ||
39 | BodyPipe::~BodyPipe() | |
40 | { | |
41 | debugs(91,7, HERE << "destroying BodyPipe" << status()); | |
42 | assert(!theProducer); | |
43 | assert(!theConsumer); | |
44 | theBuf.clean(); | |
45 | } | |
46 | ||
47f6e231 | 47 | void BodyPipe::setBodySize(uint64_t aBodySize) |
1b39caaa | 48 | { |
49 | assert(!bodySizeKnown()); | |
50 | assert(aBodySize >= 0); | |
51 | assert(thePutSize <= aBodySize); | |
52 | ||
53 | // If this assert fails, we need to add code to check for eof and inform | |
54 | // the consumer about the eof condition via scheduleBodyEndNotification, | |
55 | // because just setting a body size limit may trigger the eof condition. | |
56 | assert(!theConsumer); | |
57 | ||
58 | theBodySize = aBodySize; | |
59 | debugs(91,7, HERE << "set body size" << status()); | |
60 | } | |
61 | ||
47f6e231 | 62 | uint64_t BodyPipe::bodySize() const |
1b39caaa | 63 | { |
64 | assert(bodySizeKnown()); | |
47f6e231 | 65 | return static_cast<uint64_t>(theBodySize); |
1b39caaa | 66 | } |
67 | ||
47f6e231 | 68 | bool BodyPipe::expectMoreAfter(uint64_t offset) const |
1b39caaa | 69 | { |
70 | assert(theGetSize <= offset); | |
71 | return offset < thePutSize || // buffer has more now or | |
72 | (!productionEnded() && mayNeedMoreData()); // buffer will have more | |
73 | } | |
74 | ||
75 | bool BodyPipe::exhausted() const | |
76 | { | |
77 | return !expectMoreAfter(theGetSize); | |
78 | } | |
79 | ||
80 | size_t BodyPipe::unproducedSize() const | |
81 | { | |
82 | return bodySize() - thePutSize; // bodySize() asserts that size is known | |
83 | } | |
84 | ||
85 | void | |
86 | BodyPipe::clearProducer(bool atEof) | |
87 | { | |
88 | if (theProducer) { | |
89 | debugs(91,7, HERE << "clearing BodyPipe producer" << status()); | |
90 | theProducer = NULL; | |
91 | if (atEof) { | |
92 | if (!bodySizeKnown()) | |
93 | theBodySize = thePutSize; | |
94 | else | |
95 | if (bodySize() != thePutSize) | |
96 | debugs(91,1, HERE << "aborting on premature eof" << status()); | |
97 | } else { | |
98 | // asserta that we can detect the abort if the consumer joins later | |
99 | assert(!bodySizeKnown() || bodySize() != thePutSize); | |
100 | } | |
101 | scheduleBodyEndNotification(); | |
102 | } | |
103 | } | |
104 | ||
105 | size_t | |
106 | BodyPipe::putMoreData(const char *buf, size_t size) | |
107 | { | |
108 | if (bodySizeKnown()) | |
109 | size = XMIN(size, unproducedSize()); | |
110 | ||
111 | const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize()); | |
112 | if ((size = XMIN(size, spaceSize))) { | |
113 | theBuf.append(buf, size); | |
114 | postAppend(size); | |
115 | return size; | |
116 | } | |
117 | return 0; | |
118 | } | |
119 | ||
120 | bool | |
121 | BodyPipe::setConsumerIfNotLate(Consumer *aConsumer) | |
122 | { | |
123 | assert(!theConsumer); | |
124 | assert(aConsumer); | |
125 | ||
126 | // TODO: convert this into an exception and remove IfNotLate suffix | |
127 | // If there is something consumed already, we are in an auto-consuming mode | |
128 | // and it is too late to attach a real consumer to the pipe. | |
129 | if (theGetSize > 0) { | |
130 | assert(mustAutoConsume); | |
131 | return false; | |
132 | } | |
133 | ||
134 | theConsumer = aConsumer; | |
135 | debugs(91,7, HERE << "set consumer" << status()); | |
136 | if (theBuf.hasContent()) | |
6c56baf6 | 137 | scheduleBodyDataNotification(); |
1b39caaa | 138 | if (!theProducer) |
139 | scheduleBodyEndNotification(); | |
140 | ||
141 | return true; | |
142 | } | |
143 | ||
6c56baf6 | 144 | // When BodyPipe consumer is gone, all events for that consumer must not |
145 | // reach the new consumer (if any). Otherwise, the calls may go out of order | |
146 | // (if _some_ calls are dropped due to the ultimate destination being | |
147 | // temporary NULL). The code keeps track of the number of outstanding | |
148 | // events and skips that number if consumer leaves. TODO: when AscyncCall | |
149 | // support is improved, should we just schedule calls directly to consumer? | |
1b39caaa | 150 | void |
151 | BodyPipe::clearConsumer() { | |
152 | if (theConsumer) { | |
153 | debugs(91,7, HERE << "clearing consumer" << status()); | |
154 | theConsumer = NULL; | |
6c56baf6 | 155 | theCCallsToSkip = theCCallsPending; // skip all pending consumer calls |
e71477f3 | 156 | if (consumedSize() && !exhausted()) |
1b39caaa | 157 | AsyncCall(91,5, this, BodyPipe::tellBodyConsumerAborted); |
158 | } | |
159 | } | |
160 | ||
161 | size_t | |
162 | BodyPipe::getMoreData(MemBuf &buf) | |
163 | { | |
164 | if (!theBuf.hasContent()) | |
165 | return 0; // did not touch the possibly uninitialized buf | |
166 | ||
167 | if (buf.isNull()) | |
168 | buf.init(); | |
169 | const size_t size = XMIN(theBuf.contentSize(), buf.potentialSpaceSize()); | |
170 | buf.append(theBuf.content(), size); | |
171 | theBuf.consume(size); | |
172 | postConsume(size); | |
173 | return size; // cannot be zero if we called buf.init above | |
174 | } | |
175 | ||
176 | void | |
177 | BodyPipe::consume(size_t size) | |
178 | { | |
179 | theBuf.consume(size); | |
180 | postConsume(size); | |
181 | } | |
182 | ||
183 | void | |
184 | BodyPipe::enableAutoConsumption() { | |
185 | mustAutoConsume = true; | |
186 | debugs(91,5, HERE << "enabled auto consumption" << status()); | |
187 | if (!theConsumer && theBuf.hasContent()) | |
6c56baf6 | 188 | scheduleBodyDataNotification(); |
1b39caaa | 189 | } |
190 | ||
191 | MemBuf & | |
192 | BodyPipe::checkOut() { | |
193 | assert(!isCheckedOut); | |
194 | isCheckedOut = true; | |
195 | return theBuf; | |
196 | } | |
197 | ||
198 | void | |
199 | BodyPipe::checkIn(Checkout &checkout) | |
200 | { | |
201 | assert(isCheckedOut); | |
202 | isCheckedOut = false; | |
203 | const size_t currentSize = theBuf.contentSize(); | |
204 | if (checkout.checkedOutSize > currentSize) | |
205 | postConsume(checkout.checkedOutSize - currentSize); | |
206 | else | |
207 | if (checkout.checkedOutSize < currentSize) | |
208 | postAppend(currentSize - checkout.checkedOutSize); | |
209 | } | |
210 | ||
211 | void | |
212 | BodyPipe::undoCheckOut(Checkout &checkout) | |
213 | { | |
214 | assert(isCheckedOut); | |
215 | const size_t currentSize = theBuf.contentSize(); | |
216 | // We can only undo if size did not change, and even that carries | |
217 | // some risk. If this becomes a problem, the code checking out | |
218 | // raw buffers should always check them in (possibly unchanged) | |
219 | // instead of relying on the automated undo mechanism of Checkout. | |
220 | // The code can always use a temporary buffer to accomplish that. | |
221 | assert(checkout.checkedOutSize == currentSize); | |
222 | } | |
223 | ||
224 | // TODO: Optimize: inform consumer/producer about more data/space only if | |
225 | // they used the data/space since we notified them last time. | |
226 | ||
227 | void | |
228 | BodyPipe::postConsume(size_t size) { | |
229 | assert(!isCheckedOut); | |
230 | theGetSize += size; | |
231 | debugs(91,7, HERE << "consumed " << size << " bytes" << status()); | |
232 | if (mayNeedMoreData()) | |
233 | AsyncCall(91,5, this, BodyPipe::tellMoreBodySpaceAvailable); | |
234 | } | |
235 | ||
236 | void | |
237 | BodyPipe::postAppend(size_t size) { | |
238 | assert(!isCheckedOut); | |
239 | thePutSize += size; | |
240 | debugs(91,7, HERE << "added " << size << " bytes" << status()); | |
241 | ||
242 | // We should not consume here even if mustAutoConsume because the | |
243 | // caller may not be ready for the data to be consumed during this call. | |
6c56baf6 | 244 | scheduleBodyDataNotification(); |
1b39caaa | 245 | |
246 | if (!mayNeedMoreData()) | |
247 | clearProducer(true); // reached end-of-body | |
248 | } | |
249 | ||
250 | ||
6c56baf6 | 251 | void |
252 | BodyPipe::scheduleBodyDataNotification() | |
253 | { | |
254 | if (theConsumer || mustAutoConsume) { | |
255 | ++theCCallsPending; | |
256 | AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable); | |
257 | } | |
258 | } | |
259 | ||
1b39caaa | 260 | void |
261 | BodyPipe::scheduleBodyEndNotification() | |
262 | { | |
6c56baf6 | 263 | if (theConsumer) { |
264 | ++theCCallsPending; | |
265 | if (bodySizeKnown() && bodySize() == thePutSize) | |
266 | AsyncCall(91,5, this, BodyPipe::tellBodyProductionEnded); | |
267 | else | |
268 | AsyncCall(91,5, this, BodyPipe::tellBodyProducerAborted); | |
269 | } | |
1b39caaa | 270 | } |
271 | ||
272 | void BodyPipe::tellMoreBodySpaceAvailable() | |
273 | { | |
274 | if (theProducer != NULL) | |
275 | theProducer->noteMoreBodySpaceAvailable(*this); | |
276 | } | |
277 | ||
278 | void BodyPipe::tellBodyConsumerAborted() | |
279 | { | |
280 | if (theProducer != NULL) | |
281 | theProducer->noteBodyConsumerAborted(*this); | |
282 | } | |
283 | ||
284 | void BodyPipe::tellMoreBodyDataAvailable() | |
285 | { | |
6c56baf6 | 286 | if (skipCCall()) |
287 | return; | |
288 | ||
1b39caaa | 289 | if (theConsumer != NULL) |
290 | theConsumer->noteMoreBodyDataAvailable(*this); | |
291 | else | |
292 | if (mustAutoConsume && theBuf.hasContent()) | |
293 | consume(theBuf.contentSize()); | |
294 | } | |
295 | ||
296 | void BodyPipe::tellBodyProductionEnded() | |
297 | { | |
6c56baf6 | 298 | if (skipCCall()) |
299 | return; | |
300 | ||
1b39caaa | 301 | if (theConsumer != NULL) |
302 | theConsumer->noteBodyProductionEnded(*this); | |
303 | } | |
304 | ||
305 | void BodyPipe::tellBodyProducerAborted() | |
306 | { | |
6c56baf6 | 307 | if (skipCCall()) |
308 | return; | |
309 | ||
1b39caaa | 310 | if (theConsumer != NULL) |
311 | theConsumer->noteBodyProducerAborted(*this); | |
312 | } | |
313 | ||
6c56baf6 | 314 | // skips calls destined for the previous consumer; see BodyPipe::clearConsumer |
315 | bool BodyPipe::skipCCall() | |
316 | { | |
317 | assert(theCCallsPending > 0); | |
318 | --theCCallsPending; | |
319 | ||
320 | if (theCCallsToSkip <= 0) | |
321 | return false; | |
322 | ||
323 | --theCCallsToSkip; | |
324 | debugs(91,5, HERE << "skipped call"); | |
325 | return true; | |
326 | } | |
327 | ||
1b39caaa | 328 | // a short temporary string describing buffer status for debugging |
329 | const char *BodyPipe::status() const | |
330 | { | |
331 | static MemBuf buf; | |
332 | buf.reset(); | |
333 | ||
334 | buf.append(" [", 2); | |
335 | ||
47f6e231 | 336 | buf.Printf("%"PRIu64"<=%"PRIu64, theGetSize, thePutSize); |
1b39caaa | 337 | if (theBodySize >= 0) |
47f6e231 | 338 | buf.Printf("<=%"PRId64, theBodySize); |
1b39caaa | 339 | else |
340 | buf.append("<=?", 3); | |
341 | ||
342 | buf.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize()); | |
343 | ||
344 | buf.Printf(" pipe%p", this); | |
345 | if (theProducer) | |
346 | buf.Printf(" prod%p", theProducer); | |
347 | if (theConsumer) | |
348 | buf.Printf(" cons%p", theConsumer); | |
349 | ||
350 | if (mustAutoConsume) | |
351 | buf.append(" A", 2); | |
352 | if (isCheckedOut) | |
353 | buf.append(" L", 2); // Locked | |
354 | ||
355 | buf.append("]", 1); | |
356 | ||
357 | buf.terminate(); | |
358 | ||
359 | return buf.content(); | |
360 | } | |
361 | ||
362 | ||
363 | /* BodyPipeCheckout */ | |
364 | ||
365 | BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe), | |
366 | buf(aPipe.checkOut()), offset(aPipe.consumedSize()), | |
367 | checkedOutSize(buf.contentSize()), checkedIn(false) | |
368 | { | |
369 | } | |
370 | ||
371 | BodyPipeCheckout::~BodyPipeCheckout() | |
372 | { | |
373 | if (!checkedIn) | |
374 | pipe.undoCheckOut(*this); | |
375 | } | |
376 | ||
377 | void | |
378 | BodyPipeCheckout::checkIn() | |
379 | { | |
380 | assert(!checkedIn); | |
381 | pipe.checkIn(*this); | |
382 | checkedIn = true; | |
383 | } | |
384 | ||
385 | ||
386 | BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe), | |
387 | buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize), | |
388 | checkedIn(c.checkedIn) | |
389 | { | |
390 | assert(false); // prevent copying | |
391 | } | |
392 | ||
393 | BodyPipeCheckout & | |
394 | BodyPipeCheckout::operator =(const BodyPipeCheckout &) | |
395 | { | |
396 | assert(false); // prevent assignment | |
397 | return *this; | |
398 | } |