]>
Commit | Line | Data |
---|---|---|
1b39caaa | 1 | |
2 | #include "squid.h" | |
3d93a84d | 3 | #include "base/TextException.h" |
1b39caaa | 4 | #include "BodyPipe.h" |
5 | ||
6 | CBDATA_CLASS_INIT(BodyPipe); | |
7 | ||
e7352f30 | 8 | // BodySink is a BodyConsumer class which just consume and drops |
26ac0430 AJ |
9 | // data from a BodyPipe |
10 | class BodySink: public BodyConsumer | |
11 | { | |
e7352f30 | 12 | bool done; |
13 | public: | |
26ac0430 | 14 | BodySink():AsyncJob("BodySink"), done(false) {} |
e7352f30 | 15 | virtual ~BodySink() {} |
26ac0430 | 16 | |
e7352f30 | 17 | virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) { |
26ac0430 AJ |
18 | size_t contentSize = bp->buf().contentSize(); |
19 | bp->consume(contentSize); | |
e7352f30 | 20 | } |
21 | virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) { | |
26ac0430 AJ |
22 | stopConsumingFrom(bp); |
23 | done = true; | |
e7352f30 | 24 | } |
25 | virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) { | |
26ac0430 AJ |
26 | stopConsumingFrom(bp); |
27 | done = true; | |
e7352f30 | 28 | } |
29 | bool doneAll() const {return done && AsyncJob::doneAll();} | |
30 | CBDATA_CLASS2(BodySink); | |
31 | }; | |
32 | ||
33 | CBDATA_CLASS_INIT(BodySink); | |
34 | ||
35 | // The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls. | |
26ac0430 | 36 | // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of |
e7352f30 | 37 | // the BodyPipe passed as argument |
38 | class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer> | |
39 | { | |
40 | public: | |
26ac0430 | 41 | typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent; |
e7352f30 | 42 | |
26ac0430 AJ |
43 | BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler, |
44 | BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {} | |
e7352f30 | 45 | |
26ac0430 | 46 | virtual bool canDial(AsyncCall &call); |
e7352f30 | 47 | }; |
48 | ||
49 | // The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls. | |
26ac0430 | 50 | // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient |
e7352f30 | 51 | // of the BodyPipe passed as argument |
52 | class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> | |
53 | { | |
54 | public: | |
26ac0430 | 55 | typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent; |
e7352f30 | 56 | |
26ac0430 AJ |
57 | BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler, |
58 | BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {} | |
e7352f30 | 59 | |
26ac0430 | 60 | virtual bool canDial(AsyncCall &call); |
e7352f30 | 61 | }; |
62 | ||
63 | bool | |
26ac0430 AJ |
64 | BodyProducerDialer::canDial(AsyncCall &call) |
65 | { | |
66 | if (!Parent::canDial(call)) | |
67 | return false; | |
68 | ||
69 | BodyProducer *producer = object; | |
70 | BodyPipe::Pointer pipe = arg1; | |
71 | if (!pipe->stillProducing(producer)) { | |
72 | debugs(call.debugSection, call.debugLevel, HERE << producer << | |
73 | " no longer producing for " << pipe->status()); | |
74 | return call.cancel("no longer producing"); | |
75 | } | |
e7352f30 | 76 | |
26ac0430 | 77 | return true; |
e7352f30 | 78 | } |
79 | ||
80 | bool | |
26ac0430 AJ |
81 | BodyConsumerDialer::canDial(AsyncCall &call) |
82 | { | |
83 | if (!Parent::canDial(call)) | |
84 | return false; | |
85 | ||
86 | BodyConsumer *consumer = object; | |
87 | BodyPipe::Pointer pipe = arg1; | |
88 | if (!pipe->stillConsuming(consumer)) { | |
89 | debugs(call.debugSection, call.debugLevel, HERE << consumer << | |
90 | " no longer consuming from " << pipe->status()); | |
91 | return call.cancel("no longer consuming"); | |
92 | } | |
e7352f30 | 93 | |
26ac0430 | 94 | return true; |
e7352f30 | 95 | } |
96 | ||
97 | ||
98 | /* BodyProducer */ | |
99 | ||
1b39caaa | 100 | // inform the pipe that we are done and clear the Pointer |
101 | void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof) | |
102 | { | |
26ac0430 AJ |
103 | debugs(91,7, HERE << this << " will not produce for " << pipe << |
104 | "; atEof: " << atEof); | |
105 | assert(pipe != NULL); // be strict: the caller state may depend on this | |
106 | pipe->clearProducer(atEof); | |
107 | pipe = NULL; | |
1b39caaa | 108 | } |
109 | ||
e7352f30 | 110 | |
111 | ||
112 | /* BodyConsumer */ | |
113 | ||
1b39caaa | 114 | // inform the pipe that we are done and clear the Pointer |
115 | void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe) | |
116 | { | |
26ac0430 AJ |
117 | debugs(91,7, HERE << this << " will not consume from " << pipe); |
118 | assert(pipe != NULL); // be strict: the caller state may depend on this | |
119 | pipe->clearConsumer(); | |
120 | pipe = NULL; | |
1b39caaa | 121 | } |
122 | ||
e7352f30 | 123 | |
1b39caaa | 124 | /* BodyPipe */ |
125 | ||
126 | BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1), | |
26ac0430 AJ |
127 | theProducer(aProducer), theConsumer(0), |
128 | thePutSize(0), theGetSize(0), | |
129 | mustAutoConsume(false), isCheckedOut(false) | |
1b39caaa | 130 | { |
26ac0430 AJ |
131 | // TODO: teach MemBuf to start with zero minSize |
132 | // TODO: limit maxSize by theBodySize, when known? | |
133 | theBuf.init(2*1024, MaxCapacity); | |
134 | debugs(91,7, HERE << "created BodyPipe" << status()); | |
1b39caaa | 135 | } |
136 | ||
137 | BodyPipe::~BodyPipe() | |
138 | { | |
26ac0430 AJ |
139 | debugs(91,7, HERE << "destroying BodyPipe" << status()); |
140 | assert(!theProducer); | |
141 | assert(!theConsumer); | |
142 | theBuf.clean(); | |
1b39caaa | 143 | } |
144 | ||
47f6e231 | 145 | void BodyPipe::setBodySize(uint64_t aBodySize) |
1b39caaa | 146 | { |
26ac0430 AJ |
147 | assert(!bodySizeKnown()); |
148 | assert(aBodySize >= 0); | |
149 | assert(thePutSize <= aBodySize); | |
1b39caaa | 150 | |
26ac0430 AJ |
151 | // If this assert fails, we need to add code to check for eof and inform |
152 | // the consumer about the eof condition via scheduleBodyEndNotification, | |
153 | // because just setting a body size limit may trigger the eof condition. | |
154 | assert(!theConsumer); | |
1b39caaa | 155 | |
26ac0430 AJ |
156 | theBodySize = aBodySize; |
157 | debugs(91,7, HERE << "set body size" << status()); | |
1b39caaa | 158 | } |
159 | ||
47f6e231 | 160 | uint64_t BodyPipe::bodySize() const |
1b39caaa | 161 | { |
26ac0430 AJ |
162 | assert(bodySizeKnown()); |
163 | return static_cast<uint64_t>(theBodySize); | |
1b39caaa | 164 | } |
165 | ||
47f6e231 | 166 | bool BodyPipe::expectMoreAfter(uint64_t offset) const |
1b39caaa | 167 | { |
26ac0430 AJ |
168 | assert(theGetSize <= offset); |
169 | return offset < thePutSize || // buffer has more now or | |
170 | (!productionEnded() && mayNeedMoreData()); // buffer will have more | |
1b39caaa | 171 | } |
172 | ||
173 | bool BodyPipe::exhausted() const | |
174 | { | |
26ac0430 | 175 | return !expectMoreAfter(theGetSize); |
1b39caaa | 176 | } |
177 | ||
610d8f3b | 178 | uint64_t BodyPipe::unproducedSize() const |
1b39caaa | 179 | { |
26ac0430 | 180 | return bodySize() - thePutSize; // bodySize() asserts that size is known |
1b39caaa | 181 | } |
182 | ||
183 | void | |
184 | BodyPipe::clearProducer(bool atEof) | |
185 | { | |
26ac0430 AJ |
186 | if (theProducer) { |
187 | debugs(91,7, HERE << "clearing BodyPipe producer" << status()); | |
188 | theProducer = NULL; | |
189 | if (atEof) { | |
190 | if (!bodySizeKnown()) | |
191 | theBodySize = thePutSize; | |
e1381638 AJ |
192 | else if (bodySize() != thePutSize) |
193 | debugs(91,3, HERE << "aborting on premature eof" << status()); | |
26ac0430 AJ |
194 | } else { |
195 | // asserta that we can detect the abort if the consumer joins later | |
196 | assert(!bodySizeKnown() || bodySize() != thePutSize); | |
197 | } | |
198 | scheduleBodyEndNotification(); | |
199 | } | |
1b39caaa | 200 | } |
201 | ||
202 | size_t | |
350e2aec | 203 | BodyPipe::putMoreData(const char *aBuffer, size_t size) |
1b39caaa | 204 | { |
26ac0430 | 205 | if (bodySizeKnown()) |
d85c3078 | 206 | size = min((uint64_t)size, unproducedSize()); |
26ac0430 AJ |
207 | |
208 | const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize()); | |
d85c3078 | 209 | if ((size = min(size, spaceSize))) { |
350e2aec | 210 | theBuf.append(aBuffer, size); |
26ac0430 AJ |
211 | postAppend(size); |
212 | return size; | |
213 | } | |
214 | return 0; | |
1b39caaa | 215 | } |
216 | ||
217 | bool | |
218 | BodyPipe::setConsumerIfNotLate(Consumer *aConsumer) | |
219 | { | |
26ac0430 AJ |
220 | assert(!theConsumer); |
221 | assert(aConsumer); | |
222 | ||
223 | // TODO: convert this into an exception and remove IfNotLate suffix | |
224 | // If there is something consumed already, we are in an auto-consuming mode | |
225 | // and it is too late to attach a real consumer to the pipe. | |
226 | if (theGetSize > 0) { | |
227 | assert(mustAutoConsume); | |
228 | return false; | |
229 | } | |
1b39caaa | 230 | |
26ac0430 AJ |
231 | theConsumer = aConsumer; |
232 | debugs(91,7, HERE << "set consumer" << status()); | |
233 | if (theBuf.hasContent()) | |
234 | scheduleBodyDataNotification(); | |
235 | if (!theProducer) | |
236 | scheduleBodyEndNotification(); | |
1b39caaa | 237 | |
26ac0430 | 238 | return true; |
1b39caaa | 239 | } |
240 | ||
6c56baf6 | 241 | // When BodyPipe consumer is gone, all events for that consumer must not |
242 | // reach the new consumer (if any). Otherwise, the calls may go out of order | |
26ac0430 | 243 | // (if _some_ calls are dropped due to the ultimate destination being |
6c56baf6 | 244 | // temporary NULL). The code keeps track of the number of outstanding |
245 | // events and skips that number if consumer leaves. TODO: when AscyncCall | |
246 | // support is improved, should we just schedule calls directly to consumer? | |
1b39caaa | 247 | void |
26ac0430 AJ |
248 | BodyPipe::clearConsumer() |
249 | { | |
250 | if (theConsumer) { | |
251 | debugs(91,7, HERE << "clearing consumer" << status()); | |
252 | theConsumer = NULL; | |
253 | if (consumedSize() && !exhausted()) { | |
254 | AsyncCall::Pointer call= asyncCall(91, 7, | |
255 | "BodyProducer::noteBodyConsumerAborted", | |
256 | BodyProducerDialer(theProducer, | |
257 | &BodyProducer::noteBodyConsumerAborted, this)); | |
258 | ScheduleCallHere(call); | |
259 | } | |
260 | } | |
1b39caaa | 261 | } |
262 | ||
263 | size_t | |
350e2aec | 264 | BodyPipe::getMoreData(MemBuf &aMemBuffer) |
1b39caaa | 265 | { |
26ac0430 AJ |
266 | if (!theBuf.hasContent()) |
267 | return 0; // did not touch the possibly uninitialized buf | |
1b39caaa | 268 | |
350e2aec FC |
269 | if (aMemBuffer.isNull()) |
270 | aMemBuffer.init(); | |
271 | const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize()); | |
272 | aMemBuffer.append(theBuf.content(), size); | |
26ac0430 AJ |
273 | theBuf.consume(size); |
274 | postConsume(size); | |
275 | return size; // cannot be zero if we called buf.init above | |
1b39caaa | 276 | } |
277 | ||
278 | void | |
279 | BodyPipe::consume(size_t size) | |
280 | { | |
26ac0430 AJ |
281 | theBuf.consume(size); |
282 | postConsume(size); | |
1b39caaa | 283 | } |
284 | ||
26ac0430 | 285 | // In the AutoConsumption mode the consumer has gone but the producer continues |
e7352f30 | 286 | // producing data. We are using a BodySink BodyConsumer which just discards the produced data. |
1b39caaa | 287 | void |
26ac0430 AJ |
288 | BodyPipe::enableAutoConsumption() |
289 | { | |
290 | mustAutoConsume = true; | |
291 | debugs(91,5, HERE << "enabled auto consumption" << status()); | |
292 | if (!theConsumer && theBuf.hasContent()) | |
293 | startAutoConsumption(); | |
5121d48e AR |
294 | } |
295 | ||
296 | // start auto consumption by creating body sink | |
297 | void | |
298 | BodyPipe::startAutoConsumption() | |
299 | { | |
26ac0430 AJ |
300 | Must(mustAutoConsume); |
301 | Must(!theConsumer); | |
302 | theConsumer = new BodySink; | |
303 | debugs(91,7, HERE << "starting auto consumption" << status()); | |
304 | scheduleBodyDataNotification(); | |
1b39caaa | 305 | } |
306 | ||
307 | MemBuf & | |
26ac0430 AJ |
308 | BodyPipe::checkOut() |
309 | { | |
310 | assert(!isCheckedOut); | |
311 | isCheckedOut = true; | |
312 | return theBuf; | |
1b39caaa | 313 | } |
314 | ||
315 | void | |
316 | BodyPipe::checkIn(Checkout &checkout) | |
317 | { | |
26ac0430 AJ |
318 | assert(isCheckedOut); |
319 | isCheckedOut = false; | |
320 | const size_t currentSize = theBuf.contentSize(); | |
321 | if (checkout.checkedOutSize > currentSize) | |
322 | postConsume(checkout.checkedOutSize - currentSize); | |
e1381638 AJ |
323 | else if (checkout.checkedOutSize < currentSize) |
324 | postAppend(currentSize - checkout.checkedOutSize); | |
1b39caaa | 325 | } |
326 | ||
327 | void | |
328 | BodyPipe::undoCheckOut(Checkout &checkout) | |
329 | { | |
26ac0430 AJ |
330 | assert(isCheckedOut); |
331 | const size_t currentSize = theBuf.contentSize(); | |
332 | // We can only undo if size did not change, and even that carries | |
333 | // some risk. If this becomes a problem, the code checking out | |
334 | // raw buffers should always check them in (possibly unchanged) | |
335 | // instead of relying on the automated undo mechanism of Checkout. | |
336 | // The code can always use a temporary buffer to accomplish that. | |
337 | assert(checkout.checkedOutSize == currentSize); | |
1b39caaa | 338 | } |
339 | ||
340 | // TODO: Optimize: inform consumer/producer about more data/space only if | |
341 | // they used the data/space since we notified them last time. | |
342 | ||
343 | void | |
26ac0430 AJ |
344 | BodyPipe::postConsume(size_t size) |
345 | { | |
346 | assert(!isCheckedOut); | |
347 | theGetSize += size; | |
348 | debugs(91,7, HERE << "consumed " << size << " bytes" << status()); | |
349 | if (mayNeedMoreData()) { | |
350 | AsyncCall::Pointer call= asyncCall(91, 7, | |
351 | "BodyProducer::noteMoreBodySpaceAvailable", | |
352 | BodyProducerDialer(theProducer, | |
353 | &BodyProducer::noteMoreBodySpaceAvailable, this)); | |
354 | ScheduleCallHere(call); | |
355 | } | |
1b39caaa | 356 | } |
357 | ||
358 | void | |
26ac0430 AJ |
359 | BodyPipe::postAppend(size_t size) |
360 | { | |
361 | assert(!isCheckedOut); | |
362 | thePutSize += size; | |
363 | debugs(91,7, HERE << "added " << size << " bytes" << status()); | |
1b39caaa | 364 | |
26ac0430 AJ |
365 | if (mustAutoConsume && !theConsumer && size > 0) |
366 | startAutoConsumption(); | |
5121d48e | 367 | |
26ac0430 AJ |
368 | // We should not consume here even if mustAutoConsume because the |
369 | // caller may not be ready for the data to be consumed during this call. | |
370 | scheduleBodyDataNotification(); | |
1b39caaa | 371 | |
26ac0430 AJ |
372 | if (!mayNeedMoreData()) |
373 | clearProducer(true); // reached end-of-body | |
1b39caaa | 374 | } |
375 | ||
376 | ||
6c56baf6 | 377 | void |
378 | BodyPipe::scheduleBodyDataNotification() | |
379 | { | |
26ac0430 AJ |
380 | if (theConsumer) { |
381 | AsyncCall::Pointer call = asyncCall(91, 7, | |
382 | "BodyConsumer::noteMoreBodyDataAvailable", | |
383 | BodyConsumerDialer(theConsumer, | |
384 | &BodyConsumer::noteMoreBodyDataAvailable, this)); | |
385 | ScheduleCallHere(call); | |
386 | } | |
6c56baf6 | 387 | } |
388 | ||
1b39caaa | 389 | void |
390 | BodyPipe::scheduleBodyEndNotification() | |
391 | { | |
26ac0430 AJ |
392 | if (theConsumer) { |
393 | if (bodySizeKnown() && bodySize() == thePutSize) { | |
394 | AsyncCall::Pointer call = asyncCall(91, 7, | |
395 | "BodyConsumer::noteBodyProductionEnded", | |
396 | BodyConsumerDialer(theConsumer, | |
397 | &BodyConsumer::noteBodyProductionEnded, this)); | |
398 | ScheduleCallHere(call); | |
399 | } else { | |
400 | AsyncCall::Pointer call = asyncCall(91, 7, | |
401 | "BodyConsumer::noteBodyProducerAborted", | |
402 | BodyConsumerDialer(theConsumer, | |
403 | &BodyConsumer::noteBodyProducerAborted, this)); | |
404 | ScheduleCallHere(call); | |
405 | } | |
406 | } | |
1b39caaa | 407 | } |
408 | ||
1b39caaa | 409 | // a short temporary string describing buffer status for debugging |
410 | const char *BodyPipe::status() const | |
411 | { | |
350e2aec FC |
412 | static MemBuf outputBuffer; |
413 | outputBuffer.reset(); | |
1b39caaa | 414 | |
350e2aec | 415 | outputBuffer.append(" [", 2); |
1b39caaa | 416 | |
350e2aec | 417 | outputBuffer.Printf("%"PRIu64"<=%"PRIu64, theGetSize, thePutSize); |
1b39caaa | 418 | if (theBodySize >= 0) |
350e2aec | 419 | outputBuffer.Printf("<=%"PRId64, theBodySize); |
26ac0430 | 420 | else |
350e2aec | 421 | outputBuffer.append("<=?", 3); |
1b39caaa | 422 | |
350e2aec | 423 | outputBuffer.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize()); |
1b39caaa | 424 | |
350e2aec | 425 | outputBuffer.Printf(" pipe%p", this); |
1b39caaa | 426 | if (theProducer) |
350e2aec | 427 | outputBuffer.Printf(" prod%p", theProducer); |
1b39caaa | 428 | if (theConsumer) |
350e2aec | 429 | outputBuffer.Printf(" cons%p", theConsumer); |
1b39caaa | 430 | |
26ac0430 | 431 | if (mustAutoConsume) |
350e2aec | 432 | outputBuffer.append(" A", 2); |
26ac0430 | 433 | if (isCheckedOut) |
350e2aec | 434 | outputBuffer.append(" L", 2); // Locked |
1b39caaa | 435 | |
350e2aec | 436 | outputBuffer.append("]", 1); |
1b39caaa | 437 | |
350e2aec | 438 | outputBuffer.terminate(); |
1b39caaa | 439 | |
350e2aec | 440 | return outputBuffer.content(); |
1b39caaa | 441 | } |
442 | ||
443 | ||
444 | /* BodyPipeCheckout */ | |
445 | ||
446 | BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe), | |
26ac0430 AJ |
447 | buf(aPipe.checkOut()), offset(aPipe.consumedSize()), |
448 | checkedOutSize(buf.contentSize()), checkedIn(false) | |
1b39caaa | 449 | { |
450 | } | |
451 | ||
452 | BodyPipeCheckout::~BodyPipeCheckout() | |
453 | { | |
26ac0430 AJ |
454 | if (!checkedIn) |
455 | pipe.undoCheckOut(*this); | |
1b39caaa | 456 | } |
457 | ||
458 | void | |
459 | BodyPipeCheckout::checkIn() | |
460 | { | |
26ac0430 AJ |
461 | assert(!checkedIn); |
462 | pipe.checkIn(*this); | |
463 | checkedIn = true; | |
1b39caaa | 464 | } |
465 | ||
466 | ||
467 | BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe), | |
26ac0430 AJ |
468 | buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize), |
469 | checkedIn(c.checkedIn) | |
1b39caaa | 470 | { |
26ac0430 | 471 | assert(false); // prevent copying |
1b39caaa | 472 | } |
473 | ||
474 | BodyPipeCheckout & | |
475 | BodyPipeCheckout::operator =(const BodyPipeCheckout &) | |
476 | { | |
26ac0430 AJ |
477 | assert(false); // prevent assignment |
478 | return *this; | |
1b39caaa | 479 | } |