]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/adaptation/ecap/XactionRep.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / adaptation / ecap / XactionRep.cc
index 055906d3a4d57a863af4d97dbb8f3d15cacf2f08..7ee3f1d9e3dcbfa6d516575b92a21d2fb389ef3b 100644 (file)
@@ -1,28 +1,57 @@
+/*
+ * DEBUG: section 93    eCAP Interface
+ */
 #include "squid.h"
 #include <libecap/common/area.h>
 #include <libecap/common/delay.h>
+#include <libecap/common/named_values.h>
+#include <libecap/common/names.h>
 #include <libecap/adapter/xaction.h>
-#include "TextException.h"
-#include "HttpRequest.h"
-#include "HttpReply.h"
+#include "adaptation/Answer.h"
+#include "adaptation/ecap/Config.h"
 #include "adaptation/ecap/XactionRep.h"
+#include "adaptation/Initiator.h"
+#include "base/TextException.h"
+#include "HttpReply.h"
+#include "HttpRequest.h"
+#include "SquidTime.h"
+
+CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
+
+/// a libecap Visitor for converting adapter transaction options to HttpHeader
+class OptionsExtractor: public libecap::NamedValueVisitor
+{
+public:
+    typedef libecap::Name Name;
+    typedef libecap::Area Area;
 
-CBDATA_NAMESPACED_CLASS_INIT(Ecap::XactionRep, XactionRep);
+    OptionsExtractor(HttpHeader &aMeta): meta(aMeta) {}
 
+    // libecap::NamedValueVisitor API
+    virtual void visit(const Name &name, const Area &value) {
+        meta.putExt(name.image().c_str(), value.toString().c_str());
+    }
+
+    HttpHeader &meta; ///< where to put extracted options
+};
 
-Ecap::XactionRep::XactionRep(Adaptation::Initiator *anInitiator,
-                             HttpMsg *virginHeader, HttpRequest *virginCause,
-                             const Adaptation::ServicePointer &aService):
-        AsyncJob("Ecap::XactionRep"),
-        Adaptation::Initiate("Ecap::XactionRep", anInitiator, aService),
+Adaptation::Ecap::XactionRep::XactionRep(
+    HttpMsg *virginHeader, HttpRequest *virginCause,
+    const Adaptation::ServicePointer &aService):
+        AsyncJob("Adaptation::Ecap::XactionRep"),
+        Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
+        theService(aService),
         theVirginRep(virginHeader), theCauseRep(NULL),
-        proxyingVb(opUndecided), proxyingAb(opUndecided), canAccessVb(false)
+        makingVb(opUndecided), proxyingAb(opUndecided),
+        adaptHistoryId(-1),
+        vbProductionFinished(false),
+        abProductionFinished(false), abProductionAtEnd(false)
 {
     if (virginCause)
         theCauseRep = new MessageRep(virginCause);
 }
 
-Ecap::XactionRep::~XactionRep()
+Adaptation::Ecap::XactionRep::~XactionRep()
 {
     assert(!theMaster);
     delete theCauseRep;
@@ -30,28 +59,192 @@ Ecap::XactionRep::~XactionRep()
 }
 
 void
-Ecap::XactionRep::master(const AdapterXaction &x)
+Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
 {
     Must(!theMaster);
     Must(x != NULL);
     theMaster = x;
 }
 
+Adaptation::Service &
+Adaptation::Ecap::XactionRep::service()
+{
+    Must(theService != NULL);
+    return *theService;
+}
+
+const libecap::Area
+Adaptation::Ecap::XactionRep::option(const libecap::Name &name) const
+{
+    if (name == libecap::metaClientIp)
+        return clientIpValue();
+    if (name == libecap::metaUserName)
+        return usernameValue();
+    if (Adaptation::Config::masterx_shared_name && name == Adaptation::Config::masterx_shared_name)
+        return masterxSharedValue(name);
+
+    // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
+
+    // If the name is unknown, metaValue returns an emtpy area
+    return metaValue(name);
+}
+
 void
-Ecap::XactionRep::start()
+Adaptation::Ecap::XactionRep::visitEachOption(libecap::NamedValueVisitor &visitor) const
+{
+    if (const libecap::Area value = clientIpValue())
+        visitor.visit(libecap::metaClientIp, value);
+    if (const libecap::Area value = usernameValue())
+        visitor.visit(libecap::metaUserName, value);
+
+    if (Adaptation::Config::masterx_shared_name) {
+        const libecap::Name name(Adaptation::Config::masterx_shared_name);
+        if (const libecap::Area value = masterxSharedValue(name))
+            visitor.visit(name, value);
+    }
+
+    visitEachMetaHeader(visitor);
+
+    // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
+}
+
+const libecap::Area
+Adaptation::Ecap::XactionRep::clientIpValue() const
+{
+    const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
+                                 theCauseRep->raw().header : theVirginRep.raw().header);
+    Must(request);
+    // TODO: move this logic into HttpRequest::clientIp(bool) and
+    // HttpRequest::clientIpString(bool) and reuse everywhere
+    if (TheConfig.send_client_ip && request) {
+        Ip::Address client_addr;
+#if FOLLOW_X_FORWARDED_FOR
+        if (TheConfig.use_indirect_client) {
+            client_addr = request->indirect_client_addr;
+        } else
+#endif
+            client_addr = request->client_addr;
+        if (!client_addr.isAnyAddr() && !client_addr.isNoAddr()) {
+            char ntoabuf[MAX_IPSTRLEN] = "";
+            client_addr.toStr(ntoabuf,MAX_IPSTRLEN);
+            return libecap::Area::FromTempBuffer(ntoabuf, strlen(ntoabuf));
+        }
+    }
+    return libecap::Area();
+}
+
+const libecap::Area
+Adaptation::Ecap::XactionRep::usernameValue() const
+{
+#if USE_AUTH
+    const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
+                                 theCauseRep->raw().header : theVirginRep.raw().header);
+    Must(request);
+    if (request->auth_user_request != NULL) {
+        if (char const *name = request->auth_user_request->username())
+            return libecap::Area::FromTempBuffer(name, strlen(name));
+        else if (request->extacl_user.defined() && request->extacl_user.size())
+            return libecap::Area::FromTempBuffer(request->extacl_user.rawBuf(),
+                                                 request->extacl_user.size());
+    }
+#endif
+    return libecap::Area();
+}
+
+const libecap::Area
+Adaptation::Ecap::XactionRep::masterxSharedValue(const libecap::Name &name) const
+{
+    const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
+                                 theCauseRep->raw().header : theVirginRep.raw().header);
+    Must(request);
+    if (name.known()) { // must check to avoid empty names matching unset cfg
+        Adaptation::History::Pointer ah = request->adaptHistory(false);
+        if (ah != NULL) {
+            String name, value;
+            if (ah->getXxRecord(name, value))
+                return libecap::Area::FromTempBuffer(value.rawBuf(), value.size());
+        }
+    }
+    return libecap::Area();
+}
+
+const libecap::Area
+Adaptation::Ecap::XactionRep::metaValue(const libecap::Name &name) const
+{
+    HttpRequest *request = dynamic_cast<HttpRequest*>(theCauseRep ?
+                           theCauseRep->raw().header : theVirginRep.raw().header);
+    Must(request);
+    HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
+
+    if (name.known()) { // must check to avoid empty names matching unset cfg
+        typedef Notes::iterator ACAMLI;
+        for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
+            if (name == (*i)->key.termedBuf()) {
+                if (const char *value = (*i)->match(request, reply))
+                    return libecap::Area::FromTempString(value);
+                else
+                    return libecap::Area();
+            }
+        }
+    }
+
+    return libecap::Area();
+}
+
+void
+Adaptation::Ecap::XactionRep::visitEachMetaHeader(libecap::NamedValueVisitor &visitor) const
+{
+    HttpRequest *request = dynamic_cast<HttpRequest*>(theCauseRep ?
+                           theCauseRep->raw().header : theVirginRep.raw().header);
+    Must(request);
+    HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
+
+    typedef Notes::iterator ACAMLI;
+    for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
+        const char *v = (*i)->match(request, reply);
+        if (v) {
+            const libecap::Name name((*i)->key.termedBuf());
+            const libecap::Area value = libecap::Area::FromTempString(v);
+            visitor.visit(name, value);
+        }
+    }
+}
+
+void
+Adaptation::Ecap::XactionRep::start()
 {
     Must(theMaster);
 
-    if (theVirginRep.raw().body_pipe != NULL)
-        canAccessVb = true; /// assumes nobody is consuming; \todo check
-    else
-        proxyingVb = opNever;
+    if (!theVirginRep.raw().body_pipe)
+        makingVb = opNever; // there is nothing to deliver
+
+    HttpRequest *request = dynamic_cast<HttpRequest*> (theCauseRep ?
+                           theCauseRep->raw().header : theVirginRep.raw().header);
+    Must(request);
+
+    HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
+
+    Adaptation::History::Pointer ah = request->adaptLogHistory();
+    if (ah != NULL) {
+        // retrying=false because ecap never retries transactions
+        adaptHistoryId = ah->recordXactStart(service().cfg().key, current_time, false);
+        typedef Notes::iterator ACAMLI;
+        for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
+            const char *v = (*i)->match(request, reply);
+            if (v) {
+                if (ah->metaHeaders == NULL)
+                    ah->metaHeaders = new NotePairs();
+                if (!ah->metaHeaders->hasPair((*i)->key.termedBuf(), v))
+                    ah->metaHeaders->add((*i)->key.termedBuf(), v);
+            }
+        }
+    }
 
     theMaster->start();
 }
 
 void
-Ecap::XactionRep::swanSong()
+Adaptation::Ecap::XactionRep::swanSong()
 {
     // clear body_pipes, if any
     // this code does not maintain proxying* and canAccessVb states; should it?
@@ -64,40 +257,44 @@ Ecap::XactionRep::swanSong()
         }
     }
 
-    {
-        BodyPipe::Pointer body_pipe = theVirginRep.raw().body_pipe;
-        if (body_pipe != NULL) {
-            Must(body_pipe->stillConsuming(this));
-            stopConsumingFrom(body_pipe);
-        }
-    }
+    BodyPipe::Pointer &body_pipe = theVirginRep.raw().body_pipe;
+    if (body_pipe != NULL && body_pipe->stillConsuming(this))
+        stopConsumingFrom(body_pipe);
 
     terminateMaster();
+
+    const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
+                                 theCauseRep->raw().header : theVirginRep.raw().header);
+    Must(request);
+    Adaptation::History::Pointer ah = request->adaptLogHistory();
+    if (ah != NULL && adaptHistoryId >= 0)
+        ah->recordXactFinish(adaptHistoryId);
+
     Adaptation::Initiate::swanSong();
 }
 
 libecap::Message &
-Ecap::XactionRep::virgin()
+Adaptation::Ecap::XactionRep::virgin()
 {
     return theVirginRep;
 }
 
 const libecap::Message &
-Ecap::XactionRep::cause()
+Adaptation::Ecap::XactionRep::cause()
 {
     Must(theCauseRep != NULL);
     return *theCauseRep;
 }
 
 libecap::Message &
-Ecap::XactionRep::adapted()
+Adaptation::Ecap::XactionRep::adapted()
 {
     Must(theAnswerRep != NULL);
     return *theAnswerRep;
 }
 
 Adaptation::Message &
-Ecap::XactionRep::answer()
+Adaptation::Ecap::XactionRep::answer()
 {
     MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
     Must(rep);
@@ -105,7 +302,7 @@ Ecap::XactionRep::answer()
 }
 
 void
-Ecap::XactionRep::terminateMaster()
+Adaptation::Ecap::XactionRep::terminateMaster()
 {
     if (theMaster) {
         AdapterXaction x = theMaster;
@@ -115,62 +312,78 @@ Ecap::XactionRep::terminateMaster()
 }
 
 bool
-Ecap::XactionRep::doneAll() const
+Adaptation::Ecap::XactionRep::doneAll() const
 {
-    return proxyingVb >= opComplete && proxyingAb >= opComplete &&
+    return makingVb >= opComplete && proxyingAb >= opComplete &&
            Adaptation::Initiate::doneAll();
 }
 
-// stops receiving virgin and enables auto-consumption
+// stops receiving virgin and enables auto-consumption, dropping any vb bytes
+void
+Adaptation::Ecap::XactionRep::sinkVb(const char *reason)
+{
+    debugs(93,4, HERE << "sink for " << reason << "; status:" << status());
+
+    // we reset raw().body_pipe when we are done, so use this one for checking
+    const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
+    if (permPipe != NULL)
+        permPipe->enableAutoConsumption();
+
+    forgetVb(reason);
+}
+
+// stops receiving virgin but preserves it for others to use
 void
-Ecap::XactionRep::dropVirgin(const char *reason)
+Adaptation::Ecap::XactionRep::preserveVb(const char *reason)
 {
-    debugs(93,4, HERE << "because " << reason << "; status:" << status());
-    Must(proxyingVb = opOn);
+    debugs(93,4, HERE << "preserve for " << reason << "; status:" << status());
+
+    // we reset raw().body_pipe when we are done, so use this one for checking
+    const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
+    if (permPipe != NULL) {
+        // if libecap consumed, we cannot preserve
+        Must(!permPipe->consumedSize());
+    }
+
+    forgetVb(reason);
+}
+
+// disassociates us from vb; the last step of sinking or preserving vb
+void
+Adaptation::Ecap::XactionRep::forgetVb(const char *reason)
+{
+    debugs(93,9, HERE << "forget vb " << reason << "; status:" << status());
 
     BodyPipePointer &p = theVirginRep.raw().body_pipe;
-    Must(p != NULL);
-    Must(p->stillConsuming(this));
-    stopConsumingFrom(p);
-    p->enableAutoConsumption();
-    proxyingVb = opComplete;
-    canAccessVb = false;
+    if (p != NULL && p->stillConsuming(this))
+        stopConsumingFrom(p);
 
-    // called from adapter handler so does not inform adapter
+    if (makingVb == opUndecided)
+        makingVb = opNever;
+    else if (makingVb == opOn)
+        makingVb = opComplete;
 }
 
 void
-Ecap::XactionRep::useVirgin()
+Adaptation::Ecap::XactionRep::useVirgin()
 {
     debugs(93,3, HERE << status());
     Must(proxyingAb == opUndecided);
     proxyingAb = opNever;
 
-    BodyPipePointer &vbody_pipe = theVirginRep.raw().body_pipe;
+    preserveVb("useVirgin");
 
     HttpMsg *clone = theVirginRep.raw().header->clone();
     // check that clone() copies the pipe so that we do not have to
-    Must(!vbody_pipe == !clone->body_pipe);
-
-    if (proxyingVb == opOn) {
-        Must(vbody_pipe->stillConsuming(this));
-        // if libecap consumed, we cannot shortcircuit
-        Must(!vbody_pipe->consumedSize());
-        stopConsumingFrom(vbody_pipe);
-        canAccessVb = false;
-        proxyingVb = opComplete;
-    } else
-        if (proxyingVb == opUndecided) {
-            vbody_pipe = NULL; // it is not our pipe anymore
-            proxyingVb = opNever;
-        }
+    Must(!theVirginRep.raw().header->body_pipe == !clone->body_pipe);
 
-    sendAnswer(clone);
+    updateHistory(clone);
+    sendAnswer(Answer::Forward(clone));
     Must(done());
 }
 
 void
-Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
+Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
 {
     debugs(93,3, HERE << status());
     Must(m);
@@ -180,7 +393,8 @@ Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
     HttpMsg *msg = answer().header;
     if (!theAnswerRep->body()) { // final, bodyless answer
         proxyingAb = opNever;
-        sendAnswer(msg);
+        updateHistory(msg);
+        sendAnswer(Answer::Forward(msg));
     } else { // got answer headers but need to handle body
         proxyingAb = opOn;
         Must(!msg->body_pipe); // only host can set body pipes
@@ -189,7 +403,8 @@ Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
         rep->tieBody(this); // sets us as a producer
         Must(msg->body_pipe != NULL); // check tieBody
 
-        sendAnswer(msg);
+        updateHistory(msg);
+        sendAnswer(Answer::Forward(msg));
 
         debugs(93,4, HERE << "adapter will produce body" << status());
         theMaster->abMake(); // libecap will produce
@@ -197,46 +412,112 @@ Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
 }
 
 void
-Ecap::XactionRep::vbDiscard()
+Adaptation::Ecap::XactionRep::blockVirgin()
+{
+    debugs(93,3, HERE << status());
+    Must(proxyingAb == opUndecided);
+    proxyingAb = opNever;
+
+    sinkVb("blockVirgin");
+
+    updateHistory(NULL);
+    sendAnswer(Answer::Block(service().cfg().key));
+    Must(done());
+}
+
+/// Called just before sendAnswer() to record adapter meta-information
+/// which may affect answer processing and may be needed for logging.
+void
+Adaptation::Ecap::XactionRep::updateHistory(HttpMsg *adapted)
 {
-    Must(proxyingVb == opUndecided);
+    if (!theMaster) // all updates rely on being able to query the adapter
+        return;
+
+    const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
+                                 theCauseRep->raw().header : theVirginRep.raw().header);
+    Must(request);
+
+    // TODO: move common ICAP/eCAP logic to Adaptation::Xaction or similar
+    // TODO: optimize Area-to-String conversion
+
+    // update the cross-transactional database if needed
+    if (const char *xxNameStr = Adaptation::Config::masterx_shared_name) {
+        Adaptation::History::Pointer ah = request->adaptHistory(true);
+        if (ah != NULL) {
+            libecap::Name xxName(xxNameStr); // TODO: optimize?
+            if (const libecap::Area val = theMaster->option(xxName))
+                ah->updateXxRecord(xxNameStr, val.toString().c_str());
+        }
+    }
+
+    // update the adaptation plan if needed
+    if (service().cfg().routing) {
+        String services;
+        if (const libecap::Area services = theMaster->option(libecap::metaNextServices)) {
+            Adaptation::History::Pointer ah = request->adaptHistory(true);
+            if (ah != NULL)
+                ah->updateNextServices(services.toString().c_str());
+        }
+    } // TODO: else warn (occasionally!) if we got libecap::metaNextServices
+
+    // Store received meta headers for adapt::<last_h logformat code use.
+    // If we already have stored headers from a previous adaptation transaction
+    // related to the same master transction, they will be replaced.
+    Adaptation::History::Pointer ah = request->adaptLogHistory();
+    if (ah != NULL) {
+        HttpHeader meta(hoReply);
+        OptionsExtractor extractor(meta);
+        theMaster->visitEachOption(extractor);
+        ah->recordMeta(&meta);
+    }
+
+    // Add just-created history to the adapted/cloned request that lacks it.
+    if (HttpRequest *adaptedReq = dynamic_cast<HttpRequest*>(adapted))
+        adaptedReq->adaptHistoryImport(*request);
+}
+
+void
+Adaptation::Ecap::XactionRep::vbDiscard()
+{
+    Must(makingVb == opUndecided);
     // if adapter does not need vb, we do not need to send it
-    dropVirgin("vbDiscard");
-    Must(proxyingVb == opNever);
+    sinkVb("vbDiscard");
+    Must(makingVb == opNever);
 }
 
 void
-Ecap::XactionRep::vbMake()
+Adaptation::Ecap::XactionRep::vbMake()
 {
-    Must(proxyingVb == opUndecided);
+    Must(makingVb == opUndecided);
     BodyPipePointer &p = theVirginRep.raw().body_pipe;
     Must(p != NULL);
-    Must(p->setConsumerIfNotLate(this)); // to make vb, we must receive vb
-    proxyingVb = opOn;
+    Must(p->setConsumerIfNotLate(this)); // to deliver vb, we must receive vb
+    makingVb = opOn;
 }
 
 void
-Ecap::XactionRep::vbStopMaking()
+Adaptation::Ecap::XactionRep::vbStopMaking()
 {
+    Must(makingVb == opOn);
     // if adapter does not need vb, we do not need to receive it
-    if (proxyingVb == opOn)
-        dropVirgin("vbStopMaking");
-    Must(proxyingVb == opComplete);
+    sinkVb("vbStopMaking");
+    Must(makingVb == opComplete);
 }
 
 void
-Ecap::XactionRep::vbMakeMore()
+Adaptation::Ecap::XactionRep::vbMakeMore()
 {
-    Must(proxyingVb == opOn); // cannot make more if done proxying
+    Must(makingVb == opOn); // cannot make more if done proxying
     // we cannot guarantee more vb, but we can check that there is a chance
-    Must(!theVirginRep.raw().body_pipe->exhausted());
+    const BodyPipePointer &p = theVirginRep.raw().body_pipe;
+    Must(p != NULL && p->stillConsuming(this)); // we are plugged in
+    Must(!p->productionEnded() && p->mayNeedMoreData()); // and may get more
 }
 
 libecap::Area
-Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
+Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
 {
-    Must(canAccessVb);
-    // We may not be proxyingVb yet. It should be OK, but see vbContentShift().
+    // We may not be makingVb yet. It should be OK, but see vbContentShift().
 
     const BodyPipePointer &p = theVirginRep.raw().body_pipe;
     Must(p != NULL);
@@ -258,10 +539,9 @@ Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
 }
 
 void
-Ecap::XactionRep::vbContentShift(libecap::size_type n)
+Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
 {
-    Must(canAccessVb);
-    // We may not be proxyingVb yet. It should be OK now, but if BodyPipe
+    // We may not be makingVb yet. It should be OK now, but if BodyPipe
     // consume() requirements change, we would have to return empty vbContent
     // until the adapter registers as a consumer
 
@@ -273,23 +553,25 @@ Ecap::XactionRep::vbContentShift(libecap::size_type n)
 }
 
 void
-Ecap::XactionRep::noteAbContentDone(bool atEnd)
+Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
 {
-    Must(proxyingAb == opOn);
-    stopProducingFor(answer().body_pipe, atEnd);
-    proxyingAb = opComplete;
+    Must(proxyingAb == opOn && !abProductionFinished);
+    abProductionFinished = true;
+    abProductionAtEnd = atEnd; // store until ready to stop producing ourselves
+    debugs(93,5, HERE << "adapted body production ended");
+    moveAbContent();
 }
 
 void
-Ecap::XactionRep::noteAbContentAvailable()
+Adaptation::Ecap::XactionRep::noteAbContentAvailable()
 {
-    Must(proxyingAb == opOn);
+    Must(proxyingAb == opOn && !abProductionFinished);
     moveAbContent();
 }
 
 #if 0 /* XXX: implement */
 void
-Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
+Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
 {
     Must(answer().body_pipe != NULL);
     if (size.known())
@@ -299,7 +581,7 @@ Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
 #endif
 
 void
-Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
+Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
 {
     debugs(93,3, HERE << "adapter needs time: " <<
            d.state << '/' << d.progress);
@@ -307,27 +589,27 @@ Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
 }
 
 void
-Ecap::XactionRep::adaptationAborted()
+Adaptation::Ecap::XactionRep::adaptationAborted()
 {
     tellQueryAborted(true); // should eCAP support retries?
     mustStop("adaptationAborted");
 }
 
 bool
-Ecap::XactionRep::callable() const
+Adaptation::Ecap::XactionRep::callable() const
 {
     return !done();
 }
 
 void
-Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
+Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
 {
     Must(proxyingAb == opOn);
     moveAbContent();
 }
 
 void
-Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
+Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
 {
     Must(proxyingAb == opOn);
     stopProducingFor(answer().body_pipe, false);
@@ -337,79 +619,91 @@ Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
 }
 
 void
-Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
+Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
 {
-    Must(proxyingVb == opOn);
+    Must(makingVb == opOn); // or we would not be registered as a consumer
     Must(theMaster);
     theMaster->noteVbContentAvailable();
 }
 
 void
-Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
+Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
 {
-    Must(proxyingVb == opOn);
+    Must(makingVb == opOn); // or we would not be registered as a consumer
     Must(theMaster);
     theMaster->noteVbContentDone(true);
-    proxyingVb = opComplete;
+    vbProductionFinished = true;
 }
 
 void
-Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
+Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
 {
-    Must(proxyingVb == opOn);
+    Must(makingVb == opOn); // or we would not be registered as a consumer
     Must(theMaster);
     theMaster->noteVbContentDone(false);
-    proxyingVb = opComplete;
+    vbProductionFinished = true;
 }
 
 void
-Ecap::XactionRep::noteInitiatorAborted()
+Adaptation::Ecap::XactionRep::noteInitiatorAborted()
 {
     mustStop("initiator aborted");
 }
 
 // get content from the adapter and put it into the adapted pipe
 void
-Ecap::XactionRep::moveAbContent()
+Adaptation::Ecap::XactionRep::moveAbContent()
 {
     Must(proxyingAb == opOn);
     const libecap::Area c = theMaster->abContent(0, libecap::nsize);
-    debugs(93,5, HERE << " up to " << c.size << " bytes");
-    if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
-        theMaster->abContentShift(used);
+    debugs(93,5, HERE << "up to " << c.size << " bytes");
+    if (c.size == 0 && abProductionFinished) { // no ab now and in the future
+        stopProducingFor(answer().body_pipe, abProductionAtEnd);
+        proxyingAb = opComplete;
+        debugs(93,5, HERE << "last adapted body data retrieved");
+    } else if (c.size > 0) {
+        if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
+            theMaster->abContentShift(used);
+    }
 }
 
 const char *
-Ecap::XactionRep::status() const
+Adaptation::Ecap::XactionRep::status() const
 {
     static MemBuf buf;
     buf.reset();
 
     buf.append(" [", 2);
 
-    if (proxyingVb == opOn) {
-        const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
-        if (!canAccessVb)
-            buf.append("x", 1);
-        if (vp != NULL && vp->stillConsuming(this)) {
-            buf.append("Vb", 2);
-            buf.append(vp->status(), strlen(vp->status())); // XXX
-        } else
-            buf.append("V.", 2);
-    }
+    if (makingVb)
+        buf.Printf("M%d", static_cast<int>(makingVb));
+
+    const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
+    if (!vp)
+        buf.append(" !V", 3);
+    else if (vp->stillConsuming(const_cast<XactionRep*>(this)))
+        buf.append(" Vc", 3);
+    else
+        buf.append(" V?", 3);
+
+    if (vbProductionFinished)
+        buf.append(".", 1);
+
+    buf.Printf(" A%d", static_cast<int>(proxyingAb));
 
     if (proxyingAb == opOn) {
         MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
         Must(rep);
         const BodyPipePointer &ap = rep->raw().body_pipe;
-        if (ap != NULL && ap->stillProducing(this)) {
-            buf.append(" Ab", 3);
-            buf.append(ap->status(), strlen(ap->status())); // XXX
-        } else
-            buf.append(" A.", 3);
+        if (!ap)
+            buf.append(" !A", 3);
+        else if (ap->stillProducing(const_cast<XactionRep*>(this)))
+            buf.append(" Ap", 3);
+        else
+            buf.append(" A?", 3);
     }
 
-    buf.Printf(" ecapx%d]", id);
+    buf.Printf(" %s%u]", id.Prefix, id.value);
 
     buf.terminate();