From: Alex Rousskov Date: Sat, 27 Sep 2008 18:17:22 +0000 (-0600) Subject: Use message representatives to store virgin and adapted messages. X-Git-Tag: SQUID_3_1_0_1~45^2~11^2~22^2~1 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=4d0854d44f00362bffe01e7e6d79aae7a9d24a3b;p=thirdparty%2Fsquid.git Use message representatives to store virgin and adapted messages. Migrating to a model where all message changes are done via transaction, not the message itself. A message cannot handle many changes on its own because it is not a job, and placing some changes in MessageRep and some in XactionRep results in messy code. Polished body handling. Needs more work. Added satus reporting. Needs more work. --- diff --git a/src/eCAP/XactionRep.cc b/src/eCAP/XactionRep.cc index 6426d4da35..c497d70862 100644 --- a/src/eCAP/XactionRep.cc +++ b/src/eCAP/XactionRep.cc @@ -1,4 +1,6 @@ #include "squid.h" +#include +#include #include #include "TextException.h" #include "assert.h" @@ -16,19 +18,17 @@ Ecap::XactionRep::XactionRep(Adaptation::Initiator *anInitiator, const Adaptation::ServicePointer &aService): AsyncJob("Ecap::XactionRep"), Adaptation::Initiate("Ecap::XactionRep", anInitiator, aService), - theVirgin(virginHeader), theCause(virginCause), - theVirginRep(theVirgin, NULL), theCauseRep(NULL), - theAnswerRep(NULL) + theVirginRep(virginHeader), theCauseRep(NULL) { if (virginCause) - theCauseRep = new MessageRep(theCause, NULL); + theCauseRep = new MessageRep(virginCause); } Ecap::XactionRep::~XactionRep() { assert(!theMaster); delete theCauseRep; - delete theAnswerRep; + theAnswerRep.reset(); } void @@ -43,11 +43,17 @@ void Ecap::XactionRep::start() { Must(theMaster); + + // register as a consumer if there is a body + // we do not actually consume unless the adapter tells us to + BodyPipePointer &p = theVirginRep.raw().body_pipe; + Must(!p || p->setConsumerIfNotLate(this)); + theMaster->start(); } void -Ecap::XactionRep::swangSong() +Ecap::XactionRep::swanSong() { terminateMaster(); Adaptation::Initiate::swanSong(); @@ -69,62 +75,207 @@ Ecap::XactionRep::virgin() return theVirginRep; } -const libecap::Message * +const libecap::Message & Ecap::XactionRep::cause() { - return theCauseRep; + Must(theCauseRep != NULL); + return *theCauseRep; } -void -Ecap::XactionRep::useVirgin() +libecap::Message & +Ecap::XactionRep::adapted() { - theMaster.reset(); - Adaptation::Message::ShortCircuit(theVirgin, theAnswer); - Must(!theVirgin.body_pipe == !theAnswer.body_pipe); - sendAnswer(theAnswer.header); + Must(theAnswerRep != NULL); + return *theAnswerRep; +} + +Adaptation::Message & +Ecap::XactionRep::answer() +{ + MessageRep *rep = dynamic_cast(theAnswerRep.get()); + Must(rep); + return rep->raw(); +} + +bool +Ecap::XactionRep::doneAll() const +{ + if (theMaster) { + if (!doneWithAdapted() || sendingVirgin()) + return false; + } + + return Adaptation::Initiate::doneAll(); +} + +// are we still sending virgin body to theMaster? +bool +Ecap::XactionRep::doneWithAdapted() const +{ + if (!theAnswerRep) + return false; + + // we are not done if we are producing + MessageRep *answer = dynamic_cast(theAnswerRep.get()); + Must(answer); + const BodyPipePointer &ap = answer->raw().body_pipe; + return !ap || !ap->stillProducing(this); +} + +// are we still sending virgin body to theMaster? +bool +Ecap::XactionRep::sendingVirgin() const +{ + // we are sending if we are consuming + const BodyPipePointer &vp = theVirginRep.raw().body_pipe; + return vp != NULL && vp->stillConsuming(this); +} + +// stops sending virgin to theMaster and enables auto-consumption +void +Ecap::XactionRep::dropVirgin(const char *reason) +{ + debugs(93,4, HERE << "because " << reason); + + BodyPipePointer &p = theVirginRep.raw().body_pipe; + Must(p != NULL); + Must(p->stillConsuming(this)); + stopConsumingFrom(p); + p->enableAutoConsumption(); + if (doneWithAdapted()) + theMaster.reset(); } void -Ecap::XactionRep::adaptVirgin() +Ecap::XactionRep::useVirgin() { + debugs(93,3, HERE << status()); + // XXX: check state everywhere Must(!theAnswerRep); - Must(!theAnswer.header); - Must(!theAnswer.body_pipe); - theAnswer.set(theVirgin.header->clone()); - theAnswerRep = new MessageRep(theAnswer, this); - Must(!theAnswer.body_pipe); + theMaster.reset(); + + HttpMsg *answer = theVirginRep.raw().header->clone(); + Must(!theVirginRep.raw().body_pipe == !answer->body_pipe); // check clone() + + if (answer->body_pipe != NULL) { + // if libecap consumed, we cannot shortcircuit + Must(!answer->body_pipe->consumedSize()); + Must(answer->body_pipe->stillConsuming(this)); + stopConsumingFrom(answer->body_pipe); + } + + sendAnswer(answer); + Must(done()); } void -Ecap::XactionRep::adaptNewRequest() +Ecap::XactionRep::useAdapted(const libecap::shared_ptr &m) { - theAnswer.set(new HttpRequest); - theAnswerRep = new MessageRep(theAnswer, this); + debugs(93,3, HERE << status()); + theAnswerRep = m; + MessageRep *rep = dynamic_cast(theAnswerRep.get()); + Must(rep); + HttpMsg *answer = rep->raw().header; + if (!theAnswerRep->body()) { + if (!sendingVirgin()) + theMaster.reset(); + sendAnswer(answer); + } else { + Must(!answer->body_pipe); // only host can set body pipes + rep->tieBody(this); + debugs(93,4, HERE << "adapter will produce body" << status()); + // libecap will produce + sendAnswer(answer); + } } -void -Ecap::XactionRep::adaptNewResponse() +// if adapter does not want to consume, we should not either +void +Ecap::XactionRep::adapterWontConsume() { - theAnswer.set(new HttpReply); - theAnswerRep = new MessageRep(theAnswer, this); + if (sendingVirgin()) + dropVirgin("adapterWontConsume"); } -libecap::Message * -Ecap::XactionRep::adapted() +void +Ecap::XactionRep::adapterWillConsume() { - return theAnswerRep; + Must(sendingVirgin()); + theMaster->noteVirginDataAvailable(); // XXX: async } -void -Ecap::XactionRep::useAdapted() +void +Ecap::XactionRep::adapterDoneConsuming() { - theMaster.reset(); - sendAnswer(theAnswer.header); + if (sendingVirgin()) + dropVirgin("adapterDoneConsuming"); +} + +void +Ecap::XactionRep::consumeVirgin(size_type n) +{ + BodyPipePointer &p = theVirginRep.raw().body_pipe; + Must(p != NULL); + const size_t size = static_cast(n); // XXX: check for overflow + const size_t sizeMax = static_cast(p->buf().contentSize()); // TODO: make MemBuf use size_t? + p->consume(min(size, sizeMax)); +} + +void +Ecap::XactionRep::pauseVirginProduction() +{ + // TODO: support production pauses +} + +void +Ecap::XactionRep::resumeVirginProduction() +{ + // TODO: support production pauses +} + +void +Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size) +{ + Must(answer().body_pipe != NULL); + if (size.known()) + answer().body_pipe->setBodySize(size.value()); + // else the piped body size is unknown by default +} + +void +Ecap::XactionRep::appendAdapted(const libecap::Area &area) +{ + BodyPipe *p = answer().body_pipe.getRaw(); + Must(p); + Must(p->putMoreData(area.start, area.size) == area.size); +} + +bool +Ecap::XactionRep::callable() const +{ + return !done(); +} + +void +Ecap::XactionRep::noteAdaptedBodyEnd() +{ + Must(answer().body_pipe != NULL); + answer().body_pipe->clearProducer(true); + if (!sendingVirgin()) + theMaster.reset(); +} + +void +Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d) +{ + debugs(93,3, HERE << "adapter needs time: " << + d.state << '/' << d.progress); + // XXX: set timeout? } void -Ecap::XactionRep::useNone() +Ecap::XactionRep::adaptationAborted() { theMaster.reset(); tellQueryAborted(true); // should eCAP support retries? @@ -140,8 +291,7 @@ Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount bp) void Ecap::XactionRep::noteBodyConsumerAborted(RefCount bp) { - Must(theMaster); - theMaster->noteAdaptedAborted(); + terminateMaster(); } void @@ -161,8 +311,7 @@ Ecap::XactionRep::noteBodyProductionEnded(RefCount bp) void Ecap::XactionRep::noteBodyProducerAborted(RefCount bp) { - Must(theMaster); - theMaster->noteVirginAborted(); + terminateMaster(); } void @@ -173,5 +322,30 @@ Ecap::XactionRep::noteInitiatorAborted() const char *Ecap::XactionRep::status() const { - return Adaptation::Initiate::status(); + static MemBuf buf; + buf.reset(); + + buf.append(" [", 2); + + if (theAnswerRep != NULL) { + MessageRep *answer = dynamic_cast(theAnswerRep.get()); + Must(answer); + const BodyPipePointer &ap = answer->raw().body_pipe; + if (ap != NULL && ap->stillProducing(this)) + buf.append("Ab ", 3); + else + buf.append("A. ", 3); + } + + const BodyPipePointer &vp = theVirginRep.raw().body_pipe; + if (vp != NULL && vp->stillConsuming(this)) + buf.append("Vb ", 3); + else + buf.append("V. ", 3); + + buf.Printf(" ecapx%d]", id); + + buf.terminate(); + + return buf.content(); } diff --git a/src/eCAP/XactionRep.h b/src/eCAP/XactionRep.h index 58a48ad442..ce193afa6f 100644 --- a/src/eCAP/XactionRep.h +++ b/src/eCAP/XactionRep.h @@ -22,7 +22,7 @@ namespace Ecap { xaction that Squid communicates with. One eCAP module may register many eCAP xactions. */ class XactionRep : public Adaptation::Initiate, public libecap::host::Xaction, - public BodyProducer, public BodyConsumer + public BodyConsumer, public BodyProducer { public: XactionRep(Adaptation::Initiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, const Adaptation::ServicePointer &service); @@ -32,15 +32,25 @@ public: void master(const AdapterXaction &aMaster); // establish a link // libecap::host::Xaction API - virtual libecap::Message &virgin(); // request or response - virtual const libecap::Message *cause(); // request for the above response - virtual void useVirgin(); // final answer: no adaptation - virtual void adaptVirgin(); // adapted message starts as virgin - virtual void adaptNewRequest(); // make fresh adapted request - virtual void adaptNewResponse(); // make fresh adapted response - virtual libecap::Message *adapted(); // request or response - virtual void useAdapted(); // final answer: adapted msg is ready - virtual void useNone(); // final answer: no answer + virtual libecap::Message &virgin(); + virtual const libecap::Message &cause(); + virtual libecap::Message &adapted(); + virtual void useVirgin(); + virtual void useAdapted(const libecap::shared_ptr &msg); + virtual void adaptationDelayed(const libecap::Delay &); + virtual void adaptationAborted(); + virtual void adapterWontConsume(); + virtual void adapterWillConsume(); + virtual void adapterDoneConsuming(); + virtual void consumeVirgin(size_type n); + virtual void pauseVirginProduction(); + virtual void resumeVirginProduction(); + virtual void setAdaptedBodySize(const libecap::BodySize &size); + virtual void appendAdapted(const libecap::Area &area); + virtual void noteAdaptedBodyEnd(); + + // libecap::Callable API, via libecap::host::Xaction + virtual bool callable() const; // BodyProducer API virtual void noteMoreBodySpaceAvailable(RefCount bp); @@ -56,21 +66,28 @@ public: // AsyncJob API (via Initiate) virtual void start(); - virtual void swangSong(); + virtual bool doneAll() const; + virtual void swanSong(); virtual const char *status() const; protected: + Adaptation::Message &answer(); + + bool sendingVirgin() const; + void dropVirgin(const char *reason); + bool doneWithAdapted() const; + void terminateMaster(); + void scheduleStop(const char *reason); private: AdapterXaction theMaster; // the actual adaptation xaction we represent - Adaptation::Message theVirgin; - Adaptation::Message theCause; - Adaptation::Message theAnswer; MessageRep theVirginRep; MessageRep *theCauseRep; - MessageRep *theAnswerRep; + + typedef libecap::shared_ptr MessagePtr; + MessagePtr theAnswerRep; CBDATA_CLASS2(XactionRep); };