]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Polished code by introducing two explicit and mostly independent states
authorAlex Rousskov <rousskov@measurement-factory.com>
Mon, 29 Sep 2008 03:50:32 +0000 (21:50 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Mon, 29 Sep 2008 03:50:32 +0000 (21:50 -0600)
(proxying virgin body and proxying adapted body) as well as a flag to
check for virgin body content access after the pipe was invalidated.

src/eCAP/XactionRep.cc
src/eCAP/XactionRep.h

index 73cf6ba1f85906962683caf0c8ec3dbe868838fa..4c07de8bfa7bbd60b64c69a8d33fd07438336b18 100644 (file)
@@ -18,7 +18,8 @@ Ecap::XactionRep::XactionRep(Adaptation::Initiator *anInitiator,
     const Adaptation::ServicePointer &aService):
     AsyncJob("Ecap::XactionRep"),
     Adaptation::Initiate("Ecap::XactionRep", anInitiator, aService),
-    theVirginRep(virginHeader), theCauseRep(NULL)
+    theVirginRep(virginHeader), theCauseRep(NULL),
+    proxyingVb(opUndecided), proxyingAb(opUndecided), canAccessVb(false)
 {
     if (virginCause)
         theCauseRep = new MessageRep(virginCause);
@@ -44,10 +45,10 @@ Ecap::XactionRep::start()
 {
     Must(theMaster);
 
-    // register as a consumer if there is a body
-    // we do not actually consume unless the adapter tells us to
-    BodyPipePointer &p = theVirginRep.raw().body_pipe;
-    Must(!p || p->setConsumerIfNotLate(this));
+    if (theVirginRep.raw().body_pipe != NULL)
+        canAccessVb = true; /// assumes nobody is consuming; \todo check
+    else
+        proxyingVb = opNever;
 
     theMaster->start();
 }
@@ -56,33 +57,28 @@ void
 Ecap::XactionRep::swanSong()
 {
     // clear body_pipes, if any
+    // this code does not maintain proxying* and canAccessVb states; should it?
 
     if (theAnswerRep != NULL) {
                BodyPipe::Pointer body_pipe = answer().body_pipe;
-               if (body_pipe != NULL && body_pipe->stillProducing(this))
+               if (body_pipe != NULL) {
+                       Must(body_pipe->stillProducing(this));
                        stopProducingFor(body_pipe, false);
+               }
        }
 
     {
                BodyPipe::Pointer body_pipe = theVirginRep.raw().body_pipe;
-               if (body_pipe != NULL && body_pipe->stillConsuming(this))
+               if (body_pipe != NULL) {
+                       Must(body_pipe->stillConsuming(this));
                        stopConsumingFrom(body_pipe);
+               }
        }
 
     terminateMaster();
     Adaptation::Initiate::swanSong();
 }
 
-void
-Ecap::XactionRep::terminateMaster()
-{
-    if (theMaster) {
-        AdapterXaction x = theMaster;
-        theMaster.reset();
-        x->stop();
-       }
-}
-
 libecap::Message &
 Ecap::XactionRep::virgin()
 {
@@ -111,74 +107,64 @@ Ecap::XactionRep::answer()
     return rep->raw();
 }
 
-bool
-Ecap::XactionRep::doneAll() const
+void
+Ecap::XactionRep::terminateMaster()
 {
     if (theMaster) {
-        if (!doneWithAdapted() || sendingVirgin())
-            return false;
-       }   
-
-    return Adaptation::Initiate::doneAll();
-}
-
-bool
-Ecap::XactionRep::doneWithAdapted() const
-{
-    if (!theAnswerRep)
-        return false;
-
-    // we are not done if we are producing
-    MessageRep *answer = dynamic_cast<MessageRep*>(theAnswerRep.get());
-       Must(answer);
-    const BodyPipePointer &ap = answer->raw().body_pipe;
-    return !ap || !ap->stillProducing(this);
+        AdapterXaction x = theMaster;
+        theMaster.reset();
+        x->stop();
+       }
 }
 
-// are we still sending virgin body to theMaster?
 bool
-Ecap::XactionRep::sendingVirgin() const
+Ecap::XactionRep::doneAll() const
 {
-    // we are sending if we are consuming
-    const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
-    return vp != NULL && vp->stillConsuming(this);
+    return proxyingVb >= opComplete && proxyingAb >= opComplete &&
+        Adaptation::Initiate::doneAll();
 }
 
-// stops sending virgin to theMaster and enables auto-consumption
+// stops receiving virgin and enables auto-consumption
 void
 Ecap::XactionRep::dropVirgin(const char *reason)
 {
-    debugs(93,4, HERE << "because " << reason);
+    debugs(93,4, HERE << "because " << reason << "; status:" << status());
+    Must(proxyingVb = opOn);
 
     BodyPipePointer &p = theVirginRep.raw().body_pipe;
     Must(p != NULL);
     Must(p->stillConsuming(this));
     stopConsumingFrom(p);
     p->enableAutoConsumption();
-    if (doneWithAdapted())
-        theMaster.reset();
+    proxyingVb = opComplete;
+    canAccessVb = false;
+
+    // called from adapter handler so does not inform adapter
 }
 
 void 
 Ecap::XactionRep::useVirgin()
 {
     debugs(93,3, HERE << status());
+    Must(proxyingAb == opUndecided);
+    proxyingAb = opNever;
 
-    // XXX: check state everywhere
-    Must(!theAnswerRep);
-    theMaster.reset();
-
-       HttpMsg *answer = theVirginRep.raw().header->clone();
-       Must(!theVirginRep.raw().body_pipe == !answer->body_pipe); // check clone()
-
-       if (answer->body_pipe != NULL) {
+    BodyPipePointer &vbody_pipe = theVirginRep.raw().body_pipe;
+    if (proxyingVb == opOn) {
+        Must(vbody_pipe->stillConsuming(this));
         // if libecap consumed, we cannot shortcircuit
-        Must(!answer->body_pipe->consumedSize());
-        Must(answer->body_pipe->stillConsuming(this));
-               stopConsumingFrom(answer->body_pipe);
-    }
-
-       sendAnswer(answer);
+        Must(!vbody_pipe->consumedSize());
+        stopConsumingFrom(vbody_pipe);
+        canAccessVb = false;
+        proxyingVb = opComplete;
+    } else
+    if (proxyingVb == opUndecided)
+        proxyingVb = opNever;
+
+       HttpMsg *clone = theVirginRep.raw().header->clone();
+    // check that clone() copies the pipe so that we do not have to
+       Must(!theVirginRep.raw().header->body_pipe == !clone->body_pipe);
+    sendAnswer(clone);
     Must(done());
 }
 
@@ -186,57 +172,76 @@ void
 Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
 {
     debugs(93,3, HERE << status());
+    Must(m);
     theAnswerRep = m;
-       MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
-       Must(rep);
-       HttpMsg *answer = rep->raw().header;
-    if (!theAnswerRep->body()) {
-        if (!sendingVirgin())
-            theMaster.reset();
-        sendAnswer(answer);
-       } else {
-               Must(!answer->body_pipe); // only host can set body pipes
-               rep->tieBody(this);
+    Must(proxyingAb == opUndecided);
+
+       HttpMsg *msg = answer().header;
+    if (!theAnswerRep->body()) { // final, bodyless answer
+        proxyingAb = opNever;
+        sendAnswer(msg);
+       } else { // got answer headers but need to handle body
+        proxyingAb = opOn;
+               Must(!msg->body_pipe); // only host can set body pipes
+        MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
+               Must(rep);
+               rep->tieBody(this); // sets us as a producer
+               Must(msg->body_pipe != NULL); // check tieBody
+
+        sendAnswer(msg);
+
         debugs(93,4, HERE << "adapter will produce body" << status());
         theMaster->abMake(); // libecap will produce
-        sendAnswer(answer);
     }
 }
 
 void
 Ecap::XactionRep::vbIgnore()
 {
+    Must(proxyingVb == opUndecided);
     // if adapter does not need vb, we do not need to send it
-    if (sendingVirgin())
-        dropVirgin("vbIgnore");
+    dropVirgin("vbIgnore");
+    Must(proxyingVb == opNever);
 }
 
 void
 Ecap::XactionRep::vbMake()
 {
-    Must(sendingVirgin());
-    theMaster->noteVbContentAvailable(); // XXX: async
+    Must(proxyingVb == opUndecided);
+    BodyPipePointer &p = theVirginRep.raw().body_pipe;
+    Must(p != NULL);
+    Must(p->setConsumerIfNotLate(this)); // to make vb, we must receive vb
+    proxyingVb = opOn;
 }
 
 void
 Ecap::XactionRep::vbStopMaking()
 {
-    // if adapter does not need vb, we do not need to send it
-    if (sendingVirgin())
-        dropVirgin("vbIgnore");
+    // if adapter does not need vb, we do not need to receive it
+    if (proxyingVb == opOn)
+        dropVirgin("vbStopMaking");
+    Must(proxyingVb == opComplete);
 }
 
 void
 Ecap::XactionRep::vbMakeMore()
 {
-    Must(sendingVirgin() && !theVirginRep.raw().body_pipe->exhausted());
+    Must(proxyingVb == opOn); // cannot make more if done proxying
+    // we cannot guarantee more vb, but we can check that there is a chance
+    Must(!theVirginRep.raw().body_pipe->exhausted());
 }
 
 libecap::Area
 Ecap::XactionRep::vbContent(libecap::off_type o, libecap::size_type s)
 {
+    Must(canAccessVb);
+    // We may not be proxyingVb yet. It should be OK, but see vbContentShift().
+
     const BodyPipePointer &p = theVirginRep.raw().body_pipe;
-    const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
+    Must(p != NULL);
+
+    // TODO: make MemBuf use size_t?
+    const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
 
     // convert to Squid types; XXX: check for overflow
     const uint64_t offset = static_cast<uint64_t>(o);
@@ -246,9 +251,6 @@ Ecap::XactionRep::vbContent(libecap::off_type o, libecap::size_type s)
     const size_t size = s == libecap::nsize ?
                haveSize - offset : static_cast<size_t>(s);
 
-    if (!size)
-        return libecap::Area();
-
     // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
     return libecap::Area::FromTempBuffer(p->buf().content() + offset,
                min(static_cast<size_t>(haveSize - offset), size));
@@ -257,6 +259,11 @@ Ecap::XactionRep::vbContent(libecap::off_type o, libecap::size_type s)
 void
 Ecap::XactionRep::vbContentShift(libecap::size_type n)
 {
+    Must(canAccessVb);
+    // We may not be proxyingVb yet. It should be OK now, but if BodyPipe
+    // consume() requirements change, we would have to return empty vbContent
+    // until the adapter registers as a consumer
+
     BodyPipePointer &p = theVirginRep.raw().body_pipe;
     Must(p != NULL);
     const size_t size = static_cast<size_t>(n); // XXX: check for overflow
@@ -267,19 +274,19 @@ Ecap::XactionRep::vbContentShift(libecap::size_type n)
 void
 Ecap::XactionRep::noteAbContentDone(bool atEnd)
 {
-    Must(!doneWithAdapted());
-    answer().body_pipe->clearProducer(atEnd);
-    if (!sendingVirgin())
-        theMaster.reset();
+    Must(proxyingAb == opOn);
+    stopProducingFor(answer().body_pipe, atEnd);
+    proxyingAb = opComplete;
 }
 
 void
 Ecap::XactionRep::noteAbContentAvailable()
 {
+    Must(proxyingAb == opOn);
     moveAbContent();
 }
 
-#if 0
+#if 0 /* XXX: implement */
 void
 Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
 {
@@ -301,8 +308,8 @@ Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
 void 
 Ecap::XactionRep::adaptationAborted()
 {
-    theMaster.reset();
     tellQueryAborted(true); // should eCAP support retries?
+    mustStop("adaptationAborted");
 }
 
 bool
@@ -314,19 +321,24 @@ Ecap::XactionRep::callable() const
 void 
 Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
 {
-    if (!doneWithAdapted())
-        moveAbContent();
+    Must(proxyingAb == opOn);
+    moveAbContent();
 }
 
 void 
 Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
 {
-    terminateMaster();
+    Must(proxyingAb == opOn);
+    stopProducingFor(answer().body_pipe, false);
+    Must(theMaster);
+    theMaster->abStopMaking();
+    proxyingAb = opComplete;
 }
 
 void
 Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
 {
+    Must(proxyingVb == opOn);
     Must(theMaster);
     theMaster->noteVbContentAvailable();
 }
@@ -334,19 +346,19 @@ Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
 void
 Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
 {
+    Must(proxyingVb == opOn);
     Must(theMaster);
     theMaster->noteVbContentDone(true);
-    if (doneWithAdapted())
-        theMaster.reset();
+    proxyingVb = opComplete;
 }
 
 void
 Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
 {
+    Must(proxyingVb == opOn);
     Must(theMaster);
     theMaster->noteVbContentDone(false);
-    if (doneWithAdapted())
-        theMaster.reset();
+    proxyingVb = opComplete;
 }
 
 void
@@ -359,7 +371,7 @@ Ecap::XactionRep::noteInitiatorAborted()
 void
 Ecap::XactionRep::moveAbContent()
 {
-    Must(!doneWithAdapted());
+    Must(proxyingAb == opOn);
     const libecap::Area c = theMaster->abContent(0, libecap::nsize);
     debugs(93,5, HERE << " up to " << c.size << " bytes");
     if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
@@ -374,23 +386,25 @@ Ecap::XactionRep::status() const
 
     buf.append(" [", 2);
 
-    const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
-    if (vp != NULL && vp->stillConsuming(this)) {
-               buf.append("Vb", 2);
-               buf.append(vp->status(), strlen(vp->status())); // XXX
+    if (proxyingVb == opOn) {
+        const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
+        if (!canAccessVb)
+            buf.append("x", 1);
+        if (vp != NULL && vp->stillConsuming(this)) {
+                   buf.append("Vb", 2);
+            buf.append(vp->status(), strlen(vp->status())); // XXX
+               } else
+            buf.append("V.", 2);
        }
-    else
-               buf.append("V.", 2);
 
-    if (theAnswerRep != NULL) {
-               MessageRep *answer = dynamic_cast<MessageRep*>(theAnswerRep.get());
-               Must(answer);
-               const BodyPipePointer &ap = answer->raw().body_pipe;
+    if (proxyingAb == opOn) {
+        MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
+        Must(rep);
+               const BodyPipePointer &ap = rep->raw().body_pipe;
                if (ap != NULL && ap->stillProducing(this)) {
                        buf.append(" Ab", 3);
                        buf.append(ap->status(), strlen(ap->status())); // XXX
-               }
-        else
+               } else
                        buf.append(" A.", 3);
        }
 
index 8ea0c7d02bdd837fdcb2cf1401354cfd6bae533b..f9fe27c05bd2770dd75f82380f157780f14313f9 100644 (file)
@@ -72,10 +72,7 @@ public:
 protected:
     Adaptation::Message &answer();
 
-    bool sendingVirgin() const;
     void dropVirgin(const char *reason);
-    bool doneWithAdapted() const;
-
     void moveAbContent();
 
     void terminateMaster();
@@ -90,6 +87,11 @@ private:
        typedef libecap::shared_ptr<libecap::Message> MessagePtr;
        MessagePtr theAnswerRep;
 
+    typedef enum { opUndecided, opOn, opComplete, opNever } OperationState;
+       OperationState proxyingVb; // delivering virgin body from core to adapter
+       OperationState proxyingAb; // delivering adapted body from adapter to core
+       bool canAccessVb;          // virgin BodyPipe content is accessible
+
        CBDATA_CLASS2(XactionRep);
 };