#include "adaptation/icap/Client.h"
#include "ChunkedCodingParser.h"
#include "TextException.h"
-#include "AuthUserRequest.h"
+#include "auth/UserRequest.h"
#include "adaptation/icap/Config.h"
#include "SquidTime.h"
+#include "AccessLogEntry.h"
+#include "adaptation/icap/History.h"
+#include "adaptation/History.h"
// flow and terminology:
// HTTP| --> receive --> encode --> write --> |network
}
Adaptation::Icap::ModXact::ModXact(Adaptation::Initiator *anInitiator, HttpMsg *virginHeader,
- HttpRequest *virginCause, Adaptation::Icap::ServiceRep::Pointer &aService):
+ HttpRequest *virginCause, Adaptation::Icap::ServiceRep::Pointer &aService):
AsyncJob("Adaptation::Icap::ModXact"),
Adaptation::Icap::Xaction("Adaptation::Icap::ModXact", anInitiator, aService),
- icapReply(NULL),
virginConsumed(0),
bodyParser(NULL),
- canStartBypass(false) // too early
+ canStartBypass(false), // too early
+ protectGroupBypass(true),
+ replyBodySize(0),
+ adaptHistoryId(-1)
{
assert(virginHeader);
// encoding
// nothing to do because we are using temporary buffers
- // parsing
- icapReply = new HttpReply;
+ // parsing; TODO: do not set until we parse, see ICAPOptXact
+ icapReply = HTTPMSGLOCK(new HttpReply);
icapReply->protoPrefix = "ICAP/"; // TODO: make an IcapReply class?
debugs(93,7, HERE << "initialized." << status());
{
Adaptation::Icap::Xaction::start();
+ // reserve an adaptation history slot (attempts are known at this time)
+ Adaptation::History::Pointer ah = virginRequest().adaptLogHistory();
+ if (ah != NULL)
+ adaptHistoryId = ah->recordXactStart(service().cfg().key, icap_tr_start, attempts > 1);
+
estimateVirginBody(); // before virgin disappears!
canStartBypass = service().cfg().bypass;
startWriting();
} else {
disableRetries();
+ disableRepeats("ICAP service is unusable");
throw TexcHere("ICAP service is unusable");
}
}
// write headers
state.writing = State::writingHeaders;
+ icap_tio_start = current_time;
scheduleWrite(requestBuf);
}
// determine next step
if (preview.enabled())
state.writing = preview.done() ? State::writingPaused : State::writingPreview;
- else
- if (virginBody.expected())
- state.writing = State::writingPrime;
- else {
- stopWriting(true);
- return;
- }
+ else if (virginBody.expected())
+ state.writing = State::writingPrime;
+ else {
+ stopWriting(true);
+ return;
+ }
writeMore();
}
Must(virgin.body_pipe != NULL);
const size_t sizeMax = (size_t)virgin.body_pipe->buf().contentSize();
- const size_t size = XMIN(preview.debt(), sizeMax);
+ const size_t size = min(preview.debt(), sizeMax);
writeSomeBody("preview body", size);
// change state once preview is written
writeBuf.init(); // note: we assume that last-chunk will fit
const size_t writableSize = virginContentSize(virginBodyWriting);
- const size_t chunkSize = XMIN(writableSize, size);
+ const size_t chunkSize = min(writableSize, size);
if (chunkSize) {
debugs(93, 7, HERE << "will write " << chunkSize <<
buf.append(ICAP::crlf, 2); // chunk-terminating CRLF
}
+const HttpRequest &Adaptation::Icap::ModXact::virginRequest() const
+{
+ const HttpRequest *request = virgin.cause ?
+ virgin.cause : dynamic_cast<const HttpRequest*>(virgin.header);
+ Must(request);
+ return *request;
+}
+
// did the activity reached the end of the virgin body?
bool Adaptation::Icap::ModXact::virginBodyEndReached(const Adaptation::Icap::VirginBodyAct &act) const
{
void Adaptation::Icap::ModXact::virginConsume()
{
- debugs(93, 9, HERE << "consumption guards: " << !virgin.body_pipe << isRetriable);
+ debugs(93, 9, HERE << "consumption guards: " << !virgin.body_pipe << isRetriable <<
+ isRepeatable << canStartBypass << protectGroupBypass);
if (!virgin.body_pipe)
return; // nothing to consume
return; // do not consume if we may have to retry later
BodyPipe &bp = *virgin.body_pipe;
+ const bool wantToPostpone = isRepeatable || canStartBypass || protectGroupBypass;
// Why > 2? HttpState does not use the last bytes in the buffer
// because delayAwareRead() is arguably broken. See
// HttpStateData::maybeReadVirginBody for more details.
- if (canStartBypass && bp.buf().spaceSize() > 2) {
+ if (wantToPostpone && bp.buf().spaceSize() > 2) {
// Postponing may increase memory footprint and slow the HTTP side
// down. Not postponing may increase the number of ICAP errors
// if the ICAP service fails. We may also use "potential" space to
" from " << virgin.body_pipe->status());
if (virginBodyWriting.active())
- offset = XMIN(virginBodyWriting.offset(), offset);
+ offset = min(virginBodyWriting.offset(), offset);
if (virginBodySending.active())
- offset = XMIN(virginBodySending.offset(), offset);
+ offset = min(virginBodySending.offset(), offset);
Must(virginConsumed <= offset && offset <= end);
bp.consume(size);
virginConsumed += size;
Must(!isRetriable); // or we should not be consuming
- disableBypass("consumed content");
+ disableRepeats("consumed content");
+ disableBypass("consumed content", true);
}
}
void Adaptation::Icap::ModXact::handleCommRead(size_t)
{
Must(!state.doneParsing());
+ icap_tio_finish = current_time;
parseMore();
readMore();
}
debugs(93,5, HERE << "echoed " << size << " out of " << sizeMax <<
" bytes");
virginBodySending.progress(size);
+ disableRepeats("echoed content");
+ disableBypass("echoed content", true);
virginConsume();
- disableBypass("echoed content");
}
if (virginBodyEndReached(virginBodySending)) {
// stop (or do not start) sending adapted message body
void Adaptation::Icap::ModXact::stopSending(bool nicely)
{
+ debugs(93, 7, HERE << "Enter stop sending ");
if (doneSending())
return;
+ debugs(93, 7, HERE << "Proceed with stop sending ");
if (state.sending != State::sendingUndecided) {
debugs(93, 7, HERE << "will no longer send" << status());
try {
debugs(93, 3, HERE << "bypassing " << inCall << " exception: " <<
- e.what() << ' ' << status());
+ e.what() << ' ' << status());
bypassFailure();
} catch (const std::exception &bypassE) {
Adaptation::Icap::Xaction::callException(bypassE);
void Adaptation::Icap::ModXact::bypassFailure()
{
- disableBypass("already started to bypass");
+ disableBypass("already started to bypass", false);
Must(!isRetriable); // or we should not be bypassing
+ // TODO: should the same be enforced for isRepeatable? Check icap_repeat??
prepEchoing();
}
}
-void Adaptation::Icap::ModXact::disableBypass(const char *reason)
+void Adaptation::Icap::ModXact::disableBypass(const char *reason, bool includingGroupBypass)
{
if (canStartBypass) {
debugs(93,7, HERE << "will never start bypass because " << reason);
canStartBypass = false;
}
+ if (protectGroupBypass && includingGroupBypass) {
+ debugs(93,7, HERE << "not protecting group bypass because " << reason);
+ protectGroupBypass = false;
+ }
}
if (gotEncapsulated("res-hdr")) {
adapted.setHeader(new HttpReply);
+ setOutcome(service().cfg().method == ICAP::methodReqmod ?
+ xoSatisfied : xoModified);
} else if (gotEncapsulated("req-hdr")) {
adapted.setHeader(new HttpRequest);
+ setOutcome(xoModified);
} else
throw TexcHere("Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()");
}
// called after parsing all headers or when bypassing an exception
void Adaptation::Icap::ModXact::startSending()
{
- disableBypass("sent headers");
+ disableRepeats("sent headers");
+ disableBypass("sent headers", true);
sendAnswer(adapted.header);
if (state.sending == State::sendingVirgin)
break;
}
+ const HttpRequest *request = dynamic_cast<HttpRequest*>(adapted.header);
+ if (!request)
+ request = &virginRequest();
+
+ // update the cross-transactional database if needed (all status codes!)
+ if (const char *xxName = Adaptation::Config::masterx_shared_name) {
+ Adaptation::History::Pointer ah = request->adaptHistory(true);
+ if (ah != NULL) {
+ const String val = icapReply->header.getByName(xxName);
+ if (val.size() > 0) // XXX: HttpHeader lacks empty value detection
+ ah->updateXxRecord(xxName, val);
+ }
+ }
+
+ // update the adaptation plan if needed (all status codes!)
+ if (service().cfg().routing) {
+ String services;
+ if (icapReply->header.getList(HDR_X_NEXT_SERVICES, &services)) {
+ Adaptation::History::Pointer ah = request->adaptHistory(true);
+ if (ah != NULL)
+ ah->updateNextServices(services);
+ }
+ } // TODO: else warn (occasionally!) if we got HDR_X_NEXT_SERVICES
+
+ // We need to store received ICAP headers for <icapLastHeader logformat option.
+ // If we already have stored headers from previous ICAP transaction related to this
+ // request, old headers will be replaced with the new one.
+
+ Adaptation::Icap::History::Pointer h = request->icapHistory();
+ if (h != NULL) {
+ h->mergeIcapHeaders(&icapReply->header);
+ h->setIcapLastHeader(&icapReply->header);
+ }
+
// handle100Continue() manages state.writing on its own.
// Non-100 status means the server needs no postPreview data from us.
if (state.writing == State::writingPaused)
// We actually start sending (echoig or not) in startSending.
void Adaptation::Icap::ModXact::prepEchoing()
{
- disableBypass("preparing to echo content");
+ disableRepeats("preparing to echo content");
+ disableBypass("preparing to echo content", true);
+ setOutcome(xoEcho);
// We want to clone the HTTP message, but we do not want
// to copy some non-HTTP state parts that HttpMsg kids carry in them.
// Thus, we cannot use a smart pointer, copy constructor, or equivalent.
// Instead, we simply write the HTTP message and "clone" it by parsing.
+ // TODO: use HttpMsg::clone()!
HttpMsg *oldHead = virgin.header;
debugs(93, 7, HERE << "cloning virgin message " << oldHead);
// allocate the adapted message and copy metainfo
Must(!adapted.header);
HttpMsg *newHead = NULL;
- if (dynamic_cast<const HttpRequest*>(oldHead)) {
+ if (const HttpRequest *oldR = dynamic_cast<const HttpRequest*>(oldHead)) {
HttpRequest *newR = new HttpRequest;
+ newR->canonical = oldR->canonical ?
+ xstrdup(oldR->canonical) : NULL; // parse() does not set it
newHead = newR;
} else if (dynamic_cast<const HttpReply*>(oldHead)) {
HttpReply *newRep = new HttpReply;
return false;
}
+ if (HttpRequest *r = dynamic_cast<HttpRequest*>(head))
+ urlCanonical(r); // parse does not set HttpRequest::canonical
+
debugs(93, 5, HERE << "parse success, consume " << head->hdr_sz << " bytes, return true");
readBuf.consume(head->hdr_sz);
return true;
debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes after " <<
"parse; parsed all: " << parsed);
+ replyBodySize += adapted.body_pipe->buf().contentSize();
// TODO: expose BodyPipe::putSize() to make this check simpler and clearer
- if (adapted.body_pipe->buf().contentSize() > 0) // parsed something sometime
- disableBypass("sent adapted content");
+ // TODO: do we really need this if we disable when sending headers?
+ if (adapted.body_pipe->buf().contentSize() > 0) { // parsed something sometime
+ disableRepeats("sent adapted content");
+ disableBypass("sent adapted content", true);
+ }
if (parsed) {
stopParsing();
stopWriting(false);
stopSending(false);
- if (icapReply) {
- delete icapReply;
- icapReply = NULL;
- }
+ // update adaptation history if start was called and we reserved a slot
+ Adaptation::History::Pointer ah = virginRequest().adaptLogHistory();
+ if (ah != NULL && adaptHistoryId >= 0)
+ ah->recordXactFinish(adaptHistoryId);
Adaptation::Icap::Xaction::swanSong();
}
+void prepareLogWithRequestDetails(HttpRequest *, AccessLogEntry *);
+
+void Adaptation::Icap::ModXact::finalizeLogInfo()
+{
+ HttpRequest * request_ = NULL;
+ HttpReply * reply_ = NULL;
+ if (!(request_ = dynamic_cast<HttpRequest*>(adapted.header))) {
+ request_ = (virgin.cause? virgin.cause: dynamic_cast<HttpRequest*>(virgin.header));
+ reply_ = dynamic_cast<HttpReply*>(adapted.header);
+ }
+
+ Adaptation::Icap::History::Pointer h = request_->icapHistory();
+ Must(h != NULL); // ICAPXaction::maybeLog calls only if there is a log
+ al.icp.opcode = ICP_INVALID;
+ al.url = h->log_uri.termedBuf();
+ const Adaptation::Icap::ServiceRep &s = service();
+ al.icap.reqMethod = s.cfg().method;
+
+ al.cache.caddr = request_->client_addr;
+
+ al.request = HTTPMSGLOCK(request_);
+ if (reply_)
+ al.reply = HTTPMSGLOCK(reply_);
+ else
+ al.reply = NULL;
+
+ if (h->rfc931.size())
+ al.cache.rfc931 = h->rfc931.termedBuf();
+
+#if USE_SSL
+ if (h->ssluser.size())
+ al.cache.ssluser = h->ssluser.termedBuf();
+#endif
+ al.cache.code = h->logType;
+ al.cache.requestSize = h->req_sz;
+ if (reply_) {
+ al.http.code = reply_->sline.status;
+ al.http.content_type = reply_->content_type.termedBuf();
+ al.cache.replySize = replyBodySize + reply_->hdr_sz;
+ al.cache.highOffset = replyBodySize;
+ //don't set al.cache.objectSize because it hasn't exist yet
+
+ Packer p;
+ MemBuf mb;
+
+ mb.init();
+ packerToMemInit(&p, &mb);
+
+ reply_->header.packInto(&p);
+ al.headers.reply = xstrdup(mb.buf);
+
+ packerClean(&p);
+ mb.clean();
+ }
+ prepareLogWithRequestDetails(request_, &al);
+ Xaction::finalizeLogInfo();
+}
+
+
void Adaptation::Icap::ModXact::makeRequestHeaders(MemBuf &buf)
{
char ntoabuf[MAX_IPSTRLEN];
buf.Printf("Proxy-Authorization: " SQUIDSTRINGPH "\r\n", SQUIDSTRINGPRINT(vh));
}
+ const HttpRequest *request = &virginRequest();
+
+ // share the cross-transactional database records if needed
+ if (Adaptation::Config::masterx_shared_name) {
+ Adaptation::History::Pointer ah = request->adaptHistory(true);
+ if (ah != NULL) {
+ String name, value;
+ if (ah->getXxRecord(name, value)) {
+ buf.Printf(SQUIDSTRINGPH ": " SQUIDSTRINGPH "\r\n",
+ SQUIDSTRINGPRINT(name), SQUIDSTRINGPRINT(value));
+ }
+ }
+ }
+
+
buf.Printf("Encapsulated: ");
MemBuf httpBuf;
// build HTTP request header, if any
ICAP::Method m = s.method;
- const HttpRequest *request = virgin.cause ?
- virgin.cause :
- dynamic_cast<const HttpRequest*>(virgin.header);
-
// to simplify, we could assume that request is always available
String urlPath;
urlPath = request->urlpath;
if (ICAP::methodRespmod == m)
encapsulateHead(buf, "req-hdr", httpBuf, request);
- else
- if (ICAP::methodReqmod == m)
- encapsulateHead(buf, "req-hdr", httpBuf, virgin.header);
+ else if (ICAP::methodReqmod == m)
+ encapsulateHead(buf, "req-hdr", httpBuf, virgin.header);
}
if (ICAP::methodRespmod == m)
buf.append(ICAP::crlf, 2); // terminate ICAP header
+ // fill icapRequest for logging
+ Must(icapRequest->parseCharBuf(buf.content(), buf.contentSize()));
+
// start ICAP request body with encapsulated HTTP headers
buf.append(httpBuf.content(), httpBuf.contentSize());
if (const HttpRequest* old_request = dynamic_cast<const HttpRequest*>(head)) {
HttpRequest* new_request = new HttpRequest;
- urlParse(old_request->method, old_request->canonical,new_request);
+ assert(old_request->canonical);
+ urlParse(old_request->method, old_request->canonical, new_request);
new_request->http_ver = old_request->http_ver;
headClone = new_request;
} else if (const HttpReply *old_reply = dynamic_cast<const HttpReply*>(head)) {
return;
}
- const HttpRequest *request = virgin.cause ?
- virgin.cause :
- dynamic_cast<const HttpRequest*>(virgin.header);
- const String urlPath = request ? request->urlpath : String();
+ const String urlPath = virginRequest().urlpath;
size_t wantedSize;
if (!service().wantsPreview(urlPath, wantedSize)) {
debugs(93, 5, HERE << "should not offer preview for " << urlPath);
Must(wantedSize >= 0);
// cannot preview more than we can backup
- size_t ad = XMIN(wantedSize, TheBackupLimit);
+ size_t ad = min(wantedSize, TheBackupLimit);
if (!virginBody.expected())
ad = 0;
- else
- if (virginBody.knownSize())
- ad = XMIN(static_cast<uint64_t>(ad), virginBody.size()); // not more than we have
+ else if (virginBody.knownSize())
+ ad = min(static_cast<uint64_t>(ad), virginBody.size()); // not more than we have
debugs(93, 5, HERE << "should offer " << ad << "-byte preview " <<
"(service wanted " << wantedSize << ")");
if (canStartBypass)
buf.append("Y", 1);
+
+ if (protectGroupBypass)
+ buf.append("G", 1);
}
void Adaptation::Icap::ModXact::fillDoneStatus(MemBuf &buf) const
if (virgin.cause)
method = virgin.cause->method;
+ else if (HttpRequest *req = dynamic_cast<HttpRequest*>(msg))
+ method = req->method;
else
- if (HttpRequest *req = dynamic_cast<HttpRequest*>(msg))
- method = req->method;
- else
- method = METHOD_NONE;
+ method = METHOD_NONE;
int64_t size;
// expectingBody returns true for zero-sized bodies, but we will not
if (wroteEof)
theState = stIeof; // written size is irrelevant
- else
- if (theWritten >= theAd)
- theState = stDone;
+ else if (theWritten >= theAd)
+ theState = stDone;
}
bool Adaptation::Icap::ModXact::fillVirginHttpHeader(MemBuf &mb) const
{
virgin.setHeader(virginHeader);
virgin.setCause(virginCause);
+ updateHistory(true);
}
Adaptation::Icap::Xaction *Adaptation::Icap::ModXactLauncher::createXaction()
Must(s != NULL);
return new Adaptation::Icap::ModXact(this, virgin.header, virgin.cause, s);
}
+
+void Adaptation::Icap::ModXactLauncher::swanSong()
+{
+ debugs(93, 5, HERE << "swan sings");
+ updateHistory(false);
+ Adaptation::Icap::Launcher::swanSong();
+}
+
+void Adaptation::Icap::ModXactLauncher::updateHistory(bool start)
+{
+ HttpRequest *r = virgin.cause ?
+ virgin.cause : dynamic_cast<HttpRequest*>(virgin.header);
+
+ // r should never be NULL but we play safe; TODO: add Should()
+ if (r) {
+ Adaptation::Icap::History::Pointer h = r->icapHistory();
+ if (h != NULL) {
+ if (start)
+ h->start("ICAPModXactLauncher");
+ else
+ h->stop("ICAPModXactLauncher");
+ }
+ }
+}