]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Merging async-call branch changes to HEAD:
authorrousskov <>
Wed, 13 Feb 2008 06:44:35 +0000 (06:44 +0000)
committerrousskov <>
Wed, 13 Feb 2008 06:44:35 +0000 (06:44 +0000)
BodyPipe now uses the new job calls interface:
 - Instead of scheduling BodyPipe calls we are scheduling BodyProducer
   and BodyCosnumer calls
 - BodyProducer/BodyConsumer::noteXXXXXX(BodyPipe &bp) methods converted
   to Body*::noteXXXXXX(BodyPipe::Pointer) to allow them used with new
   async calls interface.
 - BodyPipe::tell* methods and related AsyncCallWrappers removed.
 - Implement the CallNoteBodyProducer and CallNoteBodyConsumer JobCall based
   classes. These classes used too schedule calls for BodyConsumer and
   BodyProducer respectivelly. They are like the normal JobCalls but
   aditionally checks if the BodyConsumer and BodyProducer is still part
   of the pipe.
 - Implement the NullBodyConsumer class as a BodyConsumer child which just
   reads the data from pipe and discard them. This class used to replace
   old AutoConsume mechanism.
 - Removing the ICAPXaction_{Enter,Exit} calls from ICAPModXact::note* methods
   Their logic implemented inside the new async calls code.

Remove the old "late call avoidance" code from BodyPipe as no longer needed.

src/BodyPipe.cc
src/BodyPipe.h

index 71c12b00c17278cc3a4bf8c14a5ad5173e2b5298..528cf68caa3d5d57313415addc783dcc095d0e47 100644 (file)
@@ -4,6 +4,95 @@
 
 CBDATA_CLASS_INIT(BodyPipe);
 
+// BodySink is a BodyConsumer class which  just consume and drops
+// data from a BodyPipe 
+class BodySink: public BodyConsumer {
+    bool done;
+public:
+    BodySink():AsyncJob("BodySink"), done(false){}
+    virtual ~BodySink() {}
+    
+    virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
+       size_t contentSize = bp->buf().contentSize();
+       bp->consume(contentSize);
+    }
+    virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
+       stopConsumingFrom(bp);
+       done = true;
+    }
+    virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
+       stopConsumingFrom(bp);
+       done = true;
+    }
+    bool doneAll() const {return done && AsyncJob::doneAll();}
+    CBDATA_CLASS2(BodySink);
+};
+
+CBDATA_CLASS_INIT(BodySink);
+
+// The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
+// In addition to a normal AsyncCall checks if the BodyProducer is still the producer of 
+// the BodyPipe passed as argument
+class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
+{
+public:
+       typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
+
+       BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler,
+               BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {}
+
+       virtual bool canDial(AsyncCall &call);
+};
+
+// The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
+// In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient 
+// of the BodyPipe passed as argument
+class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
+{
+public:
+       typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
+
+       BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler, 
+               BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {}
+
+       virtual bool canDial(AsyncCall &call);
+};
+
+bool
+BodyProducerDialer::canDial(AsyncCall &call) {
+       if (!Parent::canDial(call))
+               return false;
+
+       BodyProducer *producer = object;
+       BodyPipe::Pointer pipe = arg1;
+       if (!pipe->stillProducing(producer)) {
+               debugs(call.debugSection, call.debugLevel, HERE << producer <<
+                       " no longer producing for " << pipe->status());
+               return call.cancel("no longer producing");
+       }
+
+       return true;
+}
+
+bool
+BodyConsumerDialer::canDial(AsyncCall &call) {
+       if (!Parent::canDial(call))
+               return false;
+
+       BodyConsumer *consumer = object;
+       BodyPipe::Pointer pipe = arg1;
+       if (!pipe->stillConsuming(consumer)) {
+               debugs(call.debugSection, call.debugLevel, HERE << consumer <<
+                       " no longer consuming from " << pipe->status());
+               return call.cancel("no longer consuming");
+       }
+
+       return true;
+}
+
+
+/* BodyProducer */
+
 // inform the pipe that we are done and clear the Pointer
 void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
 {
@@ -14,6 +103,10 @@ void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
        pipe = NULL;
 }
 
+
+
+/* BodyConsumer */
+
 // inform the pipe that we are done and clear the Pointer
 void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
 {
@@ -23,11 +116,12 @@ void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
        pipe = NULL;
 }
 
+
 /* BodyPipe */
 
 BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
        theProducer(aProducer), theConsumer(0),
-       thePutSize(0), theGetSize(0), theCCallsPending(0), theCCallsToSkip(0),
+                                        thePutSize(0), theGetSize(0),
        mustAutoConsume(false), isCheckedOut(false)
 {
        // TODO: teach MemBuf to start with zero minSize
@@ -152,9 +246,13 @@ BodyPipe::clearConsumer() {
        if (theConsumer) {
                debugs(91,7, HERE << "clearing consumer" << status());
                theConsumer = NULL;
-               theCCallsToSkip = theCCallsPending; // skip all pending consumer calls
-               if (consumedSize() && !exhausted())
-                       AsyncCall(91,5, this, BodyPipe::tellBodyConsumerAborted);
+               if (consumedSize() && !exhausted()) {
+                       AsyncCall::Pointer call= asyncCall(91, 7,
+                               "BodyProducer::noteBodyConsumerAborted",
+                               BodyProducerDialer(theProducer,
+                                       &BodyProducer::noteBodyConsumerAborted, this));
+                       ScheduleCallHere(call);
+               }
        }
 }
 
@@ -180,12 +278,16 @@ BodyPipe::consume(size_t size)
        postConsume(size);
 }
 
+// In the AutoConsumption  mode the consumer has gone but the producer continues 
+// producing data. We are using a BodySink BodyConsumer which just discards the produced data.
 void
 BodyPipe::enableAutoConsumption() {
        mustAutoConsume = true;
        debugs(91,5, HERE << "enabled auto consumption" << status());
-       if (!theConsumer && theBuf.hasContent())
+       if (!theConsumer && theBuf.hasContent()){
+                       theConsumer = new BodySink;
                scheduleBodyDataNotification();
+       }
 }
 
 MemBuf &
@@ -229,8 +331,13 @@ BodyPipe::postConsume(size_t size) {
        assert(!isCheckedOut);
        theGetSize += size;
        debugs(91,7, HERE << "consumed " << size << " bytes" << status());
-       if (mayNeedMoreData())
-               AsyncCall(91,5, this, BodyPipe::tellMoreBodySpaceAvailable);
+       if (mayNeedMoreData()){
+               AsyncCall::Pointer call=  asyncCall(91, 7,
+                       "BodyProducer::noteMoreBodySpaceAvailable",
+                       BodyProducerDialer(theProducer,
+                               &BodyProducer::noteMoreBodySpaceAvailable, this));
+               ScheduleCallHere(call);
+       }
 }
 
 void
@@ -252,8 +359,11 @@ void
 BodyPipe::scheduleBodyDataNotification()
 {
        if (theConsumer || mustAutoConsume) {
-               ++theCCallsPending;
-               AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable);
+               AsyncCall::Pointer call = asyncCall(91, 7,
+                       "BodyConsumer::noteMoreBodyDataAvailable",
+                       BodyConsumerDialer(theConsumer,
+                               &BodyConsumer::noteMoreBodyDataAvailable, this));
+               ScheduleCallHere(call);
        }
 }
 
@@ -261,70 +371,23 @@ void
 BodyPipe::scheduleBodyEndNotification()
 {
        if (theConsumer) {
-               ++theCCallsPending;
-               if (bodySizeKnown() && bodySize() == thePutSize)
-                       AsyncCall(91,5, this, BodyPipe::tellBodyProductionEnded);
-               else
-                       AsyncCall(91,5, this, BodyPipe::tellBodyProducerAborted);
+               if (bodySizeKnown() && bodySize() == thePutSize) {
+                       AsyncCall::Pointer call = asyncCall(91, 7,
+                               "BodyConsumer::noteBodyProductionEnded",
+                               BodyConsumerDialer(theConsumer,
+                                       &BodyConsumer::noteBodyProductionEnded, this));
+                       ScheduleCallHere(call);
+               }
+               else {
+                       AsyncCall::Pointer call = asyncCall(91, 7,
+                               "BodyConsumer::noteBodyProducerAborted",
+                               BodyConsumerDialer(theConsumer,
+                                       &BodyConsumer::noteBodyProducerAborted, this));
+                       ScheduleCallHere(call);
+               }
        }
 }
 
-void BodyPipe::tellMoreBodySpaceAvailable()
-{
-       if (theProducer != NULL)
-               theProducer->noteMoreBodySpaceAvailable(*this);
-}
-
-void BodyPipe::tellBodyConsumerAborted()
-{
-       if (theProducer != NULL)
-               theProducer->noteBodyConsumerAborted(*this);
-}
-
-void BodyPipe::tellMoreBodyDataAvailable()
-{
-       if (skipCCall())
-               return;
-
-       if (theConsumer != NULL)
-               theConsumer->noteMoreBodyDataAvailable(*this);
-       else
-       if (mustAutoConsume && theBuf.hasContent())
-               consume(theBuf.contentSize());
-}
-
-void BodyPipe::tellBodyProductionEnded()
-{
-       if (skipCCall())
-               return;
-
-       if (theConsumer != NULL)
-               theConsumer->noteBodyProductionEnded(*this);
-}
-
-void BodyPipe::tellBodyProducerAborted()
-{
-       if (skipCCall())
-               return;
-
-       if (theConsumer != NULL)
-               theConsumer->noteBodyProducerAborted(*this);
-}
-
-// skips calls destined for the previous consumer; see BodyPipe::clearConsumer
-bool BodyPipe::skipCCall()
-{
-       assert(theCCallsPending > 0);
-       --theCCallsPending;
-
-       if (theCCallsToSkip <= 0)
-               return false;
-
-       --theCCallsToSkip;
-       debugs(91,5, HERE << "skipped call");
-       return true;
-}
-
 // a short temporary string describing buffer status for debugging
 const char *BodyPipe::status() const
 {
index 6ff0d3d2bd56927819bce904c9a9cb89494fc808..4a2c58ab54bcfcecc1399828e41ac13993c7e52d 100644 (file)
@@ -4,18 +4,20 @@
 
 #include "MemBuf.h"
 #include "AsyncCall.h"
+#include "ICAP/AsyncJob.h"
 
 class BodyPipe;
 
 // Interface for those who want to produce body content for others.
 // BodyProducer is expected to create the BodyPipe.
 // One pipe cannot have more than one producer.
-class BodyProducer {
+class BodyProducer: virtual public AsyncJob {
        public:
+                BodyProducer():AsyncJob("BodyProducer"){}
                virtual ~BodyProducer() {}
 
-               virtual void noteMoreBodySpaceAvailable(BodyPipe &bp) = 0;
-               virtual void noteBodyConsumerAborted(BodyPipe &bp) = 0;
+               virtual void noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp) = 0;
+               virtual void noteBodyConsumerAborted(RefCount<BodyPipe> bp) = 0;
 
        protected:
                void stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof);
@@ -25,13 +27,14 @@ class BodyProducer {
 // BodyConsumer is expected to register with an existing BodyPipe
 // by calling BodyPipe::setConsumer().
 // One pipe cannot have more than one consumer.
-class BodyConsumer {
+class BodyConsumer: virtual public AsyncJob {
        public:
+                BodyConsumer():AsyncJob("BodyConsumer"){}
                virtual ~BodyConsumer() {}
 
-               virtual void noteMoreBodyDataAvailable(BodyPipe &bp) = 0;
-               virtual void noteBodyProductionEnded(BodyPipe &bp) = 0;
-               virtual void noteBodyProducerAborted(BodyPipe &bp) = 0;
+               virtual void noteMoreBodyDataAvailable(RefCount<BodyPipe> bp) = 0;
+               virtual void noteBodyProductionEnded(RefCount<BodyPipe> bp) = 0;
+               virtual void noteBodyProducerAborted(RefCount<BodyPipe> bp) = 0;
 
        protected:
                void stopConsumingFrom(RefCount<BodyPipe> &pipe);
@@ -93,6 +96,7 @@ class BodyPipe: public RefCountable {
                bool mayNeedMoreData() const { return !bodySizeKnown() || needsMoreData(); }
                bool needsMoreData() const { return bodySizeKnown() && unproducedSize() > 0; }
                uint64_t unproducedSize() const; // size of still unproduced data
+               bool stillProducing(Producer *producer) const { return theProducer == producer; }
 
                // called by consumers
                bool setConsumerIfNotLate(Consumer *aConsumer);
@@ -101,6 +105,7 @@ class BodyPipe: public RefCountable {
                void consume(size_t size);
                bool expectMoreAfter(uint64_t offset) const;
                bool exhausted() const; // saw eof/abort and all data consumed
+               bool stillConsuming(Consumer *consumer) const { return theConsumer == consumer; }
 
                // start or continue consuming when there is no consumer
                void enableAutoConsumption();
@@ -122,26 +127,6 @@ class BodyPipe: public RefCountable {
                void postConsume(size_t size);
                void postAppend(size_t size);
 
-               bool skipCCall(); // decides whether to skip the call, updates counters
-
-       public: /* public to enable callbacks, but treat as private */
-
-               /* these methods are calling producer and sibscriber note*()
-                * callbacks with this BodyPipe as a parameter, which allows
-                * a single producer or consumer to support multiple pipes. */
-                
-               void tellMoreBodySpaceAvailable();
-               void tellBodyConsumerAborted();
-               void tellMoreBodyDataAvailable();
-               void tellBodyProductionEnded();
-               void tellBodyProducerAborted();
-
-               AsyncCallWrapper(91,5, BodyPipe, tellMoreBodySpaceAvailable);
-               AsyncCallWrapper(91,5, BodyPipe, tellBodyConsumerAborted);
-               AsyncCallWrapper(91,5, BodyPipe, tellMoreBodyDataAvailable);
-               AsyncCallWrapper(91,5, BodyPipe, tellBodyProductionEnded);
-               AsyncCallWrapper(91,5, BodyPipe, tellBodyProducerAborted);
-
        private:
                int64_t  theBodySize;   // expected total content length, if known
                Producer *theProducer; // content producer, if any
@@ -150,9 +135,6 @@ class BodyPipe: public RefCountable {
                uint64_t thePutSize; // ever-increasing total
                uint64_t theGetSize; // ever-increasing total
 
-               int theCCallsPending; // outstanding calls to the consumer
-               int theCCallsToSkip; // how many calls to the consumer we should skip
-
                MemBuf theBuf; // produced but not yet consumed content, if any
 
                bool mustAutoConsume; // consume when there is no consumer