From: Alex Rousskov Date: Mon, 29 Sep 2008 03:50:32 +0000 (-0600) Subject: Polished code by introducing two explicit and mostly independent states X-Git-Tag: SQUID_3_1_0_1~45^2~11^2~14 X-Git-Url: http://git.ipfire.org/gitweb/?a=commitdiff_plain;h=ea76d91e0b7d12414eb047f145bbf437f4c34d4d;p=thirdparty%2Fsquid.git Polished code by introducing two explicit and mostly independent states (proxying virgin body and proxying adapted body) as well as a flag to check for virgin body content access after the pipe was invalidated. --- diff --git a/src/eCAP/XactionRep.cc b/src/eCAP/XactionRep.cc index 73cf6ba1f8..4c07de8bfa 100644 --- a/src/eCAP/XactionRep.cc +++ b/src/eCAP/XactionRep.cc @@ -18,7 +18,8 @@ Ecap::XactionRep::XactionRep(Adaptation::Initiator *anInitiator, const Adaptation::ServicePointer &aService): AsyncJob("Ecap::XactionRep"), Adaptation::Initiate("Ecap::XactionRep", anInitiator, aService), - theVirginRep(virginHeader), theCauseRep(NULL) + theVirginRep(virginHeader), theCauseRep(NULL), + proxyingVb(opUndecided), proxyingAb(opUndecided), canAccessVb(false) { if (virginCause) theCauseRep = new MessageRep(virginCause); @@ -44,10 +45,10 @@ Ecap::XactionRep::start() { Must(theMaster); - // register as a consumer if there is a body - // we do not actually consume unless the adapter tells us to - BodyPipePointer &p = theVirginRep.raw().body_pipe; - Must(!p || p->setConsumerIfNotLate(this)); + if (theVirginRep.raw().body_pipe != NULL) + canAccessVb = true; /// assumes nobody is consuming; \todo check + else + proxyingVb = opNever; theMaster->start(); } @@ -56,33 +57,28 @@ void Ecap::XactionRep::swanSong() { // clear body_pipes, if any + // this code does not maintain proxying* and canAccessVb states; should it? if (theAnswerRep != NULL) { BodyPipe::Pointer body_pipe = answer().body_pipe; - if (body_pipe != NULL && body_pipe->stillProducing(this)) + if (body_pipe != NULL) { + Must(body_pipe->stillProducing(this)); stopProducingFor(body_pipe, false); + } } { BodyPipe::Pointer body_pipe = theVirginRep.raw().body_pipe; - if (body_pipe != NULL && body_pipe->stillConsuming(this)) + if (body_pipe != NULL) { + Must(body_pipe->stillConsuming(this)); stopConsumingFrom(body_pipe); + } } terminateMaster(); Adaptation::Initiate::swanSong(); } -void -Ecap::XactionRep::terminateMaster() -{ - if (theMaster) { - AdapterXaction x = theMaster; - theMaster.reset(); - x->stop(); - } -} - libecap::Message & Ecap::XactionRep::virgin() { @@ -111,74 +107,64 @@ Ecap::XactionRep::answer() return rep->raw(); } -bool -Ecap::XactionRep::doneAll() const +void +Ecap::XactionRep::terminateMaster() { if (theMaster) { - if (!doneWithAdapted() || sendingVirgin()) - return false; - } - - return Adaptation::Initiate::doneAll(); -} - -bool -Ecap::XactionRep::doneWithAdapted() const -{ - if (!theAnswerRep) - return false; - - // we are not done if we are producing - MessageRep *answer = dynamic_cast(theAnswerRep.get()); - Must(answer); - const BodyPipePointer &ap = answer->raw().body_pipe; - return !ap || !ap->stillProducing(this); + AdapterXaction x = theMaster; + theMaster.reset(); + x->stop(); + } } -// are we still sending virgin body to theMaster? bool -Ecap::XactionRep::sendingVirgin() const +Ecap::XactionRep::doneAll() const { - // we are sending if we are consuming - const BodyPipePointer &vp = theVirginRep.raw().body_pipe; - return vp != NULL && vp->stillConsuming(this); + return proxyingVb >= opComplete && proxyingAb >= opComplete && + Adaptation::Initiate::doneAll(); } -// stops sending virgin to theMaster and enables auto-consumption +// stops receiving virgin and enables auto-consumption void Ecap::XactionRep::dropVirgin(const char *reason) { - debugs(93,4, HERE << "because " << reason); + debugs(93,4, HERE << "because " << reason << "; status:" << status()); + Must(proxyingVb = opOn); BodyPipePointer &p = theVirginRep.raw().body_pipe; Must(p != NULL); Must(p->stillConsuming(this)); stopConsumingFrom(p); p->enableAutoConsumption(); - if (doneWithAdapted()) - theMaster.reset(); + proxyingVb = opComplete; + canAccessVb = false; + + // called from adapter handler so does not inform adapter } void Ecap::XactionRep::useVirgin() { debugs(93,3, HERE << status()); + Must(proxyingAb == opUndecided); + proxyingAb = opNever; - // XXX: check state everywhere - Must(!theAnswerRep); - theMaster.reset(); - - HttpMsg *answer = theVirginRep.raw().header->clone(); - Must(!theVirginRep.raw().body_pipe == !answer->body_pipe); // check clone() - - if (answer->body_pipe != NULL) { + BodyPipePointer &vbody_pipe = theVirginRep.raw().body_pipe; + if (proxyingVb == opOn) { + Must(vbody_pipe->stillConsuming(this)); // if libecap consumed, we cannot shortcircuit - Must(!answer->body_pipe->consumedSize()); - Must(answer->body_pipe->stillConsuming(this)); - stopConsumingFrom(answer->body_pipe); - } - - sendAnswer(answer); + Must(!vbody_pipe->consumedSize()); + stopConsumingFrom(vbody_pipe); + canAccessVb = false; + proxyingVb = opComplete; + } else + if (proxyingVb == opUndecided) + proxyingVb = opNever; + + HttpMsg *clone = theVirginRep.raw().header->clone(); + // check that clone() copies the pipe so that we do not have to + Must(!theVirginRep.raw().header->body_pipe == !clone->body_pipe); + sendAnswer(clone); Must(done()); } @@ -186,57 +172,76 @@ void Ecap::XactionRep::useAdapted(const libecap::shared_ptr &m) { debugs(93,3, HERE << status()); + Must(m); theAnswerRep = m; - MessageRep *rep = dynamic_cast(theAnswerRep.get()); - Must(rep); - HttpMsg *answer = rep->raw().header; - if (!theAnswerRep->body()) { - if (!sendingVirgin()) - theMaster.reset(); - sendAnswer(answer); - } else { - Must(!answer->body_pipe); // only host can set body pipes - rep->tieBody(this); + Must(proxyingAb == opUndecided); + + HttpMsg *msg = answer().header; + if (!theAnswerRep->body()) { // final, bodyless answer + proxyingAb = opNever; + sendAnswer(msg); + } else { // got answer headers but need to handle body + proxyingAb = opOn; + Must(!msg->body_pipe); // only host can set body pipes + MessageRep *rep = dynamic_cast(theAnswerRep.get()); + Must(rep); + rep->tieBody(this); // sets us as a producer + Must(msg->body_pipe != NULL); // check tieBody + + sendAnswer(msg); + debugs(93,4, HERE << "adapter will produce body" << status()); theMaster->abMake(); // libecap will produce - sendAnswer(answer); } } void Ecap::XactionRep::vbIgnore() { + Must(proxyingVb == opUndecided); // if adapter does not need vb, we do not need to send it - if (sendingVirgin()) - dropVirgin("vbIgnore"); + dropVirgin("vbIgnore"); + Must(proxyingVb == opNever); } void Ecap::XactionRep::vbMake() { - Must(sendingVirgin()); - theMaster->noteVbContentAvailable(); // XXX: async + Must(proxyingVb == opUndecided); + BodyPipePointer &p = theVirginRep.raw().body_pipe; + Must(p != NULL); + Must(p->setConsumerIfNotLate(this)); // to make vb, we must receive vb + proxyingVb = opOn; } void Ecap::XactionRep::vbStopMaking() { - // if adapter does not need vb, we do not need to send it - if (sendingVirgin()) - dropVirgin("vbIgnore"); + // if adapter does not need vb, we do not need to receive it + if (proxyingVb == opOn) + dropVirgin("vbStopMaking"); + Must(proxyingVb == opComplete); } void Ecap::XactionRep::vbMakeMore() { - Must(sendingVirgin() && !theVirginRep.raw().body_pipe->exhausted()); + Must(proxyingVb == opOn); // cannot make more if done proxying + // we cannot guarantee more vb, but we can check that there is a chance + Must(!theVirginRep.raw().body_pipe->exhausted()); } libecap::Area Ecap::XactionRep::vbContent(libecap::off_type o, libecap::size_type s) { + Must(canAccessVb); + // We may not be proxyingVb yet. It should be OK, but see vbContentShift(). + const BodyPipePointer &p = theVirginRep.raw().body_pipe; - const size_t haveSize = static_cast(p->buf().contentSize()); // TODO: make MemBuf use size_t? + Must(p != NULL); + + // TODO: make MemBuf use size_t? + const size_t haveSize = static_cast(p->buf().contentSize()); // convert to Squid types; XXX: check for overflow const uint64_t offset = static_cast(o); @@ -246,9 +251,6 @@ Ecap::XactionRep::vbContent(libecap::off_type o, libecap::size_type s) const size_t size = s == libecap::nsize ? haveSize - offset : static_cast(s); - if (!size) - return libecap::Area(); - // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src) return libecap::Area::FromTempBuffer(p->buf().content() + offset, min(static_cast(haveSize - offset), size)); @@ -257,6 +259,11 @@ Ecap::XactionRep::vbContent(libecap::off_type o, libecap::size_type s) void Ecap::XactionRep::vbContentShift(libecap::size_type n) { + Must(canAccessVb); + // We may not be proxyingVb yet. It should be OK now, but if BodyPipe + // consume() requirements change, we would have to return empty vbContent + // until the adapter registers as a consumer + BodyPipePointer &p = theVirginRep.raw().body_pipe; Must(p != NULL); const size_t size = static_cast(n); // XXX: check for overflow @@ -267,19 +274,19 @@ Ecap::XactionRep::vbContentShift(libecap::size_type n) void Ecap::XactionRep::noteAbContentDone(bool atEnd) { - Must(!doneWithAdapted()); - answer().body_pipe->clearProducer(atEnd); - if (!sendingVirgin()) - theMaster.reset(); + Must(proxyingAb == opOn); + stopProducingFor(answer().body_pipe, atEnd); + proxyingAb = opComplete; } void Ecap::XactionRep::noteAbContentAvailable() { + Must(proxyingAb == opOn); moveAbContent(); } -#if 0 +#if 0 /* XXX: implement */ void Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size) { @@ -301,8 +308,8 @@ Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d) void Ecap::XactionRep::adaptationAborted() { - theMaster.reset(); tellQueryAborted(true); // should eCAP support retries? + mustStop("adaptationAborted"); } bool @@ -314,19 +321,24 @@ Ecap::XactionRep::callable() const void Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount bp) { - if (!doneWithAdapted()) - moveAbContent(); + Must(proxyingAb == opOn); + moveAbContent(); } void Ecap::XactionRep::noteBodyConsumerAborted(RefCount bp) { - terminateMaster(); + Must(proxyingAb == opOn); + stopProducingFor(answer().body_pipe, false); + Must(theMaster); + theMaster->abStopMaking(); + proxyingAb = opComplete; } void Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount bp) { + Must(proxyingVb == opOn); Must(theMaster); theMaster->noteVbContentAvailable(); } @@ -334,19 +346,19 @@ Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount bp) void Ecap::XactionRep::noteBodyProductionEnded(RefCount bp) { + Must(proxyingVb == opOn); Must(theMaster); theMaster->noteVbContentDone(true); - if (doneWithAdapted()) - theMaster.reset(); + proxyingVb = opComplete; } void Ecap::XactionRep::noteBodyProducerAborted(RefCount bp) { + Must(proxyingVb == opOn); Must(theMaster); theMaster->noteVbContentDone(false); - if (doneWithAdapted()) - theMaster.reset(); + proxyingVb = opComplete; } void @@ -359,7 +371,7 @@ Ecap::XactionRep::noteInitiatorAborted() void Ecap::XactionRep::moveAbContent() { - Must(!doneWithAdapted()); + Must(proxyingAb == opOn); const libecap::Area c = theMaster->abContent(0, libecap::nsize); debugs(93,5, HERE << " up to " << c.size << " bytes"); if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size)) @@ -374,23 +386,25 @@ Ecap::XactionRep::status() const buf.append(" [", 2); - const BodyPipePointer &vp = theVirginRep.raw().body_pipe; - if (vp != NULL && vp->stillConsuming(this)) { - buf.append("Vb", 2); - buf.append(vp->status(), strlen(vp->status())); // XXX + if (proxyingVb == opOn) { + const BodyPipePointer &vp = theVirginRep.raw().body_pipe; + if (!canAccessVb) + buf.append("x", 1); + if (vp != NULL && vp->stillConsuming(this)) { + buf.append("Vb", 2); + buf.append(vp->status(), strlen(vp->status())); // XXX + } else + buf.append("V.", 2); } - else - buf.append("V.", 2); - if (theAnswerRep != NULL) { - MessageRep *answer = dynamic_cast(theAnswerRep.get()); - Must(answer); - const BodyPipePointer &ap = answer->raw().body_pipe; + if (proxyingAb == opOn) { + MessageRep *rep = dynamic_cast(theAnswerRep.get()); + Must(rep); + const BodyPipePointer &ap = rep->raw().body_pipe; if (ap != NULL && ap->stillProducing(this)) { buf.append(" Ab", 3); buf.append(ap->status(), strlen(ap->status())); // XXX - } - else + } else buf.append(" A.", 3); } diff --git a/src/eCAP/XactionRep.h b/src/eCAP/XactionRep.h index 8ea0c7d02b..f9fe27c05b 100644 --- a/src/eCAP/XactionRep.h +++ b/src/eCAP/XactionRep.h @@ -72,10 +72,7 @@ public: protected: Adaptation::Message &answer(); - bool sendingVirgin() const; void dropVirgin(const char *reason); - bool doneWithAdapted() const; - void moveAbContent(); void terminateMaster(); @@ -90,6 +87,11 @@ private: typedef libecap::shared_ptr MessagePtr; MessagePtr theAnswerRep; + typedef enum { opUndecided, opOn, opComplete, opNever } OperationState; + OperationState proxyingVb; // delivering virgin body from core to adapter + OperationState proxyingAb; // delivering adapted body from adapter to core + bool canAccessVb; // virgin BodyPipe content is accessible + CBDATA_CLASS2(XactionRep); };