From: rousskov <> Date: Wed, 13 Feb 2008 06:44:35 +0000 (+0000) Subject: Merging async-call branch changes to HEAD: X-Git-Tag: BASIC_TPROXY4~100 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=e7352f30766f60d42bd46cdb8892efbb36d8d653;p=thirdparty%2Fsquid.git Merging async-call branch changes to HEAD: BodyPipe now uses the new job calls interface: - Instead of scheduling BodyPipe calls we are scheduling BodyProducer and BodyCosnumer calls - BodyProducer/BodyConsumer::noteXXXXXX(BodyPipe &bp) methods converted to Body*::noteXXXXXX(BodyPipe::Pointer) to allow them used with new async calls interface. - BodyPipe::tell* methods and related AsyncCallWrappers removed. - Implement the CallNoteBodyProducer and CallNoteBodyConsumer JobCall based classes. These classes used too schedule calls for BodyConsumer and BodyProducer respectivelly. They are like the normal JobCalls but aditionally checks if the BodyConsumer and BodyProducer is still part of the pipe. - Implement the NullBodyConsumer class as a BodyConsumer child which just reads the data from pipe and discard them. This class used to replace old AutoConsume mechanism. - Removing the ICAPXaction_{Enter,Exit} calls from ICAPModXact::note* methods Their logic implemented inside the new async calls code. Remove the old "late call avoidance" code from BodyPipe as no longer needed. --- diff --git a/src/BodyPipe.cc b/src/BodyPipe.cc index 71c12b00c1..528cf68caa 100644 --- a/src/BodyPipe.cc +++ b/src/BodyPipe.cc @@ -4,6 +4,95 @@ CBDATA_CLASS_INIT(BodyPipe); +// BodySink is a BodyConsumer class which just consume and drops +// data from a BodyPipe +class BodySink: public BodyConsumer { + bool done; +public: + BodySink():AsyncJob("BodySink"), done(false){} + virtual ~BodySink() {} + + virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) { + size_t contentSize = bp->buf().contentSize(); + bp->consume(contentSize); + } + virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) { + stopConsumingFrom(bp); + done = true; + } + virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) { + stopConsumingFrom(bp); + done = true; + } + bool doneAll() const {return done && AsyncJob::doneAll();} + CBDATA_CLASS2(BodySink); +}; + +CBDATA_CLASS_INIT(BodySink); + +// The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls. +// In addition to a normal AsyncCall checks if the BodyProducer is still the producer of +// the BodyPipe passed as argument +class BodyProducerDialer: public UnaryMemFunT +{ +public: + typedef UnaryMemFunT Parent; + + BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler, + BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {} + + virtual bool canDial(AsyncCall &call); +}; + +// The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls. +// In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient +// of the BodyPipe passed as argument +class BodyConsumerDialer: public UnaryMemFunT +{ +public: + typedef UnaryMemFunT Parent; + + BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler, + BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {} + + virtual bool canDial(AsyncCall &call); +}; + +bool +BodyProducerDialer::canDial(AsyncCall &call) { + if (!Parent::canDial(call)) + return false; + + BodyProducer *producer = object; + BodyPipe::Pointer pipe = arg1; + if (!pipe->stillProducing(producer)) { + debugs(call.debugSection, call.debugLevel, HERE << producer << + " no longer producing for " << pipe->status()); + return call.cancel("no longer producing"); + } + + return true; +} + +bool +BodyConsumerDialer::canDial(AsyncCall &call) { + if (!Parent::canDial(call)) + return false; + + BodyConsumer *consumer = object; + BodyPipe::Pointer pipe = arg1; + if (!pipe->stillConsuming(consumer)) { + debugs(call.debugSection, call.debugLevel, HERE << consumer << + " no longer consuming from " << pipe->status()); + return call.cancel("no longer consuming"); + } + + return true; +} + + +/* BodyProducer */ + // inform the pipe that we are done and clear the Pointer void BodyProducer::stopProducingFor(RefCount &pipe, bool atEof) { @@ -14,6 +103,10 @@ void BodyProducer::stopProducingFor(RefCount &pipe, bool atEof) pipe = NULL; } + + +/* BodyConsumer */ + // inform the pipe that we are done and clear the Pointer void BodyConsumer::stopConsumingFrom(RefCount &pipe) { @@ -23,11 +116,12 @@ void BodyConsumer::stopConsumingFrom(RefCount &pipe) pipe = NULL; } + /* BodyPipe */ BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1), theProducer(aProducer), theConsumer(0), - thePutSize(0), theGetSize(0), theCCallsPending(0), theCCallsToSkip(0), + thePutSize(0), theGetSize(0), mustAutoConsume(false), isCheckedOut(false) { // TODO: teach MemBuf to start with zero minSize @@ -152,9 +246,13 @@ BodyPipe::clearConsumer() { if (theConsumer) { debugs(91,7, HERE << "clearing consumer" << status()); theConsumer = NULL; - theCCallsToSkip = theCCallsPending; // skip all pending consumer calls - if (consumedSize() && !exhausted()) - AsyncCall(91,5, this, BodyPipe::tellBodyConsumerAborted); + if (consumedSize() && !exhausted()) { + AsyncCall::Pointer call= asyncCall(91, 7, + "BodyProducer::noteBodyConsumerAborted", + BodyProducerDialer(theProducer, + &BodyProducer::noteBodyConsumerAborted, this)); + ScheduleCallHere(call); + } } } @@ -180,12 +278,16 @@ BodyPipe::consume(size_t size) postConsume(size); } +// In the AutoConsumption mode the consumer has gone but the producer continues +// producing data. We are using a BodySink BodyConsumer which just discards the produced data. void BodyPipe::enableAutoConsumption() { mustAutoConsume = true; debugs(91,5, HERE << "enabled auto consumption" << status()); - if (!theConsumer && theBuf.hasContent()) + if (!theConsumer && theBuf.hasContent()){ + theConsumer = new BodySink; scheduleBodyDataNotification(); + } } MemBuf & @@ -229,8 +331,13 @@ BodyPipe::postConsume(size_t size) { assert(!isCheckedOut); theGetSize += size; debugs(91,7, HERE << "consumed " << size << " bytes" << status()); - if (mayNeedMoreData()) - AsyncCall(91,5, this, BodyPipe::tellMoreBodySpaceAvailable); + if (mayNeedMoreData()){ + AsyncCall::Pointer call= asyncCall(91, 7, + "BodyProducer::noteMoreBodySpaceAvailable", + BodyProducerDialer(theProducer, + &BodyProducer::noteMoreBodySpaceAvailable, this)); + ScheduleCallHere(call); + } } void @@ -252,8 +359,11 @@ void BodyPipe::scheduleBodyDataNotification() { if (theConsumer || mustAutoConsume) { - ++theCCallsPending; - AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable); + AsyncCall::Pointer call = asyncCall(91, 7, + "BodyConsumer::noteMoreBodyDataAvailable", + BodyConsumerDialer(theConsumer, + &BodyConsumer::noteMoreBodyDataAvailable, this)); + ScheduleCallHere(call); } } @@ -261,70 +371,23 @@ void BodyPipe::scheduleBodyEndNotification() { if (theConsumer) { - ++theCCallsPending; - if (bodySizeKnown() && bodySize() == thePutSize) - AsyncCall(91,5, this, BodyPipe::tellBodyProductionEnded); - else - AsyncCall(91,5, this, BodyPipe::tellBodyProducerAborted); + if (bodySizeKnown() && bodySize() == thePutSize) { + AsyncCall::Pointer call = asyncCall(91, 7, + "BodyConsumer::noteBodyProductionEnded", + BodyConsumerDialer(theConsumer, + &BodyConsumer::noteBodyProductionEnded, this)); + ScheduleCallHere(call); + } + else { + AsyncCall::Pointer call = asyncCall(91, 7, + "BodyConsumer::noteBodyProducerAborted", + BodyConsumerDialer(theConsumer, + &BodyConsumer::noteBodyProducerAborted, this)); + ScheduleCallHere(call); + } } } -void BodyPipe::tellMoreBodySpaceAvailable() -{ - if (theProducer != NULL) - theProducer->noteMoreBodySpaceAvailable(*this); -} - -void BodyPipe::tellBodyConsumerAborted() -{ - if (theProducer != NULL) - theProducer->noteBodyConsumerAborted(*this); -} - -void BodyPipe::tellMoreBodyDataAvailable() -{ - if (skipCCall()) - return; - - if (theConsumer != NULL) - theConsumer->noteMoreBodyDataAvailable(*this); - else - if (mustAutoConsume && theBuf.hasContent()) - consume(theBuf.contentSize()); -} - -void BodyPipe::tellBodyProductionEnded() -{ - if (skipCCall()) - return; - - if (theConsumer != NULL) - theConsumer->noteBodyProductionEnded(*this); -} - -void BodyPipe::tellBodyProducerAborted() -{ - if (skipCCall()) - return; - - if (theConsumer != NULL) - theConsumer->noteBodyProducerAborted(*this); -} - -// skips calls destined for the previous consumer; see BodyPipe::clearConsumer -bool BodyPipe::skipCCall() -{ - assert(theCCallsPending > 0); - --theCCallsPending; - - if (theCCallsToSkip <= 0) - return false; - - --theCCallsToSkip; - debugs(91,5, HERE << "skipped call"); - return true; -} - // a short temporary string describing buffer status for debugging const char *BodyPipe::status() const { diff --git a/src/BodyPipe.h b/src/BodyPipe.h index 6ff0d3d2bd..4a2c58ab54 100644 --- a/src/BodyPipe.h +++ b/src/BodyPipe.h @@ -4,18 +4,20 @@ #include "MemBuf.h" #include "AsyncCall.h" +#include "ICAP/AsyncJob.h" class BodyPipe; // Interface for those who want to produce body content for others. // BodyProducer is expected to create the BodyPipe. // One pipe cannot have more than one producer. -class BodyProducer { +class BodyProducer: virtual public AsyncJob { public: + BodyProducer():AsyncJob("BodyProducer"){} virtual ~BodyProducer() {} - virtual void noteMoreBodySpaceAvailable(BodyPipe &bp) = 0; - virtual void noteBodyConsumerAborted(BodyPipe &bp) = 0; + virtual void noteMoreBodySpaceAvailable(RefCount bp) = 0; + virtual void noteBodyConsumerAborted(RefCount bp) = 0; protected: void stopProducingFor(RefCount &pipe, bool atEof); @@ -25,13 +27,14 @@ class BodyProducer { // BodyConsumer is expected to register with an existing BodyPipe // by calling BodyPipe::setConsumer(). // One pipe cannot have more than one consumer. -class BodyConsumer { +class BodyConsumer: virtual public AsyncJob { public: + BodyConsumer():AsyncJob("BodyConsumer"){} virtual ~BodyConsumer() {} - virtual void noteMoreBodyDataAvailable(BodyPipe &bp) = 0; - virtual void noteBodyProductionEnded(BodyPipe &bp) = 0; - virtual void noteBodyProducerAborted(BodyPipe &bp) = 0; + virtual void noteMoreBodyDataAvailable(RefCount bp) = 0; + virtual void noteBodyProductionEnded(RefCount bp) = 0; + virtual void noteBodyProducerAborted(RefCount bp) = 0; protected: void stopConsumingFrom(RefCount &pipe); @@ -93,6 +96,7 @@ class BodyPipe: public RefCountable { bool mayNeedMoreData() const { return !bodySizeKnown() || needsMoreData(); } bool needsMoreData() const { return bodySizeKnown() && unproducedSize() > 0; } uint64_t unproducedSize() const; // size of still unproduced data + bool stillProducing(Producer *producer) const { return theProducer == producer; } // called by consumers bool setConsumerIfNotLate(Consumer *aConsumer); @@ -101,6 +105,7 @@ class BodyPipe: public RefCountable { void consume(size_t size); bool expectMoreAfter(uint64_t offset) const; bool exhausted() const; // saw eof/abort and all data consumed + bool stillConsuming(Consumer *consumer) const { return theConsumer == consumer; } // start or continue consuming when there is no consumer void enableAutoConsumption(); @@ -122,26 +127,6 @@ class BodyPipe: public RefCountable { void postConsume(size_t size); void postAppend(size_t size); - bool skipCCall(); // decides whether to skip the call, updates counters - - public: /* public to enable callbacks, but treat as private */ - - /* these methods are calling producer and sibscriber note*() - * callbacks with this BodyPipe as a parameter, which allows - * a single producer or consumer to support multiple pipes. */ - - void tellMoreBodySpaceAvailable(); - void tellBodyConsumerAborted(); - void tellMoreBodyDataAvailable(); - void tellBodyProductionEnded(); - void tellBodyProducerAborted(); - - AsyncCallWrapper(91,5, BodyPipe, tellMoreBodySpaceAvailable); - AsyncCallWrapper(91,5, BodyPipe, tellBodyConsumerAborted); - AsyncCallWrapper(91,5, BodyPipe, tellMoreBodyDataAvailable); - AsyncCallWrapper(91,5, BodyPipe, tellBodyProductionEnded); - AsyncCallWrapper(91,5, BodyPipe, tellBodyProducerAborted); - private: int64_t theBodySize; // expected total content length, if known Producer *theProducer; // content producer, if any @@ -150,9 +135,6 @@ class BodyPipe: public RefCountable { uint64_t thePutSize; // ever-increasing total uint64_t theGetSize; // ever-increasing total - int theCCallsPending; // outstanding calls to the consumer - int theCCallsToSkip; // how many calls to the consumer we should skip - MemBuf theBuf; // produced but not yet consumed content, if any bool mustAutoConsume; // consume when there is no consumer