CBDATA_CLASS_INIT(BodyPipe);
+// BodySink is a BodyConsumer class which just consume and drops
+// data from a BodyPipe
+class BodySink: public BodyConsumer {
+ bool done;
+public:
+ BodySink():AsyncJob("BodySink"), done(false){}
+ virtual ~BodySink() {}
+
+ virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
+ size_t contentSize = bp->buf().contentSize();
+ bp->consume(contentSize);
+ }
+ virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
+ stopConsumingFrom(bp);
+ done = true;
+ }
+ virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
+ stopConsumingFrom(bp);
+ done = true;
+ }
+ bool doneAll() const {return done && AsyncJob::doneAll();}
+ CBDATA_CLASS2(BodySink);
+};
+
+CBDATA_CLASS_INIT(BodySink);
+
+// The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
+// In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
+// the BodyPipe passed as argument
+class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
+{
+public:
+ typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
+
+ BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler,
+ BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {}
+
+ virtual bool canDial(AsyncCall &call);
+};
+
+// The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
+// In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
+// of the BodyPipe passed as argument
+class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
+{
+public:
+ typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
+
+ BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler,
+ BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {}
+
+ virtual bool canDial(AsyncCall &call);
+};
+
+bool
+BodyProducerDialer::canDial(AsyncCall &call) {
+ if (!Parent::canDial(call))
+ return false;
+
+ BodyProducer *producer = object;
+ BodyPipe::Pointer pipe = arg1;
+ if (!pipe->stillProducing(producer)) {
+ debugs(call.debugSection, call.debugLevel, HERE << producer <<
+ " no longer producing for " << pipe->status());
+ return call.cancel("no longer producing");
+ }
+
+ return true;
+}
+
+bool
+BodyConsumerDialer::canDial(AsyncCall &call) {
+ if (!Parent::canDial(call))
+ return false;
+
+ BodyConsumer *consumer = object;
+ BodyPipe::Pointer pipe = arg1;
+ if (!pipe->stillConsuming(consumer)) {
+ debugs(call.debugSection, call.debugLevel, HERE << consumer <<
+ " no longer consuming from " << pipe->status());
+ return call.cancel("no longer consuming");
+ }
+
+ return true;
+}
+
+
+/* BodyProducer */
+
// inform the pipe that we are done and clear the Pointer
void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
{
pipe = NULL;
}
+
+
+/* BodyConsumer */
+
// inform the pipe that we are done and clear the Pointer
void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
{
pipe = NULL;
}
+
/* BodyPipe */
BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
theProducer(aProducer), theConsumer(0),
- thePutSize(0), theGetSize(0), theCCallsPending(0), theCCallsToSkip(0),
+ thePutSize(0), theGetSize(0),
mustAutoConsume(false), isCheckedOut(false)
{
// TODO: teach MemBuf to start with zero minSize
if (theConsumer) {
debugs(91,7, HERE << "clearing consumer" << status());
theConsumer = NULL;
- theCCallsToSkip = theCCallsPending; // skip all pending consumer calls
- if (consumedSize() && !exhausted())
- AsyncCall(91,5, this, BodyPipe::tellBodyConsumerAborted);
+ if (consumedSize() && !exhausted()) {
+ AsyncCall::Pointer call= asyncCall(91, 7,
+ "BodyProducer::noteBodyConsumerAborted",
+ BodyProducerDialer(theProducer,
+ &BodyProducer::noteBodyConsumerAborted, this));
+ ScheduleCallHere(call);
+ }
}
}
postConsume(size);
}
+// In the AutoConsumption mode the consumer has gone but the producer continues
+// producing data. We are using a BodySink BodyConsumer which just discards the produced data.
void
BodyPipe::enableAutoConsumption() {
mustAutoConsume = true;
debugs(91,5, HERE << "enabled auto consumption" << status());
- if (!theConsumer && theBuf.hasContent())
+ if (!theConsumer && theBuf.hasContent()){
+ theConsumer = new BodySink;
scheduleBodyDataNotification();
+ }
}
MemBuf &
assert(!isCheckedOut);
theGetSize += size;
debugs(91,7, HERE << "consumed " << size << " bytes" << status());
- if (mayNeedMoreData())
- AsyncCall(91,5, this, BodyPipe::tellMoreBodySpaceAvailable);
+ if (mayNeedMoreData()){
+ AsyncCall::Pointer call= asyncCall(91, 7,
+ "BodyProducer::noteMoreBodySpaceAvailable",
+ BodyProducerDialer(theProducer,
+ &BodyProducer::noteMoreBodySpaceAvailable, this));
+ ScheduleCallHere(call);
+ }
}
void
BodyPipe::scheduleBodyDataNotification()
{
if (theConsumer || mustAutoConsume) {
- ++theCCallsPending;
- AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable);
+ AsyncCall::Pointer call = asyncCall(91, 7,
+ "BodyConsumer::noteMoreBodyDataAvailable",
+ BodyConsumerDialer(theConsumer,
+ &BodyConsumer::noteMoreBodyDataAvailable, this));
+ ScheduleCallHere(call);
}
}
BodyPipe::scheduleBodyEndNotification()
{
if (theConsumer) {
- ++theCCallsPending;
- if (bodySizeKnown() && bodySize() == thePutSize)
- AsyncCall(91,5, this, BodyPipe::tellBodyProductionEnded);
- else
- AsyncCall(91,5, this, BodyPipe::tellBodyProducerAborted);
+ if (bodySizeKnown() && bodySize() == thePutSize) {
+ AsyncCall::Pointer call = asyncCall(91, 7,
+ "BodyConsumer::noteBodyProductionEnded",
+ BodyConsumerDialer(theConsumer,
+ &BodyConsumer::noteBodyProductionEnded, this));
+ ScheduleCallHere(call);
+ }
+ else {
+ AsyncCall::Pointer call = asyncCall(91, 7,
+ "BodyConsumer::noteBodyProducerAborted",
+ BodyConsumerDialer(theConsumer,
+ &BodyConsumer::noteBodyProducerAborted, this));
+ ScheduleCallHere(call);
+ }
}
}
-void BodyPipe::tellMoreBodySpaceAvailable()
-{
- if (theProducer != NULL)
- theProducer->noteMoreBodySpaceAvailable(*this);
-}
-
-void BodyPipe::tellBodyConsumerAborted()
-{
- if (theProducer != NULL)
- theProducer->noteBodyConsumerAborted(*this);
-}
-
-void BodyPipe::tellMoreBodyDataAvailable()
-{
- if (skipCCall())
- return;
-
- if (theConsumer != NULL)
- theConsumer->noteMoreBodyDataAvailable(*this);
- else
- if (mustAutoConsume && theBuf.hasContent())
- consume(theBuf.contentSize());
-}
-
-void BodyPipe::tellBodyProductionEnded()
-{
- if (skipCCall())
- return;
-
- if (theConsumer != NULL)
- theConsumer->noteBodyProductionEnded(*this);
-}
-
-void BodyPipe::tellBodyProducerAborted()
-{
- if (skipCCall())
- return;
-
- if (theConsumer != NULL)
- theConsumer->noteBodyProducerAborted(*this);
-}
-
-// skips calls destined for the previous consumer; see BodyPipe::clearConsumer
-bool BodyPipe::skipCCall()
-{
- assert(theCCallsPending > 0);
- --theCCallsPending;
-
- if (theCCallsToSkip <= 0)
- return false;
-
- --theCCallsToSkip;
- debugs(91,5, HERE << "skipped call");
- return true;
-}
-
// a short temporary string describing buffer status for debugging
const char *BodyPipe::status() const
{
#include "MemBuf.h"
#include "AsyncCall.h"
+#include "ICAP/AsyncJob.h"
class BodyPipe;
// Interface for those who want to produce body content for others.
// BodyProducer is expected to create the BodyPipe.
// One pipe cannot have more than one producer.
-class BodyProducer {
+class BodyProducer: virtual public AsyncJob {
public:
+ BodyProducer():AsyncJob("BodyProducer"){}
virtual ~BodyProducer() {}
- virtual void noteMoreBodySpaceAvailable(BodyPipe &bp) = 0;
- virtual void noteBodyConsumerAborted(BodyPipe &bp) = 0;
+ virtual void noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp) = 0;
+ virtual void noteBodyConsumerAborted(RefCount<BodyPipe> bp) = 0;
protected:
void stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof);
// BodyConsumer is expected to register with an existing BodyPipe
// by calling BodyPipe::setConsumer().
// One pipe cannot have more than one consumer.
-class BodyConsumer {
+class BodyConsumer: virtual public AsyncJob {
public:
+ BodyConsumer():AsyncJob("BodyConsumer"){}
virtual ~BodyConsumer() {}
- virtual void noteMoreBodyDataAvailable(BodyPipe &bp) = 0;
- virtual void noteBodyProductionEnded(BodyPipe &bp) = 0;
- virtual void noteBodyProducerAborted(BodyPipe &bp) = 0;
+ virtual void noteMoreBodyDataAvailable(RefCount<BodyPipe> bp) = 0;
+ virtual void noteBodyProductionEnded(RefCount<BodyPipe> bp) = 0;
+ virtual void noteBodyProducerAborted(RefCount<BodyPipe> bp) = 0;
protected:
void stopConsumingFrom(RefCount<BodyPipe> &pipe);
bool mayNeedMoreData() const { return !bodySizeKnown() || needsMoreData(); }
bool needsMoreData() const { return bodySizeKnown() && unproducedSize() > 0; }
uint64_t unproducedSize() const; // size of still unproduced data
+ bool stillProducing(Producer *producer) const { return theProducer == producer; }
// called by consumers
bool setConsumerIfNotLate(Consumer *aConsumer);
void consume(size_t size);
bool expectMoreAfter(uint64_t offset) const;
bool exhausted() const; // saw eof/abort and all data consumed
+ bool stillConsuming(Consumer *consumer) const { return theConsumer == consumer; }
// start or continue consuming when there is no consumer
void enableAutoConsumption();
void postConsume(size_t size);
void postAppend(size_t size);
- bool skipCCall(); // decides whether to skip the call, updates counters
-
- public: /* public to enable callbacks, but treat as private */
-
- /* these methods are calling producer and sibscriber note*()
- * callbacks with this BodyPipe as a parameter, which allows
- * a single producer or consumer to support multiple pipes. */
-
- void tellMoreBodySpaceAvailable();
- void tellBodyConsumerAborted();
- void tellMoreBodyDataAvailable();
- void tellBodyProductionEnded();
- void tellBodyProducerAborted();
-
- AsyncCallWrapper(91,5, BodyPipe, tellMoreBodySpaceAvailable);
- AsyncCallWrapper(91,5, BodyPipe, tellBodyConsumerAborted);
- AsyncCallWrapper(91,5, BodyPipe, tellMoreBodyDataAvailable);
- AsyncCallWrapper(91,5, BodyPipe, tellBodyProductionEnded);
- AsyncCallWrapper(91,5, BodyPipe, tellBodyProducerAborted);
-
private:
int64_t theBodySize; // expected total content length, if known
Producer *theProducer; // content producer, if any
uint64_t thePutSize; // ever-increasing total
uint64_t theGetSize; // ever-increasing total
- int theCCallsPending; // outstanding calls to the consumer
- int theCCallsToSkip; // how many calls to the consumer we should skip
-
MemBuf theBuf; // produced but not yet consumed content, if any
bool mustAutoConsume; // consume when there is no consumer