*/
#include "squid.h"
+#include "acl/Gadgets.h"
#include "base/TextException.h"
-#include "Server.h"
-#include "Store.h"
-#include "fde.h" /* for fd_table[fd].closing */
-#include "HttpRequest.h"
-#include "HttpReply.h"
+#include "comm/Connection.h"
+#include "comm/forward.h"
+#include "comm/Write.h"
+#include "err_detail_type.h"
#include "errorpage.h"
+#include "HttpReply.h"
+#include "HttpRequest.h"
+#include "protos.h"
+#include "Server.h"
#include "SquidTime.h"
+#include "StatCounters.h"
+#include "Store.h"
#if USE_ADAPTATION
#include "adaptation/AccessCheck.h"
+#include "adaptation/Answer.h"
#include "adaptation/Iterator.h"
+#include "base/AsyncCall.h"
#endif
// implemented in client_side_reply.cc until sides have a common parent
extern void purgeEntriesByUrl(HttpRequest * req, const char *url);
-
-ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"),requestSender(NULL)
+ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"),
+ requestSender(NULL),
#if USE_ADAPTATION
- , adaptedHeadSource(NULL)
- , adaptationAccessCheckPending(false)
- , startedAdaptation(false)
+ adaptedHeadSource(NULL),
+ adaptationAccessCheckPending(false),
+ startedAdaptation(false),
#endif
+ receivedWholeRequestBody(false),
+ theVirginReply(NULL),
+ theFinalReply(NULL)
{
fwd = theFwdState;
entry = fwd->entry;
#endif
}
-
HttpReply *
ServerStateData::virginReply()
{
assert(rep);
theFinalReply = HTTPMSGLOCK(rep);
- entry->replaceHttpReply(theFinalReply);
- haveParsedReplyHeaders();
+ // give entry the reply because haveParsedReplyHeaders() expects it there
+ entry->replaceHttpReply(theFinalReply, false); // but do not write yet
+ haveParsedReplyHeaders(); // update the entry/reply (e.g., set timestamps)
+ entry->startWriting(); // write the updated entry to store
return theFinalReply;
}
#endif
completeForwarding();
- quitIfAllDone();
}
-// When we are done talking to the primary server, we may be still talking
-// to the ICAP service. And vice versa. Here, we quit only if we are done
-// talking to both.
-void ServerStateData::quitIfAllDone()
+bool ServerStateData::doneAll() const
{
+ return doneWithServer() &&
#if USE_ADAPTATION
- if (!doneWithAdaptation()) {
- debugs(11,5, HERE << "transaction not done: still talking to ICAP");
- return;
- }
+ doneWithAdaptation() &&
+ Adaptation::Initiator::doneAll() &&
+ BodyProducer::doneAll() &&
#endif
-
- if (!doneWithServer()) {
- debugs(11,5, HERE << "transaction not done: still talking to server");
- return;
- }
-
- debugs(11,3, HERE << "transaction done");
-
- deleteThis("ServerStateData::quitIfAllDone");
+ BodyConsumer::doneAll();
}
// FTP side overloads this to work around multiple calls to fwd->complete
return;
}
#endif
- handleMoreRequestBodyAvailable();
+ if (requestBodySource == bp)
+ handleMoreRequestBodyAvailable();
}
// the entire request or adapted response body was provided, successfully
return;
}
#endif
- handleRequestBodyProductionEnded();
+ if (requestBodySource == bp)
+ handleRequestBodyProductionEnded();
}
// premature end of the request or adapted response body production
return;
}
#endif
- handleRequestBodyProducerAborted();
+ if (requestBodySource == bp)
+ handleRequestBodyProducerAborted();
}
-
// more origin request body data is available
void
ServerStateData::handleMoreRequestBodyAvailable()
void
ServerStateData::handleRequestBodyProductionEnded()
{
+ receivedWholeRequestBody = true;
if (!requestSender)
doneSendingRequestBody();
else
if (io.size > 0) {
fd_bytes(io.fd, io.size, FD_WRITE);
- kb_incr(&statCounter.server.all.kbytes_out, io.size);
+ kb_incr(&(statCounter.server.all.kbytes_out), io.size);
// kids should increment their counters
}
}
if (io.flag) {
- debugs(11, 1, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(errno));
+ debugs(11, DBG_IMPORTANT, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(io.xerrno));
ErrorState *err;
- err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request);
- err->xerrno = errno;
+ err = new ErrorState(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request);
+ err->xerrno = io.xerrno;
fwd->fail(err);
abortTransaction("I/O error while sending request body");
return;
return;
}
- if (requestBodySource->exhausted())
+ if (!requestBodySource->exhausted())
+ sendMoreRequestBody();
+ else if (receivedWholeRequestBody)
doneSendingRequestBody();
else
- sendMoreRequestBody();
-}
-
-bool
-ServerStateData::canSend(int fd) const
-{
- return fd >= 0 && !fd_table[fd].closing();
+ debugs(9,3, HERE << "waiting for body production end or abort");
}
void
assert(requestBodySource != NULL);
assert(!requestSender);
- const int fd = dataDescriptor();
+ const Comm::ConnectionPointer conn = dataConnection();
- if (!canSend(fd)) {
- debugs(9,3, HERE << "cannot send request body to closing FD " << fd);
+ if (!Comm::IsConnOpen(conn)) {
+ debugs(9,3, HERE << "cannot send request body to closing " << conn);
return; // wait for the kid's close handler; TODO: assert(closer);
}
MemBuf buf;
- if (requestBodySource->getMoreData(buf)) {
+ if (getMoreRequestBody(buf) && buf.contentSize() > 0) {
debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes");
typedef CommCbMemFunT<ServerStateData, CommIoCbParams> Dialer;
- requestSender = JobCallback(93,3,
- Dialer, this, ServerStateData::sentRequestBody);
- comm_write_mbuf(fd, &buf, requestSender);
+ requestSender = JobCallback(93,3, Dialer, this, ServerStateData::sentRequestBody);
+ Comm::Write(conn, &buf, requestSender);
} else {
debugs(9,3, HERE << "will wait for more request body bytes or eof");
requestSender = NULL;
}
}
+/// either fill buf with available [encoded] request body bytes or return false
+bool
+ServerStateData::getMoreRequestBody(MemBuf &buf)
+{
+ // default implementation does not encode request body content
+ Must(requestBodySource != NULL);
+ return requestBodySource->getMoreData(buf);
+}
+
// Compares hosts in urls, returns false if different, no sheme, or no host.
static bool
sameUrlHosts(const char *url1, const char *url2)
purgeEntriesByHeader(request, reqUrl, theFinalReply, HDR_CONTENT_LOCATION);
}
-// called (usually by kids) when we have final (possibly adapted) reply headers
+/// called when we have final (possibly adapted) reply headers; kids extend
void
ServerStateData::haveParsedReplyHeaders()
{
}
adaptedHeadSource = initiateAdaptation(
- new Adaptation::Iterator(vrep, cause, group));
+ new Adaptation::Iterator(vrep, cause, group));
startedAdaptation = initiated(adaptedHeadSource);
Must(startedAdaptation);
}
// received adapted response headers (body may follow)
void
-ServerStateData::noteAdaptationAnswer(HttpMsg *msg)
+ServerStateData::noteAdaptationAnswer(const Adaptation::Answer &answer)
{
clearAdaptation(adaptedHeadSource); // we do not expect more messages
- if (abortOnBadEntry("entry went bad while waiting for adapted headers"))
+ switch (answer.kind) {
+ case Adaptation::Answer::akForward:
+ handleAdaptedHeader(answer.message);
+ break;
+
+ case Adaptation::Answer::akBlock:
+ handleAdaptationBlocked(answer);
+ break;
+
+ case Adaptation::Answer::akError:
+ handleAdaptationAborted(!answer.final);
+ break;
+ }
+}
+
+void
+ServerStateData::handleAdaptedHeader(HttpMsg *msg)
+{
+ if (abortOnBadEntry("entry went bad while waiting for adapted headers")) {
+ // If the adapted response has a body, the ICAP side needs to know
+ // that nobody will consume that body. We will be destroyed upon
+ // return. Tell the ICAP side that it is on its own.
+ HttpReply *rep = dynamic_cast<HttpReply*>(msg);
+ assert(rep);
+ if (rep->body_pipe != NULL)
+ rep->body_pipe->expectNoConsumption();
+
return;
+ }
HttpReply *rep = dynamic_cast<HttpReply*>(msg);
assert(rep);
}
}
-// will not receive adapted response headers (and, hence, body)
void
-ServerStateData::noteAdaptationQueryAbort(bool final)
+ServerStateData::resumeBodyStorage()
{
- clearAdaptation(adaptedHeadSource);
- handleAdaptationAborted(!final);
+ if (abortOnBadEntry("store entry aborted while kick producer callback"))
+ return;
+
+ if (!adaptedBodySource)
+ return;
+
+ handleMoreAdaptedBodyAvailable();
+
+ if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+ endAdaptedBodyConsumption();
}
// more adapted response body is available
void
ServerStateData::handleMoreAdaptedBodyAvailable()
{
- const size_t contentSize = adaptedBodySource->buf().contentSize();
-
- debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " <<
- "response body at offset " << adaptedBodySource->consumedSize());
-
if (abortOnBadEntry("entry refuses adapted body"))
return;
assert(entry);
+
+ size_t contentSize = adaptedBodySource->buf().contentSize();
+
+ if (!contentSize)
+ return; // XXX: bytesWanted asserts on zero-size ranges
+
+ const size_t spaceAvailable = entry->bytesWanted(Range<size_t>(0, contentSize), true);
+
+ if (spaceAvailable < contentSize ) {
+ // No or partial body data consuming
+ typedef NullaryMemFunT<ServerStateData> Dialer;
+ AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage",
+ Dialer(this, &ServerStateData::resumeBodyStorage));
+ entry->deferProducer(call);
+ }
+
+ if (!spaceAvailable) {
+ debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
+ "response body at offset " << adaptedBodySource->consumedSize());
+ return;
+ }
+
+ if (spaceAvailable < contentSize ) {
+ debugs(11, 5, HERE << "postponing storage of " <<
+ (contentSize - spaceAvailable) << " body bytes");
+ contentSize = spaceAvailable;
+ }
+
+ debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
+ "response body at offset " << adaptedBodySource->consumedSize());
+
BodyPipeCheckout bpc(*adaptedBodySource);
- const StoreIOBuffer ioBuf(&bpc.buf, currentOffset);
- currentOffset += bpc.buf.size;
+ const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
+ currentOffset += ioBuf.length;
entry->write(ioBuf);
bpc.buf.consume(contentSize);
bpc.checkIn();
void
ServerStateData::handleAdaptedBodyProductionEnded()
{
- stopConsumingFrom(adaptedBodySource);
-
if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
return;
+ // end consumption if we consumed everything
+ if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+ endAdaptedBodyConsumption();
+ // else resumeBodyStorage() will eventually consume the rest
+}
+
+void
+ServerStateData::endAdaptedBodyConsumption()
+{
+ stopConsumingFrom(adaptedBodySource);
handleAdaptationCompleted();
}
}
completeForwarding();
- quitIfAllDone();
}
-
// common part of noteAdaptation*Aborted and noteBodyConsumerAborted methods
void
ServerStateData::handleAdaptationAborted(bool bypassable)
if (entry->isEmpty()) {
debugs(11,9, HERE << "creating ICAP error entry after ICAP failure");
- ErrorState *err = errorCon(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request);
- err->xerrno = errno;
+ ErrorState *err = new ErrorState(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request);
+ err->detailError(ERR_DETAIL_ICAP_RESPMOD_EARLY);
fwd->fail(err);
fwd->dontRetry(true);
+ } else if (request) { // update logged info directly
+ request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_ICAP_RESPMOD_LATE);
}
abortTransaction("ICAP failure");
}
+// adaptation service wants us to deny HTTP client access to this response
void
-ServerStateData::adaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
+ServerStateData::handleAdaptationBlocked(const Adaptation::Answer &answer)
+{
+ debugs(11,5, HERE << answer.ruleId);
+
+ if (abortOnBadEntry("entry went bad while ICAP aborted"))
+ return;
+
+ if (!entry->isEmpty()) { // too late to block (should not really happen)
+ if (request)
+ request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_RESPMOD_BLOCK_LATE);
+ abortTransaction("late adaptation block");
+ return;
+ }
+
+ debugs(11,7, HERE << "creating adaptation block response");
+
+ err_type page_id =
+ aclGetDenyInfoPage(&Config.denyInfoList, answer.ruleId.termedBuf(), 1);
+ if (page_id == ERR_NONE)
+ page_id = ERR_ACCESS_DENIED;
+
+ ErrorState *err = new ErrorState(page_id, HTTP_FORBIDDEN, request);
+ err->detailError(ERR_DETAIL_RESPMOD_BLOCK_EARLY);
+ fwd->fail(err);
+ fwd->dontRetry(true);
+
+ abortTransaction("timely adaptation block");
+}
+
+void
+ServerStateData::noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
{
adaptationAccessCheckPending = false;
startAdaptation(group, originalRequest());
processReplyBody();
}
-
-void
-ServerStateData::adaptationAclCheckDoneWrapper(Adaptation::ServiceGroupPointer group, void *data)
-{
- ServerStateData *state = (ServerStateData *)data;
- state->adaptationAclCheckDone(group);
-}
#endif
void
ServerStateData::sendBodyIsTooLargeError()
{
- ErrorState *err = errorCon(ERR_TOO_BIG, HTTP_FORBIDDEN, request);
- err->xerrno = errno;
+ ErrorState *err = new ErrorState(ERR_TOO_BIG, HTTP_FORBIDDEN, request);
fwd->fail(err);
fwd->dontRetry(true);
abortTransaction("Virgin body too large.");
// The callback can be called with a NULL service if adaptation is off.
adaptationAccessCheckPending = Adaptation::AccessCheck::Start(
Adaptation::methodRespmod, Adaptation::pointPreCache,
- request, virginReply(), adaptationAclCheckDoneWrapper, this);
+ originalRequest(), virginReply(), this);
debugs(11,5, HERE << "adaptationAccessCheckPending=" << adaptationAccessCheckPending);
if (adaptationAccessCheckPending)
return;