#include "comm/Connection.h"
#include "comm/forward.h"
#include "comm/Write.h"
-#include "Server.h"
-#include "Store.h"
-#include "HttpRequest.h"
-#include "HttpReply.h"
-#include "errorpage.h"
#include "err_detail_type.h"
+#include "errorpage.h"
+#include "HttpReply.h"
+#include "HttpRequest.h"
+#include "protos.h"
+#include "Server.h"
#include "SquidTime.h"
+#include "StatCounters.h"
+#include "Store.h"
#if USE_ADAPTATION
#include "adaptation/AccessCheck.h"
#include "adaptation/Answer.h"
#include "adaptation/Iterator.h"
+#include "base/AsyncCall.h"
#endif
// implemented in client_side_reply.cc until sides have a common parent
extern void purgeEntriesByUrl(HttpRequest * req, const char *url);
-
ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"),
requestSender(NULL),
#if USE_ADAPTATION
adaptationAccessCheckPending(false),
startedAdaptation(false),
#endif
- receivedWholeRequestBody(false)
+ receivedWholeRequestBody(false),
+ theVirginReply(NULL),
+ theFinalReply(NULL)
{
fwd = theFwdState;
entry = fwd->entry;
#endif
}
-
HttpReply *
ServerStateData::virginReply()
{
#endif
completeForwarding();
- quitIfAllDone();
}
-// When we are done talking to the primary server, we may be still talking
-// to the ICAP service. And vice versa. Here, we quit only if we are done
-// talking to both.
-void ServerStateData::quitIfAllDone()
+bool ServerStateData::doneAll() const
{
+ return doneWithServer() &&
#if USE_ADAPTATION
- if (!doneWithAdaptation()) {
- debugs(11,5, HERE << "transaction not done: still talking to ICAP");
- return;
- }
+ doneWithAdaptation() &&
+ Adaptation::Initiator::doneAll() &&
+ BodyProducer::doneAll() &&
#endif
-
- if (!doneWithServer()) {
- debugs(11,5, HERE << "transaction not done: still talking to server");
- return;
- }
-
- debugs(11,3, HERE << "transaction done");
-
- deleteThis("ServerStateData::quitIfAllDone");
+ BodyConsumer::doneAll();
}
// FTP side overloads this to work around multiple calls to fwd->complete
return;
}
#endif
- handleMoreRequestBodyAvailable();
+ if (requestBodySource == bp)
+ handleMoreRequestBodyAvailable();
}
// the entire request or adapted response body was provided, successfully
return;
}
#endif
- handleRequestBodyProductionEnded();
+ if (requestBodySource == bp)
+ handleRequestBodyProductionEnded();
}
// premature end of the request or adapted response body production
return;
}
#endif
- handleRequestBodyProducerAborted();
+ if (requestBodySource == bp)
+ handleRequestBodyProducerAborted();
}
-
// more origin request body data is available
void
ServerStateData::handleMoreRequestBodyAvailable()
if (io.size > 0) {
fd_bytes(io.fd, io.size, FD_WRITE);
- kb_incr(&statCounter.server.all.kbytes_out, io.size);
+ kb_incr(&(statCounter.server.all.kbytes_out), io.size);
// kids should increment their counters
}
}
if (io.flag) {
- debugs(11, 1, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(io.xerrno));
+ debugs(11, DBG_IMPORTANT, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(io.xerrno));
ErrorState *err;
- err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request);
+ err = new ErrorState(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request);
err->xerrno = io.xerrno;
fwd->fail(err);
abortTransaction("I/O error while sending request body");
}
}
-// more adapted response body is available
void
-ServerStateData::handleMoreAdaptedBodyAvailable()
+ServerStateData::resumeBodyStorage()
{
- const size_t contentSize = adaptedBodySource->buf().contentSize();
+ if (abortOnBadEntry("store entry aborted while kick producer callback"))
+ return;
- debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " <<
- "response body at offset " << adaptedBodySource->consumedSize());
+ if (!adaptedBodySource)
+ return;
+
+ handleMoreAdaptedBodyAvailable();
+
+ if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+ endAdaptedBodyConsumption();
+}
+// more adapted response body is available
+void
+ServerStateData::handleMoreAdaptedBodyAvailable()
+{
if (abortOnBadEntry("entry refuses adapted body"))
return;
assert(entry);
+
+ size_t contentSize = adaptedBodySource->buf().contentSize();
+
+ if (!contentSize)
+ return; // XXX: bytesWanted asserts on zero-size ranges
+
+ const size_t spaceAvailable = entry->bytesWanted(Range<size_t>(0, contentSize), true);
+
+ if (spaceAvailable < contentSize ) {
+ // No or partial body data consuming
+ typedef NullaryMemFunT<ServerStateData> Dialer;
+ AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage",
+ Dialer(this, &ServerStateData::resumeBodyStorage));
+ entry->deferProducer(call);
+ }
+
+ if (!spaceAvailable) {
+ debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
+ "response body at offset " << adaptedBodySource->consumedSize());
+ return;
+ }
+
+ if (spaceAvailable < contentSize ) {
+ debugs(11, 5, HERE << "postponing storage of " <<
+ (contentSize - spaceAvailable) << " body bytes");
+ contentSize = spaceAvailable;
+ }
+
+ debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
+ "response body at offset " << adaptedBodySource->consumedSize());
+
BodyPipeCheckout bpc(*adaptedBodySource);
- const StoreIOBuffer ioBuf(&bpc.buf, currentOffset);
- currentOffset += bpc.buf.size;
+ const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
+ currentOffset += ioBuf.length;
entry->write(ioBuf);
bpc.buf.consume(contentSize);
bpc.checkIn();
void
ServerStateData::handleAdaptedBodyProductionEnded()
{
- stopConsumingFrom(adaptedBodySource);
-
if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
return;
+ // end consumption if we consumed everything
+ if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+ endAdaptedBodyConsumption();
+ // else resumeBodyStorage() will eventually consume the rest
+}
+
+void
+ServerStateData::endAdaptedBodyConsumption()
+{
+ stopConsumingFrom(adaptedBodySource);
handleAdaptationCompleted();
}
}
completeForwarding();
- quitIfAllDone();
}
-
// common part of noteAdaptation*Aborted and noteBodyConsumerAborted methods
void
ServerStateData::handleAdaptationAborted(bool bypassable)
if (entry->isEmpty()) {
debugs(11,9, HERE << "creating ICAP error entry after ICAP failure");
- ErrorState *err = errorCon(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request);
- err->xerrno = ERR_DETAIL_ICAP_RESPMOD_EARLY;
+ ErrorState *err = new ErrorState(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request);
+ err->detailError(ERR_DETAIL_ICAP_RESPMOD_EARLY);
fwd->fail(err);
fwd->dontRetry(true);
} else if (request) { // update logged info directly
if (page_id == ERR_NONE)
page_id = ERR_ACCESS_DENIED;
- ErrorState *err = errorCon(page_id, HTTP_FORBIDDEN, request);
- err->xerrno = ERR_DETAIL_RESPMOD_BLOCK_EARLY;
+ ErrorState *err = new ErrorState(page_id, HTTP_FORBIDDEN, request);
+ err->detailError(ERR_DETAIL_RESPMOD_BLOCK_EARLY);
fwd->fail(err);
fwd->dontRetry(true);
}
void
-ServerStateData::adaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
+ServerStateData::noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
{
adaptationAccessCheckPending = false;
startAdaptation(group, originalRequest());
processReplyBody();
}
-
-void
-ServerStateData::adaptationAclCheckDoneWrapper(Adaptation::ServiceGroupPointer group, void *data)
-{
- ServerStateData *state = (ServerStateData *)data;
- state->adaptationAclCheckDone(group);
-}
#endif
void
ServerStateData::sendBodyIsTooLargeError()
{
- ErrorState *err = errorCon(ERR_TOO_BIG, HTTP_FORBIDDEN, request);
- err->xerrno = errno;
+ ErrorState *err = new ErrorState(ERR_TOO_BIG, HTTP_FORBIDDEN, request);
fwd->fail(err);
fwd->dontRetry(true);
abortTransaction("Virgin body too large.");
// The callback can be called with a NULL service if adaptation is off.
adaptationAccessCheckPending = Adaptation::AccessCheck::Start(
Adaptation::methodRespmod, Adaptation::pointPreCache,
- originalRequest(), virginReply(), adaptationAclCheckDoneWrapper, this);
+ originalRequest(), virginReply(), this);
debugs(11,5, HERE << "adaptationAccessCheckPending=" << adaptationAccessCheckPending);
if (adaptationAccessCheckPending)
return;