From: Alex Rousskov Date: Sun, 28 Sep 2008 17:53:44 +0000 (-0600) Subject: Synced with the new libecap pull-pull interface where the host pulls X-Git-Tag: SQUID_3_1_0_1~45^2~11^2~16 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8679e6c2ca08b150626ddd65f837afc3e457bb1b;p=thirdparty%2Fsquid.git Synced with the new libecap pull-pull interface where the host pulls adapted body and the adapter pulls virgin body. --- diff --git a/src/eCAP/XactionRep.cc b/src/eCAP/XactionRep.cc index eb4cf0152d..73cf6ba1f8 100644 --- a/src/eCAP/XactionRep.cc +++ b/src/eCAP/XactionRep.cc @@ -122,7 +122,6 @@ Ecap::XactionRep::doneAll() const return Adaptation::Initiate::doneAll(); } -// are we still sending virgin body to theMaster? bool Ecap::XactionRep::doneWithAdapted() const { @@ -199,86 +198,97 @@ Ecap::XactionRep::useAdapted(const libecap::shared_ptr &m) Must(!answer->body_pipe); // only host can set body pipes rep->tieBody(this); debugs(93,4, HERE << "adapter will produce body" << status()); - // libecap will produce + theMaster->abMake(); // libecap will produce sendAnswer(answer); } } -// if adapter does not want to consume, we should not either void -Ecap::XactionRep::adapterWontConsume() +Ecap::XactionRep::vbIgnore() { + // if adapter does not need vb, we do not need to send it if (sendingVirgin()) - dropVirgin("adapterWontConsume"); + dropVirgin("vbIgnore"); } void -Ecap::XactionRep::adapterWillConsume() +Ecap::XactionRep::vbMake() { Must(sendingVirgin()); - theMaster->noteVirginDataAvailable(); // XXX: async + theMaster->noteVbContentAvailable(); // XXX: async } void -Ecap::XactionRep::adapterDoneConsuming() +Ecap::XactionRep::vbStopMaking() { + // if adapter does not need vb, we do not need to send it if (sendingVirgin()) - dropVirgin("adapterDoneConsuming"); + dropVirgin("vbIgnore"); } void -Ecap::XactionRep::consumeVirgin(size_type n) +Ecap::XactionRep::vbMakeMore() { - BodyPipePointer &p = theVirginRep.raw().body_pipe; - Must(p != NULL); - const size_t size = static_cast(n); // XXX: check for overflow - const size_t sizeMax = static_cast(p->buf().contentSize()); // TODO: make MemBuf use size_t? - p->consume(min(size, sizeMax)); + Must(sendingVirgin() && !theVirginRep.raw().body_pipe->exhausted()); } -void -Ecap::XactionRep::pauseVirginProduction() +libecap::Area +Ecap::XactionRep::vbContent(libecap::off_type o, libecap::size_type s) { - // TODO: support production pauses -} + const BodyPipePointer &p = theVirginRep.raw().body_pipe; + const size_t haveSize = static_cast(p->buf().contentSize()); // TODO: make MemBuf use size_t? -void -Ecap::XactionRep::resumeVirginProduction() -{ - // TODO: support production pauses + // convert to Squid types; XXX: check for overflow + const uint64_t offset = static_cast(o); + Must(offset <= haveSize); // equal iff at the end of content + + // nsize means no size limit: all content starting from offset + const size_t size = s == libecap::nsize ? + haveSize - offset : static_cast(s); + + if (!size) + return libecap::Area(); + + // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src) + return libecap::Area::FromTempBuffer(p->buf().content() + offset, + min(static_cast(haveSize - offset), size)); } void -Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size) +Ecap::XactionRep::vbContentShift(libecap::size_type n) { - Must(answer().body_pipe != NULL); - if (size.known()) - answer().body_pipe->setBodySize(size.value()); - // else the piped body size is unknown by default + BodyPipePointer &p = theVirginRep.raw().body_pipe; + Must(p != NULL); + const size_t size = static_cast(n); // XXX: check for overflow + const size_t haveSize = static_cast(p->buf().contentSize()); // TODO: make MemBuf use size_t? + p->consume(min(size, haveSize)); } void -Ecap::XactionRep::appendAdapted(const libecap::Area &area) +Ecap::XactionRep::noteAbContentDone(bool atEnd) { - BodyPipe *p = answer().body_pipe.getRaw(); - Must(p); - Must(p->putMoreData(area.start, area.size) == area.size); + Must(!doneWithAdapted()); + answer().body_pipe->clearProducer(atEnd); + if (!sendingVirgin()) + theMaster.reset(); } -bool -Ecap::XactionRep::callable() const +void +Ecap::XactionRep::noteAbContentAvailable() { - return !done(); + moveAbContent(); } +#if 0 void -Ecap::XactionRep::noteAdaptedBodyEnd() +Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size) { Must(answer().body_pipe != NULL); - answer().body_pipe->clearProducer(true); - if (!sendingVirgin()) - theMaster.reset(); + if (size.known()) + answer().body_pipe->setBodySize(size.value()); + // else the piped body size is unknown by default } +#endif void Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d) @@ -295,11 +305,17 @@ Ecap::XactionRep::adaptationAborted() tellQueryAborted(true); // should eCAP support retries? } +bool +Ecap::XactionRep::callable() const +{ + return !done(); +} + void Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount bp) { - Must(theMaster); - theMaster->noteAdaptedSpaceAvailable(); + if (!doneWithAdapted()) + moveAbContent(); } void @@ -312,20 +328,25 @@ void Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount bp) { Must(theMaster); - theMaster->noteVirginDataAvailable(); + theMaster->noteVbContentAvailable(); } void Ecap::XactionRep::noteBodyProductionEnded(RefCount bp) { Must(theMaster); - theMaster->noteVirginDataEnded(); + theMaster->noteVbContentDone(true); + if (doneWithAdapted()) + theMaster.reset(); } void Ecap::XactionRep::noteBodyProducerAborted(RefCount bp) { - terminateMaster(); + Must(theMaster); + theMaster->noteVbContentDone(false); + if (doneWithAdapted()) + theMaster.reset(); } void @@ -334,7 +355,19 @@ Ecap::XactionRep::noteInitiatorAborted() mustStop("initiator aborted"); } -const char *Ecap::XactionRep::status() const +// get content from the adapter and put it into the adapted pipe +void +Ecap::XactionRep::moveAbContent() +{ + Must(!doneWithAdapted()); + const libecap::Area c = theMaster->abContent(0, libecap::nsize); + debugs(93,5, HERE << " up to " << c.size << " bytes"); + if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size)) + theMaster->abContentShift(used); +} + +const char * +Ecap::XactionRep::status() const { static MemBuf buf; buf.reset(); diff --git a/src/eCAP/XactionRep.h b/src/eCAP/XactionRep.h index ce193afa6f..8ea0c7d02b 100644 --- a/src/eCAP/XactionRep.h +++ b/src/eCAP/XactionRep.h @@ -39,15 +39,14 @@ public: virtual void useAdapted(const libecap::shared_ptr &msg); virtual void adaptationDelayed(const libecap::Delay &); virtual void adaptationAborted(); - virtual void adapterWontConsume(); - virtual void adapterWillConsume(); - virtual void adapterDoneConsuming(); - virtual void consumeVirgin(size_type n); - virtual void pauseVirginProduction(); - virtual void resumeVirginProduction(); - virtual void setAdaptedBodySize(const libecap::BodySize &size); - virtual void appendAdapted(const libecap::Area &area); - virtual void noteAdaptedBodyEnd(); + virtual void vbIgnore(); + virtual void vbMake(); + virtual void vbStopMaking(); + virtual void vbMakeMore(); + virtual libecap::Area vbContent(libecap::off_type offset, libecap::size_type size); + virtual void vbContentShift(libecap::size_type size); + virtual void noteAbContentDone(bool atEnd); + virtual void noteAbContentAvailable(); // libecap::Callable API, via libecap::host::Xaction virtual bool callable() const; @@ -77,6 +76,8 @@ protected: void dropVirgin(const char *reason); bool doneWithAdapted() const; + void moveAbContent(); + void terminateMaster(); void scheduleStop(const char *reason);