]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Use message representatives to store virgin and adapted messages.
authorAlex Rousskov <rousskov@measurement-factory.com>
Sat, 27 Sep 2008 18:17:22 +0000 (12:17 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Sat, 27 Sep 2008 18:17:22 +0000 (12:17 -0600)
Migrating to a model where all message changes are done via transaction,
not the message itself. A message cannot handle many changes on its own
because it is not a job, and placing some changes in MessageRep and some
in XactionRep results in messy code.

Polished body handling. Needs more work.

Added satus reporting. Needs more work.

src/eCAP/XactionRep.cc
src/eCAP/XactionRep.h

index 6426d4da35e8eac2959e4a6cb22ccc699390603a..c497d7086203ed799c32030b510e213f1498d7c3 100644 (file)
@@ -1,4 +1,6 @@
 #include "squid.h"
+#include <libecap/common/area.h>
+#include <libecap/common/delay.h>
 #include <libecap/adapter/xaction.h>
 #include "TextException.h"
 #include "assert.h"
@@ -16,19 +18,17 @@ Ecap::XactionRep::XactionRep(Adaptation::Initiator *anInitiator,
     const Adaptation::ServicePointer &aService):
     AsyncJob("Ecap::XactionRep"),
     Adaptation::Initiate("Ecap::XactionRep", anInitiator, aService),
-    theVirgin(virginHeader), theCause(virginCause),
-    theVirginRep(theVirgin, NULL), theCauseRep(NULL),
-    theAnswerRep(NULL)
+    theVirginRep(virginHeader), theCauseRep(NULL)
 {
     if (virginCause)
-        theCauseRep = new MessageRep(theCause, NULL);
+        theCauseRep = new MessageRep(virginCause);
 }
 
 Ecap::XactionRep::~XactionRep()
 {
     assert(!theMaster);
     delete theCauseRep;
-    delete theAnswerRep;
+    theAnswerRep.reset();
 }
 
 void
@@ -43,11 +43,17 @@ void
 Ecap::XactionRep::start()
 {
     Must(theMaster);
+
+    // register as a consumer if there is a body
+    // we do not actually consume unless the adapter tells us to
+    BodyPipePointer &p = theVirginRep.raw().body_pipe;
+    Must(!p || p->setConsumerIfNotLate(this));
+
     theMaster->start();
 }
 
 void
-Ecap::XactionRep::swangSong()
+Ecap::XactionRep::swanSong()
 {
     terminateMaster();
     Adaptation::Initiate::swanSong();
@@ -69,62 +75,207 @@ Ecap::XactionRep::virgin()
     return theVirginRep;
 }
 
-const libecap::Message *
+const libecap::Message &
 Ecap::XactionRep::cause()
 {
-    return theCauseRep;
+    Must(theCauseRep != NULL);
+    return *theCauseRep;
 }
 
-void 
-Ecap::XactionRep::useVirgin()
+libecap::Message &
+Ecap::XactionRep::adapted()
 {
-    theMaster.reset();
-    Adaptation::Message::ShortCircuit(theVirgin, theAnswer);
-    Must(!theVirgin.body_pipe == !theAnswer.body_pipe);
-    sendAnswer(theAnswer.header);
+    Must(theAnswerRep != NULL);
+    return *theAnswerRep;
+}
+
+Adaptation::Message &
+Ecap::XactionRep::answer()
+{
+       MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
+       Must(rep);
+    return rep->raw();
+}
+
+bool
+Ecap::XactionRep::doneAll() const
+{
+    if (theMaster) {
+        if (!doneWithAdapted() || sendingVirgin())
+            return false;
+       }   
+
+    return Adaptation::Initiate::doneAll();
+}
+
+// are we still sending virgin body to theMaster?
+bool
+Ecap::XactionRep::doneWithAdapted() const
+{
+    if (!theAnswerRep)
+        return false;
+
+    // we are not done if we are producing
+    MessageRep *answer = dynamic_cast<MessageRep*>(theAnswerRep.get());
+       Must(answer);
+    const BodyPipePointer &ap = answer->raw().body_pipe;
+    return !ap || !ap->stillProducing(this);
+}
+
+// are we still sending virgin body to theMaster?
+bool
+Ecap::XactionRep::sendingVirgin() const
+{
+    // we are sending if we are consuming
+    const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
+    return vp != NULL && vp->stillConsuming(this);
+}
+
+// stops sending virgin to theMaster and enables auto-consumption
+void
+Ecap::XactionRep::dropVirgin(const char *reason)
+{
+    debugs(93,4, HERE << "because " << reason);
+
+    BodyPipePointer &p = theVirginRep.raw().body_pipe;
+    Must(p != NULL);
+    Must(p->stillConsuming(this));
+    stopConsumingFrom(p);
+    p->enableAutoConsumption();
+    if (doneWithAdapted())
+        theMaster.reset();
 }
 
 void 
-Ecap::XactionRep::adaptVirgin()
+Ecap::XactionRep::useVirgin()
 {
+    debugs(93,3, HERE << status());
+
     // XXX: check state everywhere
     Must(!theAnswerRep);
-    Must(!theAnswer.header);
-    Must(!theAnswer.body_pipe);
-    theAnswer.set(theVirgin.header->clone());
-       theAnswerRep = new MessageRep(theAnswer, this);
-    Must(!theAnswer.body_pipe);
+    theMaster.reset();
+
+       HttpMsg *answer = theVirginRep.raw().header->clone();
+       Must(!theVirginRep.raw().body_pipe == !answer->body_pipe); // check clone()
+
+       if (answer->body_pipe != NULL) {
+        // if libecap consumed, we cannot shortcircuit
+        Must(!answer->body_pipe->consumedSize());
+        Must(answer->body_pipe->stillConsuming(this));
+               stopConsumingFrom(answer->body_pipe);
+    }
+
+       sendAnswer(answer);
+    Must(done());
 }
 
 void 
-Ecap::XactionRep::adaptNewRequest()
+Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
 {
-    theAnswer.set(new HttpRequest);
-       theAnswerRep = new MessageRep(theAnswer, this);
+    debugs(93,3, HERE << status());
+    theAnswerRep = m;
+       MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
+       Must(rep);
+       HttpMsg *answer = rep->raw().header;
+    if (!theAnswerRep->body()) {
+        if (!sendingVirgin())
+            theMaster.reset();
+        sendAnswer(answer);
+       } else {
+               Must(!answer->body_pipe); // only host can set body pipes
+               rep->tieBody(this);
+        debugs(93,4, HERE << "adapter will produce body" << status());
+        // libecap will produce
+        sendAnswer(answer);
+    }
 }
 
-void 
-Ecap::XactionRep::adaptNewResponse()
+// if adapter does not want to consume, we should not either
+void
+Ecap::XactionRep::adapterWontConsume()
 {
-    theAnswer.set(new HttpReply);
-       theAnswerRep = new MessageRep(theAnswer, this);
+    if (sendingVirgin())
+        dropVirgin("adapterWontConsume");
 }
 
-libecap::Message *
-Ecap::XactionRep::adapted()
+void
+Ecap::XactionRep::adapterWillConsume()
 {
-    return theAnswerRep;
+    Must(sendingVirgin());
+    theMaster->noteVirginDataAvailable(); // XXX: async
 }
 
-void 
-Ecap::XactionRep::useAdapted()
+void
+Ecap::XactionRep::adapterDoneConsuming()
 {
-    theMaster.reset();
-    sendAnswer(theAnswer.header);
+    if (sendingVirgin())
+        dropVirgin("adapterDoneConsuming");
+}
+
+void
+Ecap::XactionRep::consumeVirgin(size_type n)
+{
+    BodyPipePointer &p = theVirginRep.raw().body_pipe;
+    Must(p != NULL);
+    const size_t size = static_cast<size_t>(n); // XXX: check for overflow
+    const size_t sizeMax = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
+    p->consume(min(size, sizeMax));
+}
+
+void
+Ecap::XactionRep::pauseVirginProduction()
+{
+    // TODO: support production pauses
+}
+
+void
+Ecap::XactionRep::resumeVirginProduction()
+{
+    // TODO: support production pauses
+}
+
+void
+Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
+{
+    Must(answer().body_pipe != NULL);
+    if (size.known())
+        answer().body_pipe->setBodySize(size.value());
+    // else the piped body size is unknown by default
+}
+
+void
+Ecap::XactionRep::appendAdapted(const libecap::Area &area)
+{
+    BodyPipe *p = answer().body_pipe.getRaw();
+    Must(p);
+    Must(p->putMoreData(area.start, area.size) == area.size);
+}
+
+bool
+Ecap::XactionRep::callable() const
+{
+    return !done();
+}
+
+void
+Ecap::XactionRep::noteAdaptedBodyEnd()
+{
+    Must(answer().body_pipe != NULL);
+    answer().body_pipe->clearProducer(true);
+    if (!sendingVirgin())
+        theMaster.reset();
+}
+
+void
+Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
+{
+    debugs(93,3, HERE << "adapter needs time: " <<
+       d.state << '/' << d.progress);
+    // XXX: set timeout?
 }
 
 void 
-Ecap::XactionRep::useNone()
+Ecap::XactionRep::adaptationAborted()
 {
     theMaster.reset();
     tellQueryAborted(true); // should eCAP support retries?
@@ -140,8 +291,7 @@ Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
 void 
 Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
 {
-    Must(theMaster);
-    theMaster->noteAdaptedAborted();
+    terminateMaster();
 }
 
 void
@@ -161,8 +311,7 @@ Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
 void
 Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
 {
-    Must(theMaster);
-    theMaster->noteVirginAborted();
+    terminateMaster();
 }
 
 void
@@ -173,5 +322,30 @@ Ecap::XactionRep::noteInitiatorAborted()
 
 const char *Ecap::XactionRep::status() const
 {
-       return Adaptation::Initiate::status();
+    static MemBuf buf;
+    buf.reset();
+
+    buf.append(" [", 2);
+
+    if (theAnswerRep != NULL) {
+               MessageRep *answer = dynamic_cast<MessageRep*>(theAnswerRep.get());
+               Must(answer);
+               const BodyPipePointer &ap = answer->raw().body_pipe;
+               if (ap != NULL && ap->stillProducing(this))
+                       buf.append("Ab ", 3);
+        else
+                       buf.append("A. ", 3);
+       }
+
+    const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
+    if (vp != NULL && vp->stillConsuming(this))
+               buf.append("Vb ", 3);
+    else
+               buf.append("V. ", 3);
+
+    buf.Printf(" ecapx%d]", id);
+
+    buf.terminate();
+
+    return buf.content();
 }
index 58a48ad44277490ed090526e26b5fbc7812d5919..ce193afa6fe339b678ab7830401ec42e89b70b4b 100644 (file)
@@ -22,7 +22,7 @@ namespace Ecap {
    xaction that Squid communicates with. One eCAP module may register many 
    eCAP xactions. */
 class XactionRep : public Adaptation::Initiate, public libecap::host::Xaction,
-    public BodyProducer, public BodyConsumer
+    public BodyConsumer, public BodyProducer
 {
 public:
     XactionRep(Adaptation::Initiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, const Adaptation::ServicePointer &service);
@@ -32,15 +32,25 @@ public:
        void master(const AdapterXaction &aMaster); // establish a link
 
     // libecap::host::Xaction API
-    virtual libecap::Message &virgin(); // request or response
-    virtual const libecap::Message *cause(); // request for the above response
-    virtual void useVirgin();  // final answer: no adaptation
-    virtual void adaptVirgin(); // adapted message starts as virgin
-    virtual void adaptNewRequest(); // make fresh adapted request
-    virtual void adaptNewResponse(); // make fresh adapted response
-    virtual libecap::Message *adapted(); // request or response
-    virtual void useAdapted(); // final answer: adapted msg is ready
-    virtual void useNone(); // final answer: no answer
+    virtual libecap::Message &virgin();
+    virtual const libecap::Message &cause();
+    virtual libecap::Message &adapted();
+    virtual void useVirgin();
+    virtual void useAdapted(const libecap::shared_ptr<libecap::Message> &msg);
+    virtual void adaptationDelayed(const libecap::Delay &);
+    virtual void adaptationAborted();
+    virtual void adapterWontConsume();
+    virtual void adapterWillConsume();
+    virtual void adapterDoneConsuming();
+    virtual void consumeVirgin(size_type n);
+    virtual void pauseVirginProduction();
+    virtual void resumeVirginProduction();
+    virtual void setAdaptedBodySize(const libecap::BodySize &size);
+    virtual void appendAdapted(const libecap::Area &area);
+    virtual void noteAdaptedBodyEnd();
+
+       // libecap::Callable API, via libecap::host::Xaction
+       virtual bool callable() const;
 
     // BodyProducer API
     virtual void noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp);
@@ -56,21 +66,28 @@ public:
 
     // AsyncJob API (via Initiate)
     virtual void start();
-    virtual void swangSong();
+    virtual bool doneAll() const;
+    virtual void swanSong();
     virtual const char *status() const;
 
 protected:
+    Adaptation::Message &answer();
+
+    bool sendingVirgin() const;
+    void dropVirgin(const char *reason);
+    bool doneWithAdapted() const;
+
     void terminateMaster();
+    void scheduleStop(const char *reason);
 
 private:
        AdapterXaction theMaster; // the actual adaptation xaction we represent
 
-       Adaptation::Message theVirgin;
-       Adaptation::Message theCause;
-       Adaptation::Message theAnswer;
        MessageRep theVirginRep;
        MessageRep *theCauseRep;
-       MessageRep *theAnswerRep;
+
+       typedef libecap::shared_ptr<libecap::Message> MessagePtr;
+       MessagePtr theAnswerRep;
 
        CBDATA_CLASS2(XactionRep);
 };