4 #include "TextException.h"
6 CBDATA_CLASS_INIT(BodyPipe
);
8 // BodySink is a BodyConsumer class which just consume and drops
9 // data from a BodyPipe
10 class BodySink
: public BodyConsumer
{
13 BodySink():AsyncJob("BodySink"), done(false){}
14 virtual ~BodySink() {}
16 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp
) {
17 size_t contentSize
= bp
->buf().contentSize();
18 bp
->consume(contentSize
);
20 virtual void noteBodyProductionEnded(BodyPipe::Pointer bp
) {
21 stopConsumingFrom(bp
);
24 virtual void noteBodyProducerAborted(BodyPipe::Pointer bp
) {
25 stopConsumingFrom(bp
);
28 bool doneAll() const {return done
&& AsyncJob::doneAll();}
29 CBDATA_CLASS2(BodySink
);
32 CBDATA_CLASS_INIT(BodySink
);
34 // The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
35 // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
36 // the BodyPipe passed as argument
37 class BodyProducerDialer
: public UnaryMemFunT
<BodyProducer
, BodyPipe::Pointer
>
40 typedef UnaryMemFunT
<BodyProducer
, BodyPipe::Pointer
> Parent
;
42 BodyProducerDialer(BodyProducer
*aProducer
, Parent::Method aHandler
,
43 BodyPipe::Pointer bp
): Parent(aProducer
, aHandler
, bp
) {}
45 virtual bool canDial(AsyncCall
&call
);
48 // The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
49 // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
50 // of the BodyPipe passed as argument
51 class BodyConsumerDialer
: public UnaryMemFunT
<BodyConsumer
, BodyPipe::Pointer
>
54 typedef UnaryMemFunT
<BodyConsumer
, BodyPipe::Pointer
> Parent
;
56 BodyConsumerDialer(BodyConsumer
*aConsumer
, Parent::Method aHandler
,
57 BodyPipe::Pointer bp
): Parent(aConsumer
, aHandler
, bp
) {}
59 virtual bool canDial(AsyncCall
&call
);
63 BodyProducerDialer::canDial(AsyncCall
&call
) {
64 if (!Parent::canDial(call
))
67 BodyProducer
*producer
= object
;
68 BodyPipe::Pointer pipe
= arg1
;
69 if (!pipe
->stillProducing(producer
)) {
70 debugs(call
.debugSection
, call
.debugLevel
, HERE
<< producer
<<
71 " no longer producing for " << pipe
->status());
72 return call
.cancel("no longer producing");
79 BodyConsumerDialer::canDial(AsyncCall
&call
) {
80 if (!Parent::canDial(call
))
83 BodyConsumer
*consumer
= object
;
84 BodyPipe::Pointer pipe
= arg1
;
85 if (!pipe
->stillConsuming(consumer
)) {
86 debugs(call
.debugSection
, call
.debugLevel
, HERE
<< consumer
<<
87 " no longer consuming from " << pipe
->status());
88 return call
.cancel("no longer consuming");
97 // inform the pipe that we are done and clear the Pointer
98 void BodyProducer::stopProducingFor(RefCount
<BodyPipe
> &pipe
, bool atEof
)
100 debugs(91,7, HERE
<< this << " will not produce for " << pipe
<<
101 "; atEof: " << atEof
);
102 assert(pipe
!= NULL
); // be strict: the caller state may depend on this
103 pipe
->clearProducer(atEof
);
111 // inform the pipe that we are done and clear the Pointer
112 void BodyConsumer::stopConsumingFrom(RefCount
<BodyPipe
> &pipe
)
114 debugs(91,7, HERE
<< this << " will not consume from " << pipe
);
115 assert(pipe
!= NULL
); // be strict: the caller state may depend on this
116 pipe
->clearConsumer();
123 BodyPipe::BodyPipe(Producer
*aProducer
): theBodySize(-1),
124 theProducer(aProducer
), theConsumer(0),
125 thePutSize(0), theGetSize(0),
126 mustAutoConsume(false), isCheckedOut(false)
128 // TODO: teach MemBuf to start with zero minSize
129 // TODO: limit maxSize by theBodySize, when known?
130 theBuf
.init(2*1024, MaxCapacity
);
131 debugs(91,7, HERE
<< "created BodyPipe" << status());
134 BodyPipe::~BodyPipe()
136 debugs(91,7, HERE
<< "destroying BodyPipe" << status());
137 assert(!theProducer
);
138 assert(!theConsumer
);
142 void BodyPipe::setBodySize(uint64_t aBodySize
)
144 assert(!bodySizeKnown());
145 assert(aBodySize
>= 0);
146 assert(thePutSize
<= aBodySize
);
148 // If this assert fails, we need to add code to check for eof and inform
149 // the consumer about the eof condition via scheduleBodyEndNotification,
150 // because just setting a body size limit may trigger the eof condition.
151 assert(!theConsumer
);
153 theBodySize
= aBodySize
;
154 debugs(91,7, HERE
<< "set body size" << status());
157 uint64_t BodyPipe::bodySize() const
159 assert(bodySizeKnown());
160 return static_cast<uint64_t>(theBodySize
);
163 bool BodyPipe::expectMoreAfter(uint64_t offset
) const
165 assert(theGetSize
<= offset
);
166 return offset
< thePutSize
|| // buffer has more now or
167 (!productionEnded() && mayNeedMoreData()); // buffer will have more
170 bool BodyPipe::exhausted() const
172 return !expectMoreAfter(theGetSize
);
175 uint64_t BodyPipe::unproducedSize() const
177 return bodySize() - thePutSize
; // bodySize() asserts that size is known
181 BodyPipe::clearProducer(bool atEof
)
184 debugs(91,7, HERE
<< "clearing BodyPipe producer" << status());
187 if (!bodySizeKnown())
188 theBodySize
= thePutSize
;
190 if (bodySize() != thePutSize
)
191 debugs(91,3, HERE
<< "aborting on premature eof" << status());
193 // asserta that we can detect the abort if the consumer joins later
194 assert(!bodySizeKnown() || bodySize() != thePutSize
);
196 scheduleBodyEndNotification();
201 BodyPipe::putMoreData(const char *buf
, size_t size
)
204 size
= XMIN((uint64_t)size
, unproducedSize());
206 const size_t spaceSize
= static_cast<size_t>(theBuf
.potentialSpaceSize());
207 if ((size
= XMIN(size
, spaceSize
))) {
208 theBuf
.append(buf
, size
);
216 BodyPipe::setConsumerIfNotLate(Consumer
*aConsumer
)
218 assert(!theConsumer
);
221 // TODO: convert this into an exception and remove IfNotLate suffix
222 // If there is something consumed already, we are in an auto-consuming mode
223 // and it is too late to attach a real consumer to the pipe.
224 if (theGetSize
> 0) {
225 assert(mustAutoConsume
);
229 theConsumer
= aConsumer
;
230 debugs(91,7, HERE
<< "set consumer" << status());
231 if (theBuf
.hasContent())
232 scheduleBodyDataNotification();
234 scheduleBodyEndNotification();
239 // When BodyPipe consumer is gone, all events for that consumer must not
240 // reach the new consumer (if any). Otherwise, the calls may go out of order
241 // (if _some_ calls are dropped due to the ultimate destination being
242 // temporary NULL). The code keeps track of the number of outstanding
243 // events and skips that number if consumer leaves. TODO: when AscyncCall
244 // support is improved, should we just schedule calls directly to consumer?
246 BodyPipe::clearConsumer() {
248 debugs(91,7, HERE
<< "clearing consumer" << status());
250 if (consumedSize() && !exhausted()) {
251 AsyncCall::Pointer call
= asyncCall(91, 7,
252 "BodyProducer::noteBodyConsumerAborted",
253 BodyProducerDialer(theProducer
,
254 &BodyProducer::noteBodyConsumerAborted
, this));
255 ScheduleCallHere(call
);
261 BodyPipe::getMoreData(MemBuf
&buf
)
263 if (!theBuf
.hasContent())
264 return 0; // did not touch the possibly uninitialized buf
268 const size_t size
= XMIN(theBuf
.contentSize(), buf
.potentialSpaceSize());
269 buf
.append(theBuf
.content(), size
);
270 theBuf
.consume(size
);
272 return size
; // cannot be zero if we called buf.init above
276 BodyPipe::consume(size_t size
)
278 theBuf
.consume(size
);
282 // In the AutoConsumption mode the consumer has gone but the producer continues
283 // producing data. We are using a BodySink BodyConsumer which just discards the produced data.
285 BodyPipe::enableAutoConsumption() {
286 mustAutoConsume
= true;
287 debugs(91,5, HERE
<< "enabled auto consumption" << status());
288 if (!theConsumer
&& theBuf
.hasContent())
289 startAutoConsumption();
292 // start auto consumption by creating body sink
294 BodyPipe::startAutoConsumption()
296 Must(mustAutoConsume
);
298 theConsumer
= new BodySink
;
299 debugs(91,7, HERE
<< "starting auto consumption" << status());
300 scheduleBodyDataNotification();
304 BodyPipe::checkOut() {
305 assert(!isCheckedOut
);
311 BodyPipe::checkIn(Checkout
&checkout
)
313 assert(isCheckedOut
);
314 isCheckedOut
= false;
315 const size_t currentSize
= theBuf
.contentSize();
316 if (checkout
.checkedOutSize
> currentSize
)
317 postConsume(checkout
.checkedOutSize
- currentSize
);
319 if (checkout
.checkedOutSize
< currentSize
)
320 postAppend(currentSize
- checkout
.checkedOutSize
);
324 BodyPipe::undoCheckOut(Checkout
&checkout
)
326 assert(isCheckedOut
);
327 const size_t currentSize
= theBuf
.contentSize();
328 // We can only undo if size did not change, and even that carries
329 // some risk. If this becomes a problem, the code checking out
330 // raw buffers should always check them in (possibly unchanged)
331 // instead of relying on the automated undo mechanism of Checkout.
332 // The code can always use a temporary buffer to accomplish that.
333 assert(checkout
.checkedOutSize
== currentSize
);
336 // TODO: Optimize: inform consumer/producer about more data/space only if
337 // they used the data/space since we notified them last time.
340 BodyPipe::postConsume(size_t size
) {
341 assert(!isCheckedOut
);
343 debugs(91,7, HERE
<< "consumed " << size
<< " bytes" << status());
344 if (mayNeedMoreData()){
345 AsyncCall::Pointer call
= asyncCall(91, 7,
346 "BodyProducer::noteMoreBodySpaceAvailable",
347 BodyProducerDialer(theProducer
,
348 &BodyProducer::noteMoreBodySpaceAvailable
, this));
349 ScheduleCallHere(call
);
354 BodyPipe::postAppend(size_t size
) {
355 assert(!isCheckedOut
);
357 debugs(91,7, HERE
<< "added " << size
<< " bytes" << status());
359 if (mustAutoConsume
&& !theConsumer
&& size
> 0)
360 startAutoConsumption();
362 // We should not consume here even if mustAutoConsume because the
363 // caller may not be ready for the data to be consumed during this call.
364 scheduleBodyDataNotification();
366 if (!mayNeedMoreData())
367 clearProducer(true); // reached end-of-body
372 BodyPipe::scheduleBodyDataNotification()
375 AsyncCall::Pointer call
= asyncCall(91, 7,
376 "BodyConsumer::noteMoreBodyDataAvailable",
377 BodyConsumerDialer(theConsumer
,
378 &BodyConsumer::noteMoreBodyDataAvailable
, this));
379 ScheduleCallHere(call
);
384 BodyPipe::scheduleBodyEndNotification()
387 if (bodySizeKnown() && bodySize() == thePutSize
) {
388 AsyncCall::Pointer call
= asyncCall(91, 7,
389 "BodyConsumer::noteBodyProductionEnded",
390 BodyConsumerDialer(theConsumer
,
391 &BodyConsumer::noteBodyProductionEnded
, this));
392 ScheduleCallHere(call
);
395 AsyncCall::Pointer call
= asyncCall(91, 7,
396 "BodyConsumer::noteBodyProducerAborted",
397 BodyConsumerDialer(theConsumer
,
398 &BodyConsumer::noteBodyProducerAborted
, this));
399 ScheduleCallHere(call
);
404 // a short temporary string describing buffer status for debugging
405 const char *BodyPipe::status() const
412 buf
.Printf("%"PRIu64
"<=%"PRIu64
, theGetSize
, thePutSize
);
413 if (theBodySize
>= 0)
414 buf
.Printf("<=%"PRId64
, theBodySize
);
416 buf
.append("<=?", 3);
418 buf
.Printf(" %d+%d", (int)theBuf
.contentSize(), (int)theBuf
.spaceSize());
420 buf
.Printf(" pipe%p", this);
422 buf
.Printf(" prod%p", theProducer
);
424 buf
.Printf(" cons%p", theConsumer
);
429 buf
.append(" L", 2); // Locked
435 return buf
.content();
439 /* BodyPipeCheckout */
441 BodyPipeCheckout::BodyPipeCheckout(BodyPipe
&aPipe
): pipe(aPipe
),
442 buf(aPipe
.checkOut()), offset(aPipe
.consumedSize()),
443 checkedOutSize(buf
.contentSize()), checkedIn(false)
447 BodyPipeCheckout::~BodyPipeCheckout()
450 pipe
.undoCheckOut(*this);
454 BodyPipeCheckout::checkIn()
462 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout
&c
): pipe(c
.pipe
),
463 buf(c
.buf
), offset(c
.offset
), checkedOutSize(c
.checkedOutSize
),
464 checkedIn(c
.checkedIn
)
466 assert(false); // prevent copying
470 BodyPipeCheckout::operator =(const BodyPipeCheckout
&)
472 assert(false); // prevent assignment