3 #include "base/AsyncJobCalls.h"
4 #include "base/TextException.h"
7 CBDATA_CLASS_INIT(BodyPipe
);
9 // BodySink is a BodyConsumer class which just consume and drops
10 // data from a BodyPipe
11 class BodySink
: public BodyConsumer
14 BodySink(const BodyPipe::Pointer
&bp
): AsyncJob("BodySink"), body_pipe(bp
) {}
15 virtual ~BodySink() { assert(!body_pipe
); }
17 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp
) {
18 size_t contentSize
= bp
->buf().contentSize();
19 bp
->consume(contentSize
);
21 virtual void noteBodyProductionEnded(BodyPipe::Pointer bp
) {
22 stopConsumingFrom(body_pipe
);
24 virtual void noteBodyProducerAborted(BodyPipe::Pointer bp
) {
25 stopConsumingFrom(body_pipe
);
27 bool doneAll() const {return !body_pipe
&& AsyncJob::doneAll();}
30 BodyPipe::Pointer body_pipe
; ///< the pipe we are consuming from
32 CBDATA_CLASS2(BodySink
);
35 CBDATA_CLASS_INIT(BodySink
);
37 // The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
38 // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
39 // the BodyPipe passed as argument
40 class BodyProducerDialer
: public UnaryMemFunT
<BodyProducer
, BodyPipe::Pointer
>
43 typedef UnaryMemFunT
<BodyProducer
, BodyPipe::Pointer
> Parent
;
45 BodyProducerDialer(const BodyProducer::Pointer
&aProducer
,
46 Parent::Method aHandler
, BodyPipe::Pointer bp
):
47 Parent(aProducer
, aHandler
, bp
) {}
49 virtual bool canDial(AsyncCall
&call
);
52 // The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
53 // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
54 // of the BodyPipe passed as argument
55 class BodyConsumerDialer
: public UnaryMemFunT
<BodyConsumer
, BodyPipe::Pointer
>
58 typedef UnaryMemFunT
<BodyConsumer
, BodyPipe::Pointer
> Parent
;
60 BodyConsumerDialer(const BodyConsumer::Pointer
&aConsumer
,
61 Parent::Method aHandler
, BodyPipe::Pointer bp
):
62 Parent(aConsumer
, aHandler
, bp
) {}
64 virtual bool canDial(AsyncCall
&call
);
68 BodyProducerDialer::canDial(AsyncCall
&call
)
70 if (!Parent::canDial(call
))
73 const BodyProducer::Pointer
&producer
= job
;
74 BodyPipe::Pointer pipe
= arg1
;
75 if (!pipe
->stillProducing(producer
)) {
76 debugs(call
.debugSection
, call
.debugLevel
, HERE
<< producer
<<
77 " no longer producing for " << pipe
->status());
78 return call
.cancel("no longer producing");
85 BodyConsumerDialer::canDial(AsyncCall
&call
)
87 if (!Parent::canDial(call
))
90 const BodyConsumer::Pointer
&consumer
= job
;
91 BodyPipe::Pointer pipe
= arg1
;
92 if (!pipe
->stillConsuming(consumer
)) {
93 debugs(call
.debugSection
, call
.debugLevel
, HERE
<< consumer
<<
94 " no longer consuming from " << pipe
->status());
95 return call
.cancel("no longer consuming");
103 // inform the pipe that we are done and clear the Pointer
104 void BodyProducer::stopProducingFor(RefCount
<BodyPipe
> &pipe
, bool atEof
)
106 debugs(91,7, HERE
<< this << " will not produce for " << pipe
<<
107 "; atEof: " << atEof
);
108 assert(pipe
!= NULL
); // be strict: the caller state may depend on this
109 pipe
->clearProducer(atEof
);
115 // inform the pipe that we are done and clear the Pointer
116 void BodyConsumer::stopConsumingFrom(RefCount
<BodyPipe
> &pipe
)
118 debugs(91,7, HERE
<< this << " will not consume from " << pipe
);
119 assert(pipe
!= NULL
); // be strict: the caller state may depend on this
120 pipe
->clearConsumer();
126 BodyPipe::BodyPipe(Producer
*aProducer
): theBodySize(-1),
127 theProducer(aProducer
), theConsumer(0),
128 thePutSize(0), theGetSize(0),
129 mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
131 // TODO: teach MemBuf to start with zero minSize
132 // TODO: limit maxSize by theBodySize, when known?
133 theBuf
.init(2*1024, MaxCapacity
);
134 debugs(91,7, HERE
<< "created BodyPipe" << status());
137 BodyPipe::~BodyPipe()
139 debugs(91,7, HERE
<< "destroying BodyPipe" << status());
140 assert(!theProducer
);
141 assert(!theConsumer
);
145 void BodyPipe::setBodySize(uint64_t aBodySize
)
147 assert(!bodySizeKnown());
148 assert(thePutSize
<= aBodySize
);
150 // If this assert fails, we need to add code to check for eof and inform
151 // the consumer about the eof condition via scheduleBodyEndNotification,
152 // because just setting a body size limit may trigger the eof condition.
153 assert(!theConsumer
);
155 theBodySize
= aBodySize
;
156 debugs(91,7, HERE
<< "set body size" << status());
159 uint64_t BodyPipe::bodySize() const
161 assert(bodySizeKnown());
162 return static_cast<uint64_t>(theBodySize
);
165 bool BodyPipe::expectMoreAfter(uint64_t offset
) const
167 assert(theGetSize
<= offset
);
168 return offset
< thePutSize
|| // buffer has more now or
169 (!productionEnded() && mayNeedMoreData()); // buffer will have more
172 bool BodyPipe::exhausted() const
174 return !expectMoreAfter(theGetSize
);
177 uint64_t BodyPipe::unproducedSize() const
179 return bodySize() - thePutSize
; // bodySize() asserts that size is known
182 void BodyPipe::expectProductionEndAfter(uint64_t size
)
184 const uint64_t expectedSize
= thePutSize
+ size
;
186 Must(bodySize() == expectedSize
);
188 theBodySize
= expectedSize
;
192 BodyPipe::clearProducer(bool atEof
)
194 if (theProducer
.set()) {
195 debugs(91,7, HERE
<< "clearing BodyPipe producer" << status());
198 if (!bodySizeKnown())
199 theBodySize
= thePutSize
;
200 else if (bodySize() != thePutSize
)
201 debugs(91,3, HERE
<< "aborting on premature eof" << status());
203 // asserta that we can detect the abort if the consumer joins later
204 assert(!bodySizeKnown() || bodySize() != thePutSize
);
206 scheduleBodyEndNotification();
211 BodyPipe::putMoreData(const char *aBuffer
, size_t size
)
214 size
= min((uint64_t)size
, unproducedSize());
216 const size_t spaceSize
= static_cast<size_t>(theBuf
.potentialSpaceSize());
217 if ((size
= min(size
, spaceSize
))) {
218 theBuf
.append(aBuffer
, size
);
226 BodyPipe::setConsumerIfNotLate(const Consumer::Pointer
&aConsumer
)
228 assert(!theConsumer
);
229 assert(aConsumer
.set()); // but might be invalid
231 // TODO: convert this into an exception and remove IfNotLate suffix
232 // If there is something consumed already, we are in an auto-consuming mode
233 // and it is too late to attach a real consumer to the pipe.
234 if (theGetSize
> 0) {
235 assert(mustAutoConsume
);
239 Must(!abortedConsumption
); // did not promise to never consume
241 theConsumer
= aConsumer
;
242 debugs(91,7, HERE
<< "set consumer" << status());
243 if (theBuf
.hasContent())
244 scheduleBodyDataNotification();
246 scheduleBodyEndNotification();
252 BodyPipe::clearConsumer()
254 if (theConsumer
.set()) {
255 debugs(91,7, HERE
<< "clearing consumer" << status());
257 // do not abort if we have not consumed so that HTTP or ICAP can retry
258 // benign xaction failures due to persistent connection race conditions
260 expectNoConsumption();
265 BodyPipe::expectNoConsumption()
267 // We may be called multiple times because multiple jobs on the consumption
268 // chain may realize that there will be no more setConsumer() calls (e.g.,
269 // consuming code and retrying code). It is both difficult and not really
270 // necessary for them to coordinate their expectNoConsumption() calls.
272 // As a consequence, we may be called when we are auto-consuming already.
274 if (!abortedConsumption
&& !exhausted()) {
275 // Before we abort, any regular consumption should be over and auto
276 // consumption must not be started.
279 AsyncCall::Pointer call
= asyncCall(91, 7,
280 "BodyProducer::noteBodyConsumerAborted",
281 BodyProducerDialer(theProducer
,
282 &BodyProducer::noteBodyConsumerAborted
, this));
283 ScheduleCallHere(call
);
284 abortedConsumption
= true;
286 // in case somebody enabled auto-consumption before regular one aborted
288 startAutoConsumption();
293 BodyPipe::getMoreData(MemBuf
&aMemBuffer
)
295 if (!theBuf
.hasContent())
296 return 0; // did not touch the possibly uninitialized buf
298 if (aMemBuffer
.isNull())
300 const size_t size
= min(theBuf
.contentSize(), aMemBuffer
.potentialSpaceSize());
301 aMemBuffer
.append(theBuf
.content(), size
);
302 theBuf
.consume(size
);
304 return size
; // cannot be zero if we called buf.init above
308 BodyPipe::consume(size_t size
)
310 theBuf
.consume(size
);
314 // In the AutoConsumption mode the consumer has gone but the producer continues
315 // producing data. We are using a BodySink BodyConsumer which just discards the produced data.
317 BodyPipe::enableAutoConsumption()
319 mustAutoConsume
= true;
320 debugs(91,5, HERE
<< "enabled auto consumption" << status());
321 if (!theConsumer
&& theBuf
.hasContent())
322 startAutoConsumption();
325 // start auto consumption by creating body sink
327 BodyPipe::startAutoConsumption()
329 Must(mustAutoConsume
);
331 theConsumer
= new BodySink(this);
332 debugs(91,7, HERE
<< "starting auto consumption" << status());
333 scheduleBodyDataNotification();
339 assert(!isCheckedOut
);
345 BodyPipe::checkIn(Checkout
&checkout
)
347 assert(isCheckedOut
);
348 isCheckedOut
= false;
349 const size_t currentSize
= theBuf
.contentSize();
350 if (checkout
.checkedOutSize
> currentSize
)
351 postConsume(checkout
.checkedOutSize
- currentSize
);
352 else if (checkout
.checkedOutSize
< currentSize
)
353 postAppend(currentSize
- checkout
.checkedOutSize
);
357 BodyPipe::undoCheckOut(Checkout
&checkout
)
359 assert(isCheckedOut
);
360 const size_t currentSize
= theBuf
.contentSize();
361 // We can only undo if size did not change, and even that carries
362 // some risk. If this becomes a problem, the code checking out
363 // raw buffers should always check them in (possibly unchanged)
364 // instead of relying on the automated undo mechanism of Checkout.
365 // The code can always use a temporary buffer to accomplish that.
366 Must(checkout
.checkedOutSize
== currentSize
);
369 // TODO: Optimize: inform consumer/producer about more data/space only if
370 // they used the data/space since we notified them last time.
373 BodyPipe::postConsume(size_t size
)
375 assert(!isCheckedOut
);
377 debugs(91,7, HERE
<< "consumed " << size
<< " bytes" << status());
378 if (mayNeedMoreData()) {
379 AsyncCall::Pointer call
= asyncCall(91, 7,
380 "BodyProducer::noteMoreBodySpaceAvailable",
381 BodyProducerDialer(theProducer
,
382 &BodyProducer::noteMoreBodySpaceAvailable
, this));
383 ScheduleCallHere(call
);
388 BodyPipe::postAppend(size_t size
)
390 assert(!isCheckedOut
);
392 debugs(91,7, HERE
<< "added " << size
<< " bytes" << status());
394 if (mustAutoConsume
&& !theConsumer
&& size
> 0)
395 startAutoConsumption();
397 // We should not consume here even if mustAutoConsume because the
398 // caller may not be ready for the data to be consumed during this call.
399 scheduleBodyDataNotification();
401 if (!mayNeedMoreData())
402 clearProducer(true); // reached end-of-body
406 BodyPipe::scheduleBodyDataNotification()
408 if (theConsumer
.valid()) { // TODO: allow asyncCall() to check this instead
409 AsyncCall::Pointer call
= asyncCall(91, 7,
410 "BodyConsumer::noteMoreBodyDataAvailable",
411 BodyConsumerDialer(theConsumer
,
412 &BodyConsumer::noteMoreBodyDataAvailable
, this));
413 ScheduleCallHere(call
);
418 BodyPipe::scheduleBodyEndNotification()
420 if (theConsumer
.valid()) { // TODO: allow asyncCall() to check this instead
421 if (bodySizeKnown() && bodySize() == thePutSize
) {
422 AsyncCall::Pointer call
= asyncCall(91, 7,
423 "BodyConsumer::noteBodyProductionEnded",
424 BodyConsumerDialer(theConsumer
,
425 &BodyConsumer::noteBodyProductionEnded
, this));
426 ScheduleCallHere(call
);
428 AsyncCall::Pointer call
= asyncCall(91, 7,
429 "BodyConsumer::noteBodyProducerAborted",
430 BodyConsumerDialer(theConsumer
,
431 &BodyConsumer::noteBodyProducerAborted
, this));
432 ScheduleCallHere(call
);
437 // a short temporary string describing buffer status for debugging
438 const char *BodyPipe::status() const
440 static MemBuf outputBuffer
;
441 outputBuffer
.reset();
443 outputBuffer
.append(" [", 2);
445 outputBuffer
.Printf("%" PRIu64
"<=%" PRIu64
, theGetSize
, thePutSize
);
446 if (theBodySize
>= 0)
447 outputBuffer
.Printf("<=%" PRId64
, theBodySize
);
449 outputBuffer
.append("<=?", 3);
451 outputBuffer
.Printf(" %d+%d", (int)theBuf
.contentSize(), (int)theBuf
.spaceSize());
453 outputBuffer
.Printf(" pipe%p", this);
454 if (theProducer
.set())
455 outputBuffer
.Printf(" prod%p", theProducer
.get());
456 if (theConsumer
.set())
457 outputBuffer
.Printf(" cons%p", theConsumer
.get());
460 outputBuffer
.append(" A", 2);
461 if (abortedConsumption
)
462 outputBuffer
.append(" !C", 3);
464 outputBuffer
.append(" L", 2); // Locked
466 outputBuffer
.append("]", 1);
468 outputBuffer
.terminate();
470 return outputBuffer
.content();
473 /* BodyPipeCheckout */
475 BodyPipeCheckout::BodyPipeCheckout(BodyPipe
&aPipe
): pipe(aPipe
),
476 buf(aPipe
.checkOut()), offset(aPipe
.consumedSize()),
477 checkedOutSize(buf
.contentSize()), checkedIn(false)
481 BodyPipeCheckout::~BodyPipeCheckout()
484 // Do not pipe.undoCheckOut(*this) because it asserts or throws
485 // TODO: consider implementing the long-term solution discussed at
486 // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
487 debugs(91,2, HERE
<< "Warning: cannot undo BodyPipeCheckout");
493 BodyPipeCheckout::checkIn()
500 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout
&c
): pipe(c
.pipe
),
501 buf(c
.buf
), offset(c
.offset
), checkedOutSize(c
.checkedOutSize
),
502 checkedIn(c
.checkedIn
)
504 assert(false); // prevent copying
508 BodyPipeCheckout::operator =(const BodyPipeCheckout
&)
510 assert(false); // prevent assignment