const Adaptation::ServicePointer &aService):
AsyncJob("Ecap::XactionRep"),
Adaptation::Initiate("Ecap::XactionRep", anInitiator, aService),
- theVirginRep(virginHeader), theCauseRep(NULL)
+ theVirginRep(virginHeader), theCauseRep(NULL),
+ proxyingVb(opUndecided), proxyingAb(opUndecided), canAccessVb(false)
{
if (virginCause)
theCauseRep = new MessageRep(virginCause);
{
Must(theMaster);
- // register as a consumer if there is a body
- // we do not actually consume unless the adapter tells us to
- BodyPipePointer &p = theVirginRep.raw().body_pipe;
- Must(!p || p->setConsumerIfNotLate(this));
+ if (theVirginRep.raw().body_pipe != NULL)
+ canAccessVb = true; /// assumes nobody is consuming; \todo check
+ else
+ proxyingVb = opNever;
theMaster->start();
}
Ecap::XactionRep::swanSong()
{
// clear body_pipes, if any
+ // this code does not maintain proxying* and canAccessVb states; should it?
if (theAnswerRep != NULL) {
BodyPipe::Pointer body_pipe = answer().body_pipe;
- if (body_pipe != NULL && body_pipe->stillProducing(this))
+ if (body_pipe != NULL) {
+ Must(body_pipe->stillProducing(this));
stopProducingFor(body_pipe, false);
+ }
}
{
BodyPipe::Pointer body_pipe = theVirginRep.raw().body_pipe;
- if (body_pipe != NULL && body_pipe->stillConsuming(this))
+ if (body_pipe != NULL) {
+ Must(body_pipe->stillConsuming(this));
stopConsumingFrom(body_pipe);
+ }
}
terminateMaster();
Adaptation::Initiate::swanSong();
}
-void
-Ecap::XactionRep::terminateMaster()
-{
- if (theMaster) {
- AdapterXaction x = theMaster;
- theMaster.reset();
- x->stop();
- }
-}
-
libecap::Message &
Ecap::XactionRep::virgin()
{
return rep->raw();
}
-bool
-Ecap::XactionRep::doneAll() const
+void
+Ecap::XactionRep::terminateMaster()
{
if (theMaster) {
- if (!doneWithAdapted() || sendingVirgin())
- return false;
- }
-
- return Adaptation::Initiate::doneAll();
-}
-
-bool
-Ecap::XactionRep::doneWithAdapted() const
-{
- if (!theAnswerRep)
- return false;
-
- // we are not done if we are producing
- MessageRep *answer = dynamic_cast<MessageRep*>(theAnswerRep.get());
- Must(answer);
- const BodyPipePointer &ap = answer->raw().body_pipe;
- return !ap || !ap->stillProducing(this);
+ AdapterXaction x = theMaster;
+ theMaster.reset();
+ x->stop();
+ }
}
-// are we still sending virgin body to theMaster?
bool
-Ecap::XactionRep::sendingVirgin() const
+Ecap::XactionRep::doneAll() const
{
- // we are sending if we are consuming
- const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
- return vp != NULL && vp->stillConsuming(this);
+ return proxyingVb >= opComplete && proxyingAb >= opComplete &&
+ Adaptation::Initiate::doneAll();
}
-// stops sending virgin to theMaster and enables auto-consumption
+// stops receiving virgin and enables auto-consumption
void
Ecap::XactionRep::dropVirgin(const char *reason)
{
- debugs(93,4, HERE << "because " << reason);
+ debugs(93,4, HERE << "because " << reason << "; status:" << status());
+ Must(proxyingVb = opOn);
BodyPipePointer &p = theVirginRep.raw().body_pipe;
Must(p != NULL);
Must(p->stillConsuming(this));
stopConsumingFrom(p);
p->enableAutoConsumption();
- if (doneWithAdapted())
- theMaster.reset();
+ proxyingVb = opComplete;
+ canAccessVb = false;
+
+ // called from adapter handler so does not inform adapter
}
void
Ecap::XactionRep::useVirgin()
{
debugs(93,3, HERE << status());
+ Must(proxyingAb == opUndecided);
+ proxyingAb = opNever;
- // XXX: check state everywhere
- Must(!theAnswerRep);
- theMaster.reset();
-
- HttpMsg *answer = theVirginRep.raw().header->clone();
- Must(!theVirginRep.raw().body_pipe == !answer->body_pipe); // check clone()
-
- if (answer->body_pipe != NULL) {
+ BodyPipePointer &vbody_pipe = theVirginRep.raw().body_pipe;
+ if (proxyingVb == opOn) {
+ Must(vbody_pipe->stillConsuming(this));
// if libecap consumed, we cannot shortcircuit
- Must(!answer->body_pipe->consumedSize());
- Must(answer->body_pipe->stillConsuming(this));
- stopConsumingFrom(answer->body_pipe);
- }
-
- sendAnswer(answer);
+ Must(!vbody_pipe->consumedSize());
+ stopConsumingFrom(vbody_pipe);
+ canAccessVb = false;
+ proxyingVb = opComplete;
+ } else
+ if (proxyingVb == opUndecided)
+ proxyingVb = opNever;
+
+ HttpMsg *clone = theVirginRep.raw().header->clone();
+ // check that clone() copies the pipe so that we do not have to
+ Must(!theVirginRep.raw().header->body_pipe == !clone->body_pipe);
+ sendAnswer(clone);
Must(done());
}
Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
{
debugs(93,3, HERE << status());
+ Must(m);
theAnswerRep = m;
- MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
- Must(rep);
- HttpMsg *answer = rep->raw().header;
- if (!theAnswerRep->body()) {
- if (!sendingVirgin())
- theMaster.reset();
- sendAnswer(answer);
- } else {
- Must(!answer->body_pipe); // only host can set body pipes
- rep->tieBody(this);
+ Must(proxyingAb == opUndecided);
+
+ HttpMsg *msg = answer().header;
+ if (!theAnswerRep->body()) { // final, bodyless answer
+ proxyingAb = opNever;
+ sendAnswer(msg);
+ } else { // got answer headers but need to handle body
+ proxyingAb = opOn;
+ Must(!msg->body_pipe); // only host can set body pipes
+ MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
+ Must(rep);
+ rep->tieBody(this); // sets us as a producer
+ Must(msg->body_pipe != NULL); // check tieBody
+
+ sendAnswer(msg);
+
debugs(93,4, HERE << "adapter will produce body" << status());
theMaster->abMake(); // libecap will produce
- sendAnswer(answer);
}
}
void
Ecap::XactionRep::vbIgnore()
{
+ Must(proxyingVb == opUndecided);
// if adapter does not need vb, we do not need to send it
- if (sendingVirgin())
- dropVirgin("vbIgnore");
+ dropVirgin("vbIgnore");
+ Must(proxyingVb == opNever);
}
void
Ecap::XactionRep::vbMake()
{
- Must(sendingVirgin());
- theMaster->noteVbContentAvailable(); // XXX: async
+ Must(proxyingVb == opUndecided);
+ BodyPipePointer &p = theVirginRep.raw().body_pipe;
+ Must(p != NULL);
+ Must(p->setConsumerIfNotLate(this)); // to make vb, we must receive vb
+ proxyingVb = opOn;
}
void
Ecap::XactionRep::vbStopMaking()
{
- // if adapter does not need vb, we do not need to send it
- if (sendingVirgin())
- dropVirgin("vbIgnore");
+ // if adapter does not need vb, we do not need to receive it
+ if (proxyingVb == opOn)
+ dropVirgin("vbStopMaking");
+ Must(proxyingVb == opComplete);
}
void
Ecap::XactionRep::vbMakeMore()
{
- Must(sendingVirgin() && !theVirginRep.raw().body_pipe->exhausted());
+ Must(proxyingVb == opOn); // cannot make more if done proxying
+ // we cannot guarantee more vb, but we can check that there is a chance
+ Must(!theVirginRep.raw().body_pipe->exhausted());
}
libecap::Area
Ecap::XactionRep::vbContent(libecap::off_type o, libecap::size_type s)
{
+ Must(canAccessVb);
+ // We may not be proxyingVb yet. It should be OK, but see vbContentShift().
+
const BodyPipePointer &p = theVirginRep.raw().body_pipe;
- const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
+ Must(p != NULL);
+
+ // TODO: make MemBuf use size_t?
+ const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
// convert to Squid types; XXX: check for overflow
const uint64_t offset = static_cast<uint64_t>(o);
const size_t size = s == libecap::nsize ?
haveSize - offset : static_cast<size_t>(s);
- if (!size)
- return libecap::Area();
-
// XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
return libecap::Area::FromTempBuffer(p->buf().content() + offset,
min(static_cast<size_t>(haveSize - offset), size));
void
Ecap::XactionRep::vbContentShift(libecap::size_type n)
{
+ Must(canAccessVb);
+ // We may not be proxyingVb yet. It should be OK now, but if BodyPipe
+ // consume() requirements change, we would have to return empty vbContent
+ // until the adapter registers as a consumer
+
BodyPipePointer &p = theVirginRep.raw().body_pipe;
Must(p != NULL);
const size_t size = static_cast<size_t>(n); // XXX: check for overflow
void
Ecap::XactionRep::noteAbContentDone(bool atEnd)
{
- Must(!doneWithAdapted());
- answer().body_pipe->clearProducer(atEnd);
- if (!sendingVirgin())
- theMaster.reset();
+ Must(proxyingAb == opOn);
+ stopProducingFor(answer().body_pipe, atEnd);
+ proxyingAb = opComplete;
}
void
Ecap::XactionRep::noteAbContentAvailable()
{
+ Must(proxyingAb == opOn);
moveAbContent();
}
-#if 0
+#if 0 /* XXX: implement */
void
Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
{
void
Ecap::XactionRep::adaptationAborted()
{
- theMaster.reset();
tellQueryAborted(true); // should eCAP support retries?
+ mustStop("adaptationAborted");
}
bool
void
Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
{
- if (!doneWithAdapted())
- moveAbContent();
+ Must(proxyingAb == opOn);
+ moveAbContent();
}
void
Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
{
- terminateMaster();
+ Must(proxyingAb == opOn);
+ stopProducingFor(answer().body_pipe, false);
+ Must(theMaster);
+ theMaster->abStopMaking();
+ proxyingAb = opComplete;
}
void
Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
{
+ Must(proxyingVb == opOn);
Must(theMaster);
theMaster->noteVbContentAvailable();
}
void
Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
{
+ Must(proxyingVb == opOn);
Must(theMaster);
theMaster->noteVbContentDone(true);
- if (doneWithAdapted())
- theMaster.reset();
+ proxyingVb = opComplete;
}
void
Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
{
+ Must(proxyingVb == opOn);
Must(theMaster);
theMaster->noteVbContentDone(false);
- if (doneWithAdapted())
- theMaster.reset();
+ proxyingVb = opComplete;
}
void
void
Ecap::XactionRep::moveAbContent()
{
- Must(!doneWithAdapted());
+ Must(proxyingAb == opOn);
const libecap::Area c = theMaster->abContent(0, libecap::nsize);
debugs(93,5, HERE << " up to " << c.size << " bytes");
if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
buf.append(" [", 2);
- const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
- if (vp != NULL && vp->stillConsuming(this)) {
- buf.append("Vb", 2);
- buf.append(vp->status(), strlen(vp->status())); // XXX
+ if (proxyingVb == opOn) {
+ const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
+ if (!canAccessVb)
+ buf.append("x", 1);
+ if (vp != NULL && vp->stillConsuming(this)) {
+ buf.append("Vb", 2);
+ buf.append(vp->status(), strlen(vp->status())); // XXX
+ } else
+ buf.append("V.", 2);
}
- else
- buf.append("V.", 2);
- if (theAnswerRep != NULL) {
- MessageRep *answer = dynamic_cast<MessageRep*>(theAnswerRep.get());
- Must(answer);
- const BodyPipePointer &ap = answer->raw().body_pipe;
+ if (proxyingAb == opOn) {
+ MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
+ Must(rep);
+ const BodyPipePointer &ap = rep->raw().body_pipe;
if (ap != NULL && ap->stillProducing(this)) {
buf.append(" Ab", 3);
buf.append(ap->status(), strlen(ap->status())); // XXX
- }
- else
+ } else
buf.append(" A.", 3);
}