// data from a BodyPipe
class BodySink: public BodyConsumer
{
- bool done;
public:
- BodySink():AsyncJob("BodySink"), done(false) {}
- virtual ~BodySink() {}
+ BodySink(const BodyPipe::Pointer &bp): AsyncJob("BodySink"), body_pipe(bp) {}
+ virtual ~BodySink() { assert(!body_pipe); }
virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
size_t contentSize = bp->buf().contentSize();
bp->consume(contentSize);
}
virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
- stopConsumingFrom(bp);
- done = true;
+ stopConsumingFrom(body_pipe);
}
virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
- stopConsumingFrom(bp);
- done = true;
+ stopConsumingFrom(body_pipe);
}
- bool doneAll() const {return done && AsyncJob::doneAll();}
+ bool doneAll() const {return !body_pipe && AsyncJob::doneAll();}
+
+private:
+ BodyPipe::Pointer body_pipe; ///< the pipe we are consuming from
+
CBDATA_CLASS2(BodySink);
};
{
Must(mustAutoConsume);
Must(!theConsumer);
- theConsumer = new BodySink;
+ theConsumer = new BodySink(this);
debugs(91,7, HERE << "starting auto consumption" << status());
scheduleBodyDataNotification();
}