#include "squid.h"
#include <libecap/common/area.h>
#include <libecap/common/delay.h>
+#include <libecap/common/named_values.h>
+#include <libecap/common/names.h>
#include <libecap/adapter/xaction.h>
#include "HttpRequest.h"
#include "HttpReply.h"
#include "SquidTime.h"
+#include "adaptation/Answer.h"
#include "adaptation/ecap/XactionRep.h"
+#include "adaptation/ecap/Config.h"
+#include "adaptation/Initiator.h"
#include "base/TextException.h"
CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
+/// a libecap Visitor for converting adapter transaction options to HttpHeader
+class OptionsExtractor: public libecap::NamedValueVisitor
+{
+public:
+ typedef libecap::Name Name;
+ typedef libecap::Area Area;
+
+ OptionsExtractor(HttpHeader &aMeta): meta(aMeta) {}
+
+ // libecap::NamedValueVisitor API
+ virtual void visit(const Name &name, const Area &value) {
+ meta.putExt(name.image().c_str(), value.toString().c_str());
+ }
-Adaptation::Ecap::XactionRep::XactionRep(Adaptation::Initiator *anInitiator,
- HttpMsg *virginHeader, HttpRequest *virginCause,
- const Adaptation::ServicePointer &aService):
+ HttpHeader &meta; ///< where to put extracted options
+};
+
+Adaptation::Ecap::XactionRep::XactionRep(
+ HttpMsg *virginHeader, HttpRequest *virginCause,
+ const Adaptation::ServicePointer &aService):
AsyncJob("Adaptation::Ecap::XactionRep"),
- Adaptation::Initiate("Adaptation::Ecap::XactionRep", anInitiator),
+ Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
theService(aService),
theVirginRep(virginHeader), theCauseRep(NULL),
- proxyingVb(opUndecided), proxyingAb(opUndecided),
+ makingVb(opUndecided), proxyingAb(opUndecided),
adaptHistoryId(-1),
- canAccessVb(false),
+ vbProductionFinished(false),
abProductionFinished(false), abProductionAtEnd(false)
{
if (virginCause)
return *theService;
}
+const libecap::Area
+Adaptation::Ecap::XactionRep::option(const libecap::Name &name) const
+{
+ if (name == libecap::metaClientIp)
+ return clientIpValue();
+ if (name == libecap::metaUserName)
+ return usernameValue();
+ if (Adaptation::Config::masterx_shared_name && name == Adaptation::Config::masterx_shared_name)
+ return masterxSharedValue(name);
+
+ // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
+
+ // If the name is unknown, metaValue returns an emtpy area
+ return metaValue(name);
+}
+
+void
+Adaptation::Ecap::XactionRep::visitEachOption(libecap::NamedValueVisitor &visitor) const
+{
+ if (const libecap::Area value = clientIpValue())
+ visitor.visit(libecap::metaClientIp, value);
+ if (const libecap::Area value = usernameValue())
+ visitor.visit(libecap::metaUserName, value);
+
+ if (Adaptation::Config::masterx_shared_name) {
+ const libecap::Name name(Adaptation::Config::masterx_shared_name);
+ if (const libecap::Area value = masterxSharedValue(name))
+ visitor.visit(name, value);
+ }
+
+ visitEachMetaHeader(visitor);
+
+ // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
+}
+
+const libecap::Area
+Adaptation::Ecap::XactionRep::clientIpValue() const
+{
+ const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
+ theCauseRep->raw().header : theVirginRep.raw().header);
+ Must(request);
+ // TODO: move this logic into HttpRequest::clientIp(bool) and
+ // HttpRequest::clientIpString(bool) and reuse everywhere
+ if (TheConfig.send_client_ip && request) {
+ Ip::Address client_addr;
+#if FOLLOW_X_FORWARDED_FOR
+ if (TheConfig.use_indirect_client) {
+ client_addr = request->indirect_client_addr;
+ } else
+#endif
+ client_addr = request->client_addr;
+ if (!client_addr.IsAnyAddr() && !client_addr.IsNoAddr()) {
+ char ntoabuf[MAX_IPSTRLEN] = "";
+ client_addr.NtoA(ntoabuf,MAX_IPSTRLEN);
+ return libecap::Area::FromTempBuffer(ntoabuf, strlen(ntoabuf));
+ }
+ }
+ return libecap::Area();
+}
+
+const libecap::Area
+Adaptation::Ecap::XactionRep::usernameValue() const
+{
+#if USE_AUTH
+ const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
+ theCauseRep->raw().header : theVirginRep.raw().header);
+ Must(request);
+ if (request->auth_user_request != NULL) {
+ if (char const *name = request->auth_user_request->username())
+ return libecap::Area::FromTempBuffer(name, strlen(name));
+ else if (request->extacl_user.defined() && request->extacl_user.size())
+ return libecap::Area::FromTempBuffer(request->extacl_user.rawBuf(),
+ request->extacl_user.size());
+ }
+#endif
+ return libecap::Area();
+}
+
+const libecap::Area
+Adaptation::Ecap::XactionRep::masterxSharedValue(const libecap::Name &name) const
+{
+ const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
+ theCauseRep->raw().header : theVirginRep.raw().header);
+ Must(request);
+ if (name.known()) { // must check to avoid empty names matching unset cfg
+ Adaptation::History::Pointer ah = request->adaptHistory(false);
+ if (ah != NULL) {
+ String name, value;
+ if (ah->getXxRecord(name, value))
+ return libecap::Area::FromTempBuffer(value.rawBuf(), value.size());
+ }
+ }
+ return libecap::Area();
+}
+
+const libecap::Area
+Adaptation::Ecap::XactionRep::metaValue(const libecap::Name &name) const
+{
+ HttpRequest *request = dynamic_cast<HttpRequest*>(theCauseRep ?
+ theCauseRep->raw().header : theVirginRep.raw().header);
+ Must(request);
+ HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
+
+ if (name.known()) { // must check to avoid empty names matching unset cfg
+ typedef Adaptation::Config::MetaHeaders::iterator ACAMLI;
+ for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
+ if (name == (*i)->name.termedBuf()) {
+ if (const char *value = (*i)->match(request, reply))
+ return libecap::Area::FromTempString(value);
+ else
+ return libecap::Area();
+ }
+ }
+ }
+
+ return libecap::Area();
+}
+
+void
+Adaptation::Ecap::XactionRep::visitEachMetaHeader(libecap::NamedValueVisitor &visitor) const
+{
+ HttpRequest *request = dynamic_cast<HttpRequest*>(theCauseRep ?
+ theCauseRep->raw().header : theVirginRep.raw().header);
+ Must(request);
+ HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
+
+ typedef Adaptation::Config::MetaHeaders::iterator ACAMLI;
+ for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
+ const char *v = (*i)->match(request, reply);
+ if (v) {
+ const libecap::Name name((*i)->name.termedBuf());
+ const libecap::Area value = libecap::Area::FromTempString(v);
+ visitor.visit(name, value);
+ }
+ }
+}
+
void
Adaptation::Ecap::XactionRep::start()
{
Must(theMaster);
- if (theVirginRep.raw().body_pipe != NULL)
- canAccessVb = true; /// assumes nobody is consuming; \todo check
- else
- proxyingVb = opNever;
+ if (!theVirginRep.raw().body_pipe)
+ makingVb = opNever; // there is nothing to deliver
const HttpRequest *request = dynamic_cast<const HttpRequest*> (theCauseRep ?
theCauseRep->raw().header : theVirginRep.raw().header);
}
}
- {
- BodyPipe::Pointer body_pipe = theVirginRep.raw().body_pipe;
- if (body_pipe != NULL) {
- Must(body_pipe->stillConsuming(this));
- stopConsumingFrom(body_pipe);
- }
- }
+ BodyPipe::Pointer &body_pipe = theVirginRep.raw().body_pipe;
+ if (body_pipe != NULL && body_pipe->stillConsuming(this))
+ stopConsumingFrom(body_pipe);
terminateMaster();
bool
Adaptation::Ecap::XactionRep::doneAll() const
{
- return proxyingVb >= opComplete && proxyingAb >= opComplete &&
+ return makingVb >= opComplete && proxyingAb >= opComplete &&
Adaptation::Initiate::doneAll();
}
-// stops receiving virgin and enables auto-consumption
+// stops receiving virgin and enables auto-consumption, dropping any vb bytes
void
-Adaptation::Ecap::XactionRep::dropVirgin(const char *reason)
+Adaptation::Ecap::XactionRep::sinkVb(const char *reason)
{
- debugs(93,4, HERE << "because " << reason << "; status:" << status());
- Must(proxyingVb = opOn);
+ debugs(93,4, HERE << "sink for " << reason << "; status:" << status());
+
+ // we reset raw().body_pipe when we are done, so use this one for checking
+ const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
+ if (permPipe != NULL)
+ permPipe->enableAutoConsumption();
+
+ forgetVb(reason);
+}
+
+// stops receiving virgin but preserves it for others to use
+void
+Adaptation::Ecap::XactionRep::preserveVb(const char *reason)
+{
+ debugs(93,4, HERE << "preserve for " << reason << "; status:" << status());
+
+ // we reset raw().body_pipe when we are done, so use this one for checking
+ const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
+ if (permPipe != NULL) {
+ // if libecap consumed, we cannot preserve
+ Must(!permPipe->consumedSize());
+ }
+
+ forgetVb(reason);
+}
+
+// disassociates us from vb; the last step of sinking or preserving vb
+void
+Adaptation::Ecap::XactionRep::forgetVb(const char *reason)
+{
+ debugs(93,9, HERE << "forget vb " << reason << "; status:" << status());
BodyPipePointer &p = theVirginRep.raw().body_pipe;
- Must(p != NULL);
- Must(p->stillConsuming(this));
- stopConsumingFrom(p);
- p->enableAutoConsumption();
- proxyingVb = opComplete;
- canAccessVb = false;
+ if (p != NULL && p->stillConsuming(this))
+ stopConsumingFrom(p);
- // called from adapter handler so does not inform adapter
+ if (makingVb == opUndecided)
+ makingVb = opNever;
+ else if (makingVb == opOn)
+ makingVb = opComplete;
}
void
Must(proxyingAb == opUndecided);
proxyingAb = opNever;
- BodyPipePointer &vbody_pipe = theVirginRep.raw().body_pipe;
+ preserveVb("useVirgin");
HttpMsg *clone = theVirginRep.raw().header->clone();
// check that clone() copies the pipe so that we do not have to
- Must(!vbody_pipe == !clone->body_pipe);
-
- if (proxyingVb == opOn) {
- Must(vbody_pipe->stillConsuming(this));
- // if libecap consumed, we cannot shortcircuit
- Must(!vbody_pipe->consumedSize());
- stopConsumingFrom(vbody_pipe);
- canAccessVb = false;
- proxyingVb = opComplete;
- } else if (proxyingVb == opUndecided) {
- vbody_pipe = NULL; // it is not our pipe anymore
- proxyingVb = opNever;
- }
+ Must(!theVirginRep.raw().header->body_pipe == !clone->body_pipe);
- sendAnswer(clone);
+ updateHistory(clone);
+ sendAnswer(Answer::Forward(clone));
Must(done());
}
HttpMsg *msg = answer().header;
if (!theAnswerRep->body()) { // final, bodyless answer
proxyingAb = opNever;
- sendAnswer(msg);
+ updateHistory(msg);
+ sendAnswer(Answer::Forward(msg));
} else { // got answer headers but need to handle body
proxyingAb = opOn;
Must(!msg->body_pipe); // only host can set body pipes
rep->tieBody(this); // sets us as a producer
Must(msg->body_pipe != NULL); // check tieBody
- sendAnswer(msg);
+ updateHistory(msg);
+ sendAnswer(Answer::Forward(msg));
debugs(93,4, HERE << "adapter will produce body" << status());
theMaster->abMake(); // libecap will produce
}
}
+void
+Adaptation::Ecap::XactionRep::blockVirgin()
+{
+ debugs(93,3, HERE << status());
+ Must(proxyingAb == opUndecided);
+ proxyingAb = opNever;
+
+ sinkVb("blockVirgin");
+
+ updateHistory(NULL);
+ sendAnswer(Answer::Block(service().cfg().key));
+ Must(done());
+}
+
+/// Called just before sendAnswer() to record adapter meta-information
+/// which may affect answer processing and may be needed for logging.
+void
+Adaptation::Ecap::XactionRep::updateHistory(HttpMsg *adapted)
+{
+ if (!theMaster) // all updates rely on being able to query the adapter
+ return;
+
+ const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
+ theCauseRep->raw().header : theVirginRep.raw().header);
+ Must(request);
+
+ // TODO: move common ICAP/eCAP logic to Adaptation::Xaction or similar
+ // TODO: optimize Area-to-String conversion
+
+ // update the cross-transactional database if needed
+ if (const char *xxNameStr = Adaptation::Config::masterx_shared_name) {
+ Adaptation::History::Pointer ah = request->adaptHistory(true);
+ if (ah != NULL) {
+ libecap::Name xxName(xxNameStr); // TODO: optimize?
+ if (const libecap::Area val = theMaster->option(xxName))
+ ah->updateXxRecord(xxNameStr, val.toString().c_str());
+ }
+ }
+
+ // update the adaptation plan if needed
+ if (service().cfg().routing) {
+ String services;
+ if (const libecap::Area services = theMaster->option(libecap::metaNextServices)) {
+ Adaptation::History::Pointer ah = request->adaptHistory(true);
+ if (ah != NULL)
+ ah->updateNextServices(services.toString().c_str());
+ }
+ } // TODO: else warn (occasionally!) if we got libecap::metaNextServices
+
+ // Store received meta headers for adapt::<last_h logformat code use.
+ // If we already have stored headers from a previous adaptation transaction
+ // related to the same master transction, they will be replaced.
+ Adaptation::History::Pointer ah = request->adaptLogHistory();
+ if (ah != NULL) {
+ HttpHeader meta(hoReply);
+ OptionsExtractor extractor(meta);
+ theMaster->visitEachOption(extractor);
+ ah->recordMeta(&meta);
+ }
+
+ // Add just-created history to the adapted/cloned request that lacks it.
+ if (HttpRequest *adaptedReq = dynamic_cast<HttpRequest*>(adapted))
+ adaptedReq->adaptHistoryImport(*request);
+}
+
void
Adaptation::Ecap::XactionRep::vbDiscard()
{
- Must(proxyingVb == opUndecided);
+ Must(makingVb == opUndecided);
// if adapter does not need vb, we do not need to send it
- dropVirgin("vbDiscard");
- Must(proxyingVb == opNever);
+ sinkVb("vbDiscard");
+ Must(makingVb == opNever);
}
void
Adaptation::Ecap::XactionRep::vbMake()
{
- Must(proxyingVb == opUndecided);
+ Must(makingVb == opUndecided);
BodyPipePointer &p = theVirginRep.raw().body_pipe;
Must(p != NULL);
- Must(p->setConsumerIfNotLate(this)); // to make vb, we must receive vb
- proxyingVb = opOn;
+ Must(p->setConsumerIfNotLate(this)); // to deliver vb, we must receive vb
+ makingVb = opOn;
}
void
Adaptation::Ecap::XactionRep::vbStopMaking()
{
+ Must(makingVb == opOn);
// if adapter does not need vb, we do not need to receive it
- if (proxyingVb == opOn)
- dropVirgin("vbStopMaking");
- Must(proxyingVb == opComplete);
+ sinkVb("vbStopMaking");
+ Must(makingVb == opComplete);
}
void
Adaptation::Ecap::XactionRep::vbMakeMore()
{
- Must(proxyingVb == opOn); // cannot make more if done proxying
+ Must(makingVb == opOn); // cannot make more if done proxying
// we cannot guarantee more vb, but we can check that there is a chance
- Must(!theVirginRep.raw().body_pipe->exhausted());
+ const BodyPipePointer &p = theVirginRep.raw().body_pipe;
+ Must(p != NULL && p->stillConsuming(this)); // we are plugged in
+ Must(!p->productionEnded() && p->mayNeedMoreData()); // and may get more
}
libecap::Area
Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
{
- Must(canAccessVb);
- // We may not be proxyingVb yet. It should be OK, but see vbContentShift().
+ // We may not be makingVb yet. It should be OK, but see vbContentShift().
const BodyPipePointer &p = theVirginRep.raw().body_pipe;
Must(p != NULL);
void
Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
{
- Must(canAccessVb);
- // We may not be proxyingVb yet. It should be OK now, but if BodyPipe
+ // We may not be makingVb yet. It should be OK now, but if BodyPipe
// consume() requirements change, we would have to return empty vbContent
// until the adapter registers as a consumer
void
Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
{
- Must(proxyingVb == opOn);
+ Must(makingVb == opOn); // or we would not be registered as a consumer
Must(theMaster);
theMaster->noteVbContentAvailable();
}
void
Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
{
- Must(proxyingVb == opOn);
+ Must(makingVb == opOn); // or we would not be registered as a consumer
Must(theMaster);
theMaster->noteVbContentDone(true);
- proxyingVb = opComplete;
+ vbProductionFinished = true;
}
void
Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
{
- Must(proxyingVb == opOn);
+ Must(makingVb == opOn); // or we would not be registered as a consumer
Must(theMaster);
theMaster->noteVbContentDone(false);
- proxyingVb = opComplete;
+ vbProductionFinished = true;
}
void
buf.append(" [", 2);
- if (proxyingVb == opOn) {
- const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
- if (!canAccessVb)
- buf.append("x", 1);
- if (vp != NULL && vp->stillConsuming(this)) {
- buf.append("Vb", 2);
- buf.append(vp->status(), strlen(vp->status())); // XXX
- } else
- buf.append("V.", 2);
- }
+ if (makingVb)
+ buf.Printf("M%d", static_cast<int>(makingVb));
+
+ const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
+ if (!vp)
+ buf.append(" !V", 3);
+ else if (vp->stillConsuming(const_cast<XactionRep*>(this)))
+ buf.append(" Vc", 3);
+ else
+ buf.append(" V?", 3);
+
+ if (vbProductionFinished)
+ buf.append(".", 1);
+
+ buf.Printf(" A%d", static_cast<int>(proxyingAb));
if (proxyingAb == opOn) {
MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
Must(rep);
const BodyPipePointer &ap = rep->raw().body_pipe;
- if (ap != NULL && ap->stillProducing(this)) {
- buf.append(" Ab", 3);
- buf.append(ap->status(), strlen(ap->status())); // XXX
- } else
- buf.append(" A.", 3);
+ if (!ap)
+ buf.append(" !A", 3);
+ else if (ap->stillProducing(const_cast<XactionRep*>(this)))
+ buf.append(" Ap", 3);
+ else
+ buf.append(" A?", 3);
}
- buf.Printf(" ecapx%d]", id);
+ buf.Printf(" %s%u]", id.Prefix, id.value);
buf.terminate();