]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Synced with the new libecap pull-pull interface where the host pulls
authorAlex Rousskov <rousskov@measurement-factory.com>
Sun, 28 Sep 2008 17:53:44 +0000 (11:53 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Sun, 28 Sep 2008 17:53:44 +0000 (11:53 -0600)
adapted body and the adapter pulls virgin body.

src/eCAP/XactionRep.cc
src/eCAP/XactionRep.h

index eb4cf0152df8fedb5b6046ac31db14870abae8af..73cf6ba1f85906962683caf0c8ec3dbe868838fa 100644 (file)
@@ -122,7 +122,6 @@ Ecap::XactionRep::doneAll() const
     return Adaptation::Initiate::doneAll();
 }
 
-// are we still sending virgin body to theMaster?
 bool
 Ecap::XactionRep::doneWithAdapted() const
 {
@@ -199,86 +198,97 @@ Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
                Must(!answer->body_pipe); // only host can set body pipes
                rep->tieBody(this);
         debugs(93,4, HERE << "adapter will produce body" << status());
-        // libecap will produce
+        theMaster->abMake(); // libecap will produce
         sendAnswer(answer);
     }
 }
 
-// if adapter does not want to consume, we should not either
 void
-Ecap::XactionRep::adapterWontConsume()
+Ecap::XactionRep::vbIgnore()
 {
+    // if adapter does not need vb, we do not need to send it
     if (sendingVirgin())
-        dropVirgin("adapterWontConsume");
+        dropVirgin("vbIgnore");
 }
 
 void
-Ecap::XactionRep::adapterWillConsume()
+Ecap::XactionRep::vbMake()
 {
     Must(sendingVirgin());
-    theMaster->noteVirginDataAvailable(); // XXX: async
+    theMaster->noteVbContentAvailable(); // XXX: async
 }
 
 void
-Ecap::XactionRep::adapterDoneConsuming()
+Ecap::XactionRep::vbStopMaking()
 {
+    // if adapter does not need vb, we do not need to send it
     if (sendingVirgin())
-        dropVirgin("adapterDoneConsuming");
+        dropVirgin("vbIgnore");
 }
 
 void
-Ecap::XactionRep::consumeVirgin(size_type n)
+Ecap::XactionRep::vbMakeMore()
 {
-    BodyPipePointer &p = theVirginRep.raw().body_pipe;
-    Must(p != NULL);
-    const size_t size = static_cast<size_t>(n); // XXX: check for overflow
-    const size_t sizeMax = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
-    p->consume(min(size, sizeMax));
+    Must(sendingVirgin() && !theVirginRep.raw().body_pipe->exhausted());
 }
 
-void
-Ecap::XactionRep::pauseVirginProduction()
+libecap::Area
+Ecap::XactionRep::vbContent(libecap::off_type o, libecap::size_type s)
 {
-    // TODO: support production pauses
-}
+    const BodyPipePointer &p = theVirginRep.raw().body_pipe;
+    const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
 
-void
-Ecap::XactionRep::resumeVirginProduction()
-{
-    // TODO: support production pauses
+    // convert to Squid types; XXX: check for overflow
+    const uint64_t offset = static_cast<uint64_t>(o);
+    Must(offset <= haveSize); // equal iff at the end of content
+
+    // nsize means no size limit: all content starting from offset
+    const size_t size = s == libecap::nsize ?
+               haveSize - offset : static_cast<size_t>(s);
+
+    if (!size)
+        return libecap::Area();
+
+    // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
+    return libecap::Area::FromTempBuffer(p->buf().content() + offset,
+               min(static_cast<size_t>(haveSize - offset), size));
 }
 
 void
-Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
+Ecap::XactionRep::vbContentShift(libecap::size_type n)
 {
-    Must(answer().body_pipe != NULL);
-    if (size.known())
-        answer().body_pipe->setBodySize(size.value());
-    // else the piped body size is unknown by default
+    BodyPipePointer &p = theVirginRep.raw().body_pipe;
+    Must(p != NULL);
+    const size_t size = static_cast<size_t>(n); // XXX: check for overflow
+    const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
+    p->consume(min(size, haveSize));
 }
 
 void
-Ecap::XactionRep::appendAdapted(const libecap::Area &area)
+Ecap::XactionRep::noteAbContentDone(bool atEnd)
 {
-    BodyPipe *p = answer().body_pipe.getRaw();
-    Must(p);
-    Must(p->putMoreData(area.start, area.size) == area.size);
+    Must(!doneWithAdapted());
+    answer().body_pipe->clearProducer(atEnd);
+    if (!sendingVirgin())
+        theMaster.reset();
 }
 
-bool
-Ecap::XactionRep::callable() const
+void
+Ecap::XactionRep::noteAbContentAvailable()
 {
-    return !done();
+    moveAbContent();
 }
 
+#if 0
 void
-Ecap::XactionRep::noteAdaptedBodyEnd()
+Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
 {
     Must(answer().body_pipe != NULL);
-    answer().body_pipe->clearProducer(true);
-    if (!sendingVirgin())
-        theMaster.reset();
+    if (size.known())
+        answer().body_pipe->setBodySize(size.value());
+    // else the piped body size is unknown by default
 }
+#endif
 
 void
 Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
@@ -295,11 +305,17 @@ Ecap::XactionRep::adaptationAborted()
     tellQueryAborted(true); // should eCAP support retries?
 }
 
+bool
+Ecap::XactionRep::callable() const
+{
+    return !done();
+}
+
 void 
 Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
 {
-    Must(theMaster);
-    theMaster->noteAdaptedSpaceAvailable();
+    if (!doneWithAdapted())
+        moveAbContent();
 }
 
 void 
@@ -312,20 +328,25 @@ void
 Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
 {
     Must(theMaster);
-    theMaster->noteVirginDataAvailable();
+    theMaster->noteVbContentAvailable();
 }
 
 void
 Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
 {
     Must(theMaster);
-    theMaster->noteVirginDataEnded();
+    theMaster->noteVbContentDone(true);
+    if (doneWithAdapted())
+        theMaster.reset();
 }
 
 void
 Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
 {
-    terminateMaster();
+    Must(theMaster);
+    theMaster->noteVbContentDone(false);
+    if (doneWithAdapted())
+        theMaster.reset();
 }
 
 void
@@ -334,7 +355,19 @@ Ecap::XactionRep::noteInitiatorAborted()
     mustStop("initiator aborted");
 }
 
-const char *Ecap::XactionRep::status() const
+// get content from the adapter and put it into the adapted pipe
+void
+Ecap::XactionRep::moveAbContent()
+{
+    Must(!doneWithAdapted());
+    const libecap::Area c = theMaster->abContent(0, libecap::nsize);
+    debugs(93,5, HERE << " up to " << c.size << " bytes");
+    if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
+               theMaster->abContentShift(used);
+}
+
+const char *
+Ecap::XactionRep::status() const
 {
     static MemBuf buf;
     buf.reset();
index ce193afa6fe339b678ab7830401ec42e89b70b4b..8ea0c7d02bdd837fdcb2cf1401354cfd6bae533b 100644 (file)
@@ -39,15 +39,14 @@ public:
     virtual void useAdapted(const libecap::shared_ptr<libecap::Message> &msg);
     virtual void adaptationDelayed(const libecap::Delay &);
     virtual void adaptationAborted();
-    virtual void adapterWontConsume();
-    virtual void adapterWillConsume();
-    virtual void adapterDoneConsuming();
-    virtual void consumeVirgin(size_type n);
-    virtual void pauseVirginProduction();
-    virtual void resumeVirginProduction();
-    virtual void setAdaptedBodySize(const libecap::BodySize &size);
-    virtual void appendAdapted(const libecap::Area &area);
-    virtual void noteAdaptedBodyEnd();
+    virtual void vbIgnore();
+    virtual void vbMake();
+    virtual void vbStopMaking();
+    virtual void vbMakeMore();
+    virtual libecap::Area vbContent(libecap::off_type offset, libecap::size_type size);
+    virtual void vbContentShift(libecap::size_type size);
+    virtual void noteAbContentDone(bool atEnd);
+    virtual void noteAbContentAvailable();
 
        // libecap::Callable API, via libecap::host::Xaction
        virtual bool callable() const;
@@ -77,6 +76,8 @@ protected:
     void dropVirgin(const char *reason);
     bool doneWithAdapted() const;
 
+    void moveAbContent();
+
     void terminateMaster();
     void scheduleStop(const char *reason);