3 #include "base/TextException.h"
6 CBDATA_CLASS_INIT(BodyPipe
);
8 // BodySink is a BodyConsumer class which just consume and drops
9 // data from a BodyPipe
10 class BodySink
: public BodyConsumer
14 BodySink():AsyncJob("BodySink"), done(false) {}
15 virtual ~BodySink() {}
17 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp
) {
18 size_t contentSize
= bp
->buf().contentSize();
19 bp
->consume(contentSize
);
21 virtual void noteBodyProductionEnded(BodyPipe::Pointer bp
) {
22 stopConsumingFrom(bp
);
25 virtual void noteBodyProducerAborted(BodyPipe::Pointer bp
) {
26 stopConsumingFrom(bp
);
29 bool doneAll() const {return done
&& AsyncJob::doneAll();}
30 CBDATA_CLASS2(BodySink
);
33 CBDATA_CLASS_INIT(BodySink
);
35 // The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
36 // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
37 // the BodyPipe passed as argument
38 class BodyProducerDialer
: public UnaryMemFunT
<BodyProducer
, BodyPipe::Pointer
>
41 typedef UnaryMemFunT
<BodyProducer
, BodyPipe::Pointer
> Parent
;
43 BodyProducerDialer(BodyProducer
*aProducer
, Parent::Method aHandler
,
44 BodyPipe::Pointer bp
): Parent(aProducer
, aHandler
, bp
) {}
46 virtual bool canDial(AsyncCall
&call
);
49 // The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
50 // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
51 // of the BodyPipe passed as argument
52 class BodyConsumerDialer
: public UnaryMemFunT
<BodyConsumer
, BodyPipe::Pointer
>
55 typedef UnaryMemFunT
<BodyConsumer
, BodyPipe::Pointer
> Parent
;
57 BodyConsumerDialer(BodyConsumer
*aConsumer
, Parent::Method aHandler
,
58 BodyPipe::Pointer bp
): Parent(aConsumer
, aHandler
, bp
) {}
60 virtual bool canDial(AsyncCall
&call
);
64 BodyProducerDialer::canDial(AsyncCall
&call
)
66 if (!Parent::canDial(call
))
69 BodyProducer
*producer
= object
;
70 BodyPipe::Pointer pipe
= arg1
;
71 if (!pipe
->stillProducing(producer
)) {
72 debugs(call
.debugSection
, call
.debugLevel
, HERE
<< producer
<<
73 " no longer producing for " << pipe
->status());
74 return call
.cancel("no longer producing");
81 BodyConsumerDialer::canDial(AsyncCall
&call
)
83 if (!Parent::canDial(call
))
86 BodyConsumer
*consumer
= object
;
87 BodyPipe::Pointer pipe
= arg1
;
88 if (!pipe
->stillConsuming(consumer
)) {
89 debugs(call
.debugSection
, call
.debugLevel
, HERE
<< consumer
<<
90 " no longer consuming from " << pipe
->status());
91 return call
.cancel("no longer consuming");
100 // inform the pipe that we are done and clear the Pointer
101 void BodyProducer::stopProducingFor(RefCount
<BodyPipe
> &pipe
, bool atEof
)
103 debugs(91,7, HERE
<< this << " will not produce for " << pipe
<<
104 "; atEof: " << atEof
);
105 assert(pipe
!= NULL
); // be strict: the caller state may depend on this
106 pipe
->clearProducer(atEof
);
114 // inform the pipe that we are done and clear the Pointer
115 void BodyConsumer::stopConsumingFrom(RefCount
<BodyPipe
> &pipe
)
117 debugs(91,7, HERE
<< this << " will not consume from " << pipe
);
118 assert(pipe
!= NULL
); // be strict: the caller state may depend on this
119 pipe
->clearConsumer();
126 BodyPipe::BodyPipe(Producer
*aProducer
): theBodySize(-1),
127 theProducer(aProducer
), theConsumer(0),
128 thePutSize(0), theGetSize(0),
129 mustAutoConsume(false), isCheckedOut(false)
131 // TODO: teach MemBuf to start with zero minSize
132 // TODO: limit maxSize by theBodySize, when known?
133 theBuf
.init(2*1024, MaxCapacity
);
134 debugs(91,7, HERE
<< "created BodyPipe" << status());
137 BodyPipe::~BodyPipe()
139 debugs(91,7, HERE
<< "destroying BodyPipe" << status());
140 assert(!theProducer
);
141 assert(!theConsumer
);
145 void BodyPipe::setBodySize(uint64_t aBodySize
)
147 assert(!bodySizeKnown());
148 assert(aBodySize
>= 0);
149 assert(thePutSize
<= aBodySize
);
151 // If this assert fails, we need to add code to check for eof and inform
152 // the consumer about the eof condition via scheduleBodyEndNotification,
153 // because just setting a body size limit may trigger the eof condition.
154 assert(!theConsumer
);
156 theBodySize
= aBodySize
;
157 debugs(91,7, HERE
<< "set body size" << status());
160 uint64_t BodyPipe::bodySize() const
162 assert(bodySizeKnown());
163 return static_cast<uint64_t>(theBodySize
);
166 bool BodyPipe::expectMoreAfter(uint64_t offset
) const
168 assert(theGetSize
<= offset
);
169 return offset
< thePutSize
|| // buffer has more now or
170 (!productionEnded() && mayNeedMoreData()); // buffer will have more
173 bool BodyPipe::exhausted() const
175 return !expectMoreAfter(theGetSize
);
178 uint64_t BodyPipe::unproducedSize() const
180 return bodySize() - thePutSize
; // bodySize() asserts that size is known
183 void BodyPipe::expectProductionEndAfter(uint64_t size
)
185 const uint64_t expectedSize
= thePutSize
+ size
;
187 Must(bodySize() == expectedSize
);
189 theBodySize
= expectedSize
;
193 BodyPipe::clearProducer(bool atEof
)
196 debugs(91,7, HERE
<< "clearing BodyPipe producer" << status());
199 if (!bodySizeKnown())
200 theBodySize
= thePutSize
;
201 else if (bodySize() != thePutSize
)
202 debugs(91,3, HERE
<< "aborting on premature eof" << status());
204 // asserta that we can detect the abort if the consumer joins later
205 assert(!bodySizeKnown() || bodySize() != thePutSize
);
207 scheduleBodyEndNotification();
212 BodyPipe::putMoreData(const char *aBuffer
, size_t size
)
215 size
= min((uint64_t)size
, unproducedSize());
217 const size_t spaceSize
= static_cast<size_t>(theBuf
.potentialSpaceSize());
218 if ((size
= min(size
, spaceSize
))) {
219 theBuf
.append(aBuffer
, size
);
227 BodyPipe::setConsumerIfNotLate(Consumer
*aConsumer
)
229 assert(!theConsumer
);
232 // TODO: convert this into an exception and remove IfNotLate suffix
233 // If there is something consumed already, we are in an auto-consuming mode
234 // and it is too late to attach a real consumer to the pipe.
235 if (theGetSize
> 0) {
236 assert(mustAutoConsume
);
240 theConsumer
= aConsumer
;
241 debugs(91,7, HERE
<< "set consumer" << status());
242 if (theBuf
.hasContent())
243 scheduleBodyDataNotification();
245 scheduleBodyEndNotification();
250 // When BodyPipe consumer is gone, all events for that consumer must not
251 // reach the new consumer (if any). Otherwise, the calls may go out of order
252 // (if _some_ calls are dropped due to the ultimate destination being
253 // temporary NULL). The code keeps track of the number of outstanding
254 // events and skips that number if consumer leaves. TODO: when AscyncCall
255 // support is improved, should we just schedule calls directly to consumer?
257 BodyPipe::clearConsumer()
260 debugs(91,7, HERE
<< "clearing consumer" << status());
262 if (consumedSize() && !exhausted()) {
263 AsyncCall::Pointer call
= asyncCall(91, 7,
264 "BodyProducer::noteBodyConsumerAborted",
265 BodyProducerDialer(theProducer
,
266 &BodyProducer::noteBodyConsumerAborted
, this));
267 ScheduleCallHere(call
);
273 BodyPipe::getMoreData(MemBuf
&aMemBuffer
)
275 if (!theBuf
.hasContent())
276 return 0; // did not touch the possibly uninitialized buf
278 if (aMemBuffer
.isNull())
280 const size_t size
= min(theBuf
.contentSize(), aMemBuffer
.potentialSpaceSize());
281 aMemBuffer
.append(theBuf
.content(), size
);
282 theBuf
.consume(size
);
284 return size
; // cannot be zero if we called buf.init above
288 BodyPipe::consume(size_t size
)
290 theBuf
.consume(size
);
294 // In the AutoConsumption mode the consumer has gone but the producer continues
295 // producing data. We are using a BodySink BodyConsumer which just discards the produced data.
297 BodyPipe::enableAutoConsumption()
299 mustAutoConsume
= true;
300 debugs(91,5, HERE
<< "enabled auto consumption" << status());
301 if (!theConsumer
&& theBuf
.hasContent())
302 startAutoConsumption();
305 // start auto consumption by creating body sink
307 BodyPipe::startAutoConsumption()
309 Must(mustAutoConsume
);
311 theConsumer
= new BodySink
;
312 debugs(91,7, HERE
<< "starting auto consumption" << status());
313 scheduleBodyDataNotification();
319 assert(!isCheckedOut
);
325 BodyPipe::checkIn(Checkout
&checkout
)
327 assert(isCheckedOut
);
328 isCheckedOut
= false;
329 const size_t currentSize
= theBuf
.contentSize();
330 if (checkout
.checkedOutSize
> currentSize
)
331 postConsume(checkout
.checkedOutSize
- currentSize
);
332 else if (checkout
.checkedOutSize
< currentSize
)
333 postAppend(currentSize
- checkout
.checkedOutSize
);
337 BodyPipe::undoCheckOut(Checkout
&checkout
)
339 assert(isCheckedOut
);
340 const size_t currentSize
= theBuf
.contentSize();
341 // We can only undo if size did not change, and even that carries
342 // some risk. If this becomes a problem, the code checking out
343 // raw buffers should always check them in (possibly unchanged)
344 // instead of relying on the automated undo mechanism of Checkout.
345 // The code can always use a temporary buffer to accomplish that.
346 assert(checkout
.checkedOutSize
== currentSize
);
349 // TODO: Optimize: inform consumer/producer about more data/space only if
350 // they used the data/space since we notified them last time.
353 BodyPipe::postConsume(size_t size
)
355 assert(!isCheckedOut
);
357 debugs(91,7, HERE
<< "consumed " << size
<< " bytes" << status());
358 if (mayNeedMoreData()) {
359 AsyncCall::Pointer call
= asyncCall(91, 7,
360 "BodyProducer::noteMoreBodySpaceAvailable",
361 BodyProducerDialer(theProducer
,
362 &BodyProducer::noteMoreBodySpaceAvailable
, this));
363 ScheduleCallHere(call
);
368 BodyPipe::postAppend(size_t size
)
370 assert(!isCheckedOut
);
372 debugs(91,7, HERE
<< "added " << size
<< " bytes" << status());
374 if (mustAutoConsume
&& !theConsumer
&& size
> 0)
375 startAutoConsumption();
377 // We should not consume here even if mustAutoConsume because the
378 // caller may not be ready for the data to be consumed during this call.
379 scheduleBodyDataNotification();
381 if (!mayNeedMoreData())
382 clearProducer(true); // reached end-of-body
387 BodyPipe::scheduleBodyDataNotification()
390 AsyncCall::Pointer call
= asyncCall(91, 7,
391 "BodyConsumer::noteMoreBodyDataAvailable",
392 BodyConsumerDialer(theConsumer
,
393 &BodyConsumer::noteMoreBodyDataAvailable
, this));
394 ScheduleCallHere(call
);
399 BodyPipe::scheduleBodyEndNotification()
402 if (bodySizeKnown() && bodySize() == thePutSize
) {
403 AsyncCall::Pointer call
= asyncCall(91, 7,
404 "BodyConsumer::noteBodyProductionEnded",
405 BodyConsumerDialer(theConsumer
,
406 &BodyConsumer::noteBodyProductionEnded
, this));
407 ScheduleCallHere(call
);
409 AsyncCall::Pointer call
= asyncCall(91, 7,
410 "BodyConsumer::noteBodyProducerAborted",
411 BodyConsumerDialer(theConsumer
,
412 &BodyConsumer::noteBodyProducerAborted
, this));
413 ScheduleCallHere(call
);
418 // a short temporary string describing buffer status for debugging
419 const char *BodyPipe::status() const
421 static MemBuf outputBuffer
;
422 outputBuffer
.reset();
424 outputBuffer
.append(" [", 2);
426 outputBuffer
.Printf("%"PRIu64
"<=%"PRIu64
, theGetSize
, thePutSize
);
427 if (theBodySize
>= 0)
428 outputBuffer
.Printf("<=%"PRId64
, theBodySize
);
430 outputBuffer
.append("<=?", 3);
432 outputBuffer
.Printf(" %d+%d", (int)theBuf
.contentSize(), (int)theBuf
.spaceSize());
434 outputBuffer
.Printf(" pipe%p", this);
436 outputBuffer
.Printf(" prod%p", theProducer
);
438 outputBuffer
.Printf(" cons%p", theConsumer
);
441 outputBuffer
.append(" A", 2);
443 outputBuffer
.append(" L", 2); // Locked
445 outputBuffer
.append("]", 1);
447 outputBuffer
.terminate();
449 return outputBuffer
.content();
453 /* BodyPipeCheckout */
455 BodyPipeCheckout::BodyPipeCheckout(BodyPipe
&aPipe
): pipe(aPipe
),
456 buf(aPipe
.checkOut()), offset(aPipe
.consumedSize()),
457 checkedOutSize(buf
.contentSize()), checkedIn(false)
461 BodyPipeCheckout::~BodyPipeCheckout()
464 pipe
.undoCheckOut(*this);
468 BodyPipeCheckout::checkIn()
476 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout
&c
): pipe(c
.pipe
),
477 buf(c
.buf
), offset(c
.offset
), checkedOutSize(c
.checkedOutSize
),
478 checkedIn(c
.checkedIn
)
480 assert(false); // prevent copying
484 BodyPipeCheckout::operator =(const BodyPipeCheckout
&)
486 assert(false); // prevent assignment