*/
#include "squid.h"
+#include "AccessLogEntry.h"
+#include "adaptation/History.h"
+#include "adaptation/icap/Client.h"
+#include "adaptation/icap/Config.h"
+#include "adaptation/icap/History.h"
+#include "adaptation/icap/Launcher.h"
+#include "adaptation/icap/ModXact.h"
+#include "adaptation/icap/ServiceRep.h"
+#include "adaptation/Initiator.h"
+#include "auth/UserRequest.h"
+#include "base/TextException.h"
+#include "base64.h"
+#include "ChunkedCodingParser.h"
#include "comm.h"
+#include "comm/Connection.h"
#include "HttpMsg.h"
#include "HttpRequest.h"
#include "HttpReply.h"
-#include "adaptation/Initiator.h"
-#include "adaptation/icap/ServiceRep.h"
-#include "adaptation/icap/Launcher.h"
-#include "adaptation/icap/ModXact.h"
-#include "adaptation/icap/Client.h"
-#include "ChunkedCodingParser.h"
-#include "TextException.h"
-#include "auth/UserRequest.h"
-#include "adaptation/icap/Config.h"
#include "SquidTime.h"
-#include "AccessLogEntry.h"
-#include "adaptation/icap/History.h"
-#include "adaptation/History.h"
+#include "err_detail_type.h"
// flow and terminology:
// HTTP| --> receive --> encode --> write --> |network
memset(this, 0, sizeof(*this));
}
-Adaptation::Icap::ModXact::ModXact(Adaptation::Initiator *anInitiator, HttpMsg *virginHeader,
+Adaptation::Icap::ModXact::ModXact(HttpMsg *virginHeader,
HttpRequest *virginCause, Adaptation::Icap::ServiceRep::Pointer &aService):
AsyncJob("Adaptation::Icap::ModXact"),
- Adaptation::Icap::Xaction("Adaptation::Icap::ModXact", anInitiator, aService),
+ Adaptation::Icap::Xaction("Adaptation::Icap::ModXact", aService),
virginConsumed(0),
bodyParser(NULL),
canStartBypass(false), // too early
protectGroupBypass(true),
- replyBodySize(0),
+ replyHttpHeaderSize(-1),
+ replyHttpBodySize(-1),
adaptHistoryId(-1)
{
assert(virginHeader);
// nothing to do because we are using temporary buffers
// parsing; TODO: do not set until we parse, see ICAPOptXact
- icapReply = HTTPMSGLOCK(new HttpReply);
+ icapReply = new HttpReply;
icapReply->protoPrefix = "ICAP/"; // TODO: make an IcapReply class?
debugs(93,7, HERE << "initialized." << status());
{
Must(!state.serviceWaiting);
debugs(93, 7, HERE << "will wait for the ICAP service" << status());
- state.serviceWaiting = true;
- AsyncCall::Pointer call = asyncCall(93,5, "Adaptation::Icap::ModXact::noteServiceReady",
- MemFun(this, &Adaptation::Icap::ModXact::noteServiceReady));
+ typedef NullaryMemFunT<ModXact> Dialer;
+ AsyncCall::Pointer call = JobCallback(93,5,
+ Dialer, this, Adaptation::Icap::ModXact::noteServiceReady);
service().callWhenReady(call);
+ state.serviceWaiting = true; // after callWhenReady() which may throw
}
void Adaptation::Icap::ModXact::noteServiceReady()
Must(state.writing == State::writingHeaders);
// determine next step
- if (preview.enabled())
- state.writing = preview.done() ? State::writingPaused : State::writingPreview;
- else if (virginBody.expected())
+ if (preview.enabled()) {
+ if (preview.done())
+ decideWritingAfterPreview("zero-size");
+ else
+ state.writing = State::writingPreview;
+ } else if (virginBody.expected()) {
state.writing = State::writingPrime;
- else {
+ } else {
stopWriting(true);
return;
}
// change state once preview is written
- if (preview.done()) {
- debugs(93, 7, HERE << "wrote entire Preview body" << status());
+ if (preview.done())
+ decideWritingAfterPreview("body");
+}
- if (preview.ieof())
- stopWriting(true);
- else
- state.writing = State::writingPaused;
- }
+/// determine state.writing after we wrote the entire preview
+void Adaptation::Icap::ModXact::decideWritingAfterPreview(const char *kind)
+{
+ if (preview.ieof()) // nothing more to write
+ stopWriting(true);
+ else if (state.parsing == State::psIcapHeader) // did not get a reply yet
+ state.writing = State::writingPaused; // wait for the ICAP server reply
+ else
+ stopWriting(true); // ICAP server reply implies no post-preview writing
+
+ debugs(93, 6, HERE << "decided on writing after " << kind << " preview" <<
+ status());
}
void Adaptation::Icap::ModXact::writePrimeBody()
void Adaptation::Icap::ModXact::startReading()
{
- Must(connection >= 0);
+ Must(haveConnection());
Must(!reader);
Must(!adapted.header);
Must(!adapted.body_pipe);
void Adaptation::Icap::ModXact::callException(const std::exception &e)
{
if (!canStartBypass || isRetriable) {
+ if (!isRetriable) {
+ if (const TextException *te = dynamic_cast<const TextException *>(&e))
+ detailError(ERR_DETAIL_EXCEPTION_START + te->id());
+ else
+ detailError(ERR_DETAIL_EXCEPTION_OTHER);
+ }
Adaptation::Icap::Xaction::callException(e);
return;
}
debugs(93, 3, HERE << "bypassing " << inCall << " exception: " <<
e.what() << ' ' << status());
bypassFailure();
+ } catch (const TextException &bypassTe) {
+ detailError(ERR_DETAIL_EXCEPTION_START + bypassTe.id());
+ Adaptation::Icap::Xaction::callException(bypassTe);
} catch (const std::exception &bypassE) {
+ detailError(ERR_DETAIL_EXCEPTION_OTHER);
Adaptation::Icap::Xaction::callException(bypassE);
}
}
stopParsing();
stopWriting(true); // or should we force it?
- if (connection >= 0) {
+ if (haveConnection()) {
reuseConnection = false; // be conservative
cancelRead(); // may not work; and we cannot stop connecting either
if (!doneWithIo())
handle204NoContent();
break;
+ case 206:
+ handle206PartialContent();
+ break;
+
default:
debugs(93, 5, HERE << "ICAP status " << icapReply->sline.status);
handleUnknownScode();
// server must not respond before the end of preview: we may send ieof
Must(preview.enabled() && preview.done() && !preview.ieof());
- // 100 "Continue" cancels our preview commitment, not 204s outside preview
- if (!state.allowedPostview204)
+ // 100 "Continue" cancels our Preview commitment,
+ // but not commitment to handle 204 or 206 outside Preview
+ if (!state.allowedPostview204 && !state.allowedPostview206)
stopBackup();
state.parsing = State::psIcapHeader; // eventually
prepEchoing();
}
+void Adaptation::Icap::ModXact::handle206PartialContent()
+{
+ if (state.writing == State::writingPaused) {
+ Must(preview.enabled());
+ Must(state.allowedPreview206);
+ debugs(93, 7, HERE << "206 inside preview");
+ } else {
+ Must(state.writing > State::writingPaused);
+ Must(state.allowedPostview206);
+ debugs(93, 7, HERE << "206 outside preview");
+ }
+ state.parsing = State::psHttpHeader;
+ state.sending = State::sendingAdapted;
+ state.readyForUob = true;
+ checkConsuming();
+}
+
// Called when we receive a 204 No Content response and
// when we are trying to bypass a service failure.
// We actually start sending (echoig or not) in startSending.
// allocate the adapted message and copy metainfo
Must(!adapted.header);
- HttpMsg *newHead = NULL;
- if (const HttpRequest *oldR = dynamic_cast<const HttpRequest*>(oldHead)) {
- HttpRequest *newR = new HttpRequest;
- newR->canonical = oldR->canonical ?
- xstrdup(oldR->canonical) : NULL; // parse() does not set it
- newHead = newR;
- } else if (dynamic_cast<const HttpReply*>(oldHead)) {
- HttpReply *newRep = new HttpReply;
- newHead = newRep;
- }
- Must(newHead);
- newHead->inheritProperties(oldHead);
+ {
+ HttpMsg::Pointer newHead;
+ if (const HttpRequest *oldR = dynamic_cast<const HttpRequest*>(oldHead)) {
+ HttpRequest::Pointer newR(new HttpRequest);
+ newR->canonical = oldR->canonical ?
+ xstrdup(oldR->canonical) : NULL; // parse() does not set it
+ newHead = newR;
+ } else if (dynamic_cast<const HttpReply*>(oldHead)) {
+ newHead = new HttpReply;
+ }
+ Must(newHead != NULL);
+
+ newHead->inheritProperties(oldHead);
- adapted.setHeader(newHead);
+ adapted.setHeader(newHead);
+ }
// parse the buffer back
http_status error = HTTP_STATUS_NONE;
- Must(newHead->parse(&httpBuf, true, &error));
+ Must(adapted.header->parse(&httpBuf, true, &error));
- Must(newHead->hdr_sz == httpBuf.contentSize()); // no leftovers
+ Must(adapted.header->hdr_sz == httpBuf.contentSize()); // no leftovers
httpBuf.clean();
debugs(93, 7, HERE << "cloned virgin message " << oldHead << " to " <<
- newHead);
+ adapted.header);
// setup adapted body pipe if needed
if (oldHead->body_pipe != NULL) {
}
}
+/// Called when we received use-original-body chunk extension in 206 response.
+/// We actually start sending (echoing or not) in startSending().
+void Adaptation::Icap::ModXact::prepPartialBodyEchoing(uint64_t pos)
+{
+ Must(virginBodySending.active());
+ Must(virgin.header->body_pipe != NULL);
+
+ setOutcome(xoPartEcho);
+
+ debugs(93, 7, HERE << "will echo virgin body suffix from " <<
+ virgin.header->body_pipe << " offset " << pos );
+
+ // check that use-original-body=N does not point beyond buffered data
+ const uint64_t virginDataEnd = virginConsumed +
+ virgin.body_pipe->buf().contentSize();
+ Must(pos <= virginDataEnd);
+ virginBodySending.progress(static_cast<size_t>(pos));
+
+ state.sending = State::sendingVirgin;
+ checkConsuming();
+
+ if (virgin.header->body_pipe->bodySizeKnown())
+ adapted.body_pipe->expectProductionEndAfter(virgin.header->body_pipe->bodySize() - pos);
+
+ debugs(93, 7, HERE << "will echo virgin body suffix to " <<
+ adapted.body_pipe);
+
+ // Start echoing data
+ echoMore();
+}
+
void Adaptation::Icap::ModXact::handleUnknownScode()
{
stopParsing();
void Adaptation::Icap::ModXact::parseHttpHead()
{
if (gotEncapsulated("res-hdr") || gotEncapsulated("req-hdr")) {
+ replyHttpHeaderSize = 0;
maybeAllocateHttpMsg();
if (!parseHead(adapted.header))
return; // need more header data
+ if (adapted.header)
+ replyHttpHeaderSize = adapted.header->hdr_sz;
+
if (dynamic_cast<HttpRequest*>(adapted.header)) {
const HttpRequest *oldR = dynamic_cast<const HttpRequest*>(virgin.header);
Must(oldR);
if (gotEncapsulated("res-body") || gotEncapsulated("req-body")) {
debugs(93, 5, HERE << "expecting a body");
state.parsing = State::psBody;
+ replyHttpBodySize = 0;
bodyParser = new ChunkedCodingParser;
makeAdaptedBodyPipe("adapted response from the ICAP server");
Must(state.sending == State::sendingAdapted);
debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes after " <<
"parse; parsed all: " << parsed);
- replyBodySize += adapted.body_pipe->buf().contentSize();
+ replyHttpBodySize += adapted.body_pipe->buf().contentSize();
// TODO: expose BodyPipe::putSize() to make this check simpler and clearer
// TODO: do we really need this if we disable when sending headers?
}
if (parsed) {
+ if (state.readyForUob && bodyParser->useOriginBody >= 0) {
+ prepPartialBodyEchoing(
+ static_cast<uint64_t>(bodyParser->useOriginBody));
+ stopParsing();
+ return;
+ }
+
stopParsing();
stopSending(true); // the parser succeeds only if all parsed data fits
return;
// adapted body consumer aborted
void Adaptation::Icap::ModXact::noteBodyConsumerAborted(BodyPipe::Pointer)
{
+ detailError(ERR_DETAIL_ICAP_XACT_BODY_CONSUMER_ABORT);
mustStop("adapted body consumer aborted");
}
+Adaptation::Icap::ModXact::~ModXact()
+{
+ delete bodyParser;
+}
+
// internal cleanup
void Adaptation::Icap::ModXact::swanSong()
{
stopWriting(false);
stopSending(false);
+ if (theInitiator.set()) // we have not sent the answer to the initiator
+ detailError(ERR_DETAIL_ICAP_XACT_OTHER);
+
// update adaptation history if start was called and we reserved a slot
Adaptation::History::Pointer ah = virginRequest().adaptLogHistory();
if (ah != NULL && adaptHistoryId >= 0)
#endif
al.cache.code = h->logType;
al.cache.requestSize = h->req_sz;
+
+ // leave al.icap.bodyBytesRead negative if no body
+ if (replyHttpHeaderSize >= 0 || replyHttpBodySize >= 0) {
+ const int64_t zero = 0; // to make max() argument types the same
+ al.icap.bodyBytesRead =
+ max(zero, replyHttpHeaderSize) + max(zero, replyHttpBodySize);
+ }
+
if (reply_) {
al.http.code = reply_->sline.status;
al.http.content_type = reply_->content_type.termedBuf();
- al.cache.replySize = replyBodySize + reply_->hdr_sz;
- al.cache.highOffset = replyBodySize;
+ if (replyHttpBodySize >= 0) {
+ al.cache.replySize = replyHttpBodySize + reply_->hdr_sz;
+ al.cache.highOffset = replyHttpBodySize;
+ }
//don't set al.cache.objectSize because it hasn't exist yet
Packer p;
if (preview.enabled()) {
buf.Printf("Preview: %d\r\n", (int)preview.ad());
- if (virginBody.expected()) // there is a body to preview
- virginBodySending.plan();
- else
+ if (!virginBody.expected()) // there is no body to preview
finishNullOrEmptyBodyPreview(httpBuf);
}
- if (shouldAllow204()) {
- debugs(93,5, HERE << "will allow 204s outside of preview");
- state.allowedPostview204 = true;
- buf.Printf("Allow: 204\r\n");
- if (virginBody.expected()) // there is a body to echo
- virginBodySending.plan();
- }
+ makeAllowHeader(buf);
- if (TheConfig.send_client_ip && request)
- if (!request->client_addr.IsAnyAddr() && !request->client_addr.IsNoAddr())
- buf.Printf("X-Client-IP: %s\r\n", request->client_addr.NtoA(ntoabuf,MAX_IPSTRLEN));
+ if (TheConfig.send_client_ip && request) {
+ Ip::Address client_addr;
+#if FOLLOW_X_FORWARDED_FOR
+ if (TheConfig.icap_uses_indirect_client) {
+ client_addr = request->indirect_client_addr;
+ } else
+#endif
+ client_addr = request->client_addr;
+ if (!client_addr.IsAnyAddr() && !client_addr.IsNoAddr())
+ buf.Printf("X-Client-IP: %s\r\n", client_addr.NtoA(ntoabuf,MAX_IPSTRLEN));
+ }
if (TheConfig.send_client_username && request)
makeUsernameHeader(request, buf);
httpBuf.clean();
}
+// decides which Allow values to write and updates the request buffer
+void Adaptation::Icap::ModXact::makeAllowHeader(MemBuf &buf)
+{
+ const bool allow204in = preview.enabled(); // TODO: add shouldAllow204in()
+ const bool allow204out = state.allowedPostview204 = shouldAllow204();
+ const bool allow206in = state.allowedPreview206 = shouldAllow206in();
+ const bool allow206out = state.allowedPostview206 = shouldAllow206out();
+
+ debugs(93,9, HERE << "Allows: " << allow204in << allow204out <<
+ allow206in << allow206out);
+
+ const bool allow204 = allow204in || allow204out;
+ const bool allow206 = allow206in || allow206out;
+
+ if (!allow204 && !allow206)
+ return; // nothing to do
+
+ if (virginBody.expected()) // if there is a virgin body, plan to send it
+ virginBodySending.plan();
+
+ // writing Preview:... means we will honor 204 inside preview
+ // writing Allow/204 means we will honor 204 outside preview
+ // writing Allow:206 means we will honor 206 inside preview
+ // writing Allow:204,206 means we will honor 206 outside preview
+ const char *allowHeader = NULL;
+ if (allow204out && allow206)
+ allowHeader = "Allow: 204, 206\r\n";
+ else if (allow204out)
+ allowHeader = "Allow: 204\r\n";
+ else if (allow206)
+ allowHeader = "Allow: 206\r\n";
+
+ if (allowHeader) { // may be nil if only allow204in is true
+ buf.append(allowHeader, strlen(allowHeader));
+ debugs(93,5, HERE << "Will write " << allowHeader);
+ }
+}
+
void Adaptation::Icap::ModXact::makeUsernameHeader(const HttpRequest *request, MemBuf &buf)
{
- if (const AuthUserRequest *auth = request->auth_user_request) {
- if (char const *name = auth->username()) {
- const char *value = TheConfig.client_username_encode ?
- base64_encode(name) : name;
- buf.Printf("%s: %s\r\n", TheConfig.client_username_header,
- value);
+ if (request->auth_user_request != NULL) {
+ char const *name = request->auth_user_request->username();
+ if (name) {
+ const char *value = TheConfig.client_username_encode ? base64_encode(name) : name;
+ buf.Printf("%s: %s\r\n", TheConfig.client_username_header, value);
}
}
}
icapBuf.Printf("%s=%d, ", section, (int) httpBuf.contentSize());
// begin cloning
- HttpMsg *headClone = NULL;
+ HttpMsg::Pointer headClone;
if (const HttpRequest* old_request = dynamic_cast<const HttpRequest*>(head)) {
- HttpRequest* new_request = new HttpRequest;
- assert(old_request->canonical);
+ HttpRequest::Pointer new_request(new HttpRequest);
+ Must(old_request->canonical);
urlParse(old_request->method, old_request->canonical, new_request);
new_request->http_ver = old_request->http_ver;
headClone = new_request;
} else if (const HttpReply *old_reply = dynamic_cast<const HttpReply*>(head)) {
- HttpReply* new_reply = new HttpReply;
+ HttpReply::Pointer new_reply(new HttpReply);
new_reply->sline = old_reply->sline;
headClone = new_reply;
}
-
- Must(headClone);
+ Must(headClone != NULL);
headClone->inheritProperties(head);
HttpHeaderPos pos = HttpHeaderInitPos;
// pack polished HTTP header
packHead(httpBuf, headClone);
- delete headClone;
+ // headClone unlocks and, hence, deletes the message we packed
}
void Adaptation::Icap::ModXact::packHead(MemBuf &httpBuf, const HttpMsg *head)
return canBackupEverything();
}
+// decides whether to allow 206 responses in some mode
+bool Adaptation::Icap::ModXact::shouldAllow206any()
+{
+ return TheConfig.allow206_enable && service().allows206() &&
+ virginBody.expected(); // no need for 206 without a body
+}
+
+// decides whether to allow 206 responses in preview mode
+bool Adaptation::Icap::ModXact::shouldAllow206in()
+{
+ return shouldAllow206any() && preview.enabled();
+}
+
+// decides whether to allow 206 responses outside of preview
+bool Adaptation::Icap::ModXact::shouldAllow206out()
+{
+ return shouldAllow206any() && canBackupEverything();
+}
+
// used by shouldAllow204 and decideOnRetries
bool Adaptation::Icap::ModXact::canBackupEverything() const
{
if (virgin.body_pipe != NULL)
buf.append("R", 1);
- if (connection > 0 && !doneReading())
+ if (haveConnection() && !doneReading())
buf.append("r", 1);
if (!state.doneWriting() && state.writing != State::writingInit)
if (!doneSending() && state.sending != State::sendingUndecided)
buf.Printf("S(%d)", state.sending);
+ if (state.readyForUob)
+ buf.append("6", 1);
+
if (canStartBypass)
buf.append("Y", 1);
return true;
}
+void Adaptation::Icap::ModXact::detailError(int errDetail)
+{
+ if (HttpRequest *request = virgin.cause ?
+ virgin.cause : dynamic_cast<HttpRequest*>(virgin.header)) {
+ request->detailError(ERR_ICAP_FAILURE, errDetail);
+ }
+}
/* Adaptation::Icap::ModXactLauncher */
-Adaptation::Icap::ModXactLauncher::ModXactLauncher(Adaptation::Initiator *anInitiator, HttpMsg *virginHeader, HttpRequest *virginCause, Adaptation::ServicePointer aService):
+Adaptation::Icap::ModXactLauncher::ModXactLauncher(HttpMsg *virginHeader, HttpRequest *virginCause, Adaptation::ServicePointer aService):
AsyncJob("Adaptation::Icap::ModXactLauncher"),
- Adaptation::Icap::Launcher("Adaptation::Icap::ModXactLauncher", anInitiator, aService)
+ Adaptation::Icap::Launcher("Adaptation::Icap::ModXactLauncher", aService)
{
virgin.setHeader(virginHeader);
virgin.setCause(virginCause);
Adaptation::Icap::ServiceRep::Pointer s =
dynamic_cast<Adaptation::Icap::ServiceRep*>(theService.getRaw());
Must(s != NULL);
- return new Adaptation::Icap::ModXact(this, virgin.header, virgin.cause, s);
+ return new Adaptation::Icap::ModXact(virgin.header, virgin.cause, s);
}
void Adaptation::Icap::ModXactLauncher::swanSong()