]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/Server.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / Server.cc
index 136aeb88e5ba85cb6a5edad4f1f14c54ad2bb8b6..927245b4ae0121248372ae25b6138be649576166 100644 (file)
 #include "comm/Connection.h"
 #include "comm/forward.h"
 #include "comm/Write.h"
-#include "Server.h"
-#include "Store.h"
-#include "HttpRequest.h"
-#include "HttpReply.h"
-#include "errorpage.h"
 #include "err_detail_type.h"
+#include "errorpage.h"
+#include "HttpReply.h"
+#include "HttpRequest.h"
+#include "protos.h"
+#include "Server.h"
 #include "SquidTime.h"
+#include "StatCounters.h"
+#include "Store.h"
 
 #if USE_ADAPTATION
 #include "adaptation/AccessCheck.h"
 #include "adaptation/Answer.h"
 #include "adaptation/Iterator.h"
+#include "base/AsyncCall.h"
 #endif
 
 // implemented in client_side_reply.cc until sides have a common parent
 extern void purgeEntriesByUrl(HttpRequest * req, const char *url);
 
-
 ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"),
         requestSender(NULL),
 #if USE_ADAPTATION
@@ -63,7 +65,9 @@ ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateDa
         adaptationAccessCheckPending(false),
         startedAdaptation(false),
 #endif
-        receivedWholeRequestBody(false)
+        receivedWholeRequestBody(false),
+        theVirginReply(NULL),
+        theFinalReply(NULL)
 {
     fwd = theFwdState;
     entry = fwd->entry;
@@ -122,7 +126,6 @@ ServerStateData::swanSong()
 #endif
 }
 
-
 HttpReply *
 ServerStateData::virginReply()
 {
@@ -211,29 +214,17 @@ ServerStateData::serverComplete2()
 #endif
 
     completeForwarding();
-    quitIfAllDone();
 }
 
-// When we are done talking to the primary server, we may be still talking
-// to the ICAP service. And vice versa. Here, we quit only if we are done
-// talking to both.
-void ServerStateData::quitIfAllDone()
+bool ServerStateData::doneAll() const
 {
+    return  doneWithServer() &&
 #if USE_ADAPTATION
-    if (!doneWithAdaptation()) {
-        debugs(11,5, HERE << "transaction not done: still talking to ICAP");
-        return;
-    }
+            doneWithAdaptation() &&
+            Adaptation::Initiator::doneAll() &&
+            BodyProducer::doneAll() &&
 #endif
-
-    if (!doneWithServer()) {
-        debugs(11,5, HERE << "transaction not done: still talking to server");
-        return;
-    }
-
-    debugs(11,3, HERE << "transaction done");
-
-    deleteThis("ServerStateData::quitIfAllDone");
+            BodyConsumer::doneAll();
 }
 
 // FTP side overloads this to work around multiple calls to fwd->complete
@@ -285,7 +276,8 @@ ServerStateData::noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
         return;
     }
 #endif
-    handleMoreRequestBodyAvailable();
+    if (requestBodySource == bp)
+        handleMoreRequestBodyAvailable();
 }
 
 // the entire request or adapted response body was provided, successfully
@@ -298,7 +290,8 @@ ServerStateData::noteBodyProductionEnded(BodyPipe::Pointer bp)
         return;
     }
 #endif
-    handleRequestBodyProductionEnded();
+    if (requestBodySource == bp)
+        handleRequestBodyProductionEnded();
 }
 
 // premature end of the request or adapted response body production
@@ -311,10 +304,10 @@ ServerStateData::noteBodyProducerAborted(BodyPipe::Pointer bp)
         return;
     }
 #endif
-    handleRequestBodyProducerAborted();
+    if (requestBodySource == bp)
+        handleRequestBodyProducerAborted();
 }
 
-
 // more origin request body data is available
 void
 ServerStateData::handleMoreRequestBodyAvailable()
@@ -371,7 +364,7 @@ ServerStateData::sentRequestBody(const CommIoCbParams &io)
 
     if (io.size > 0) {
         fd_bytes(io.fd, io.size, FD_WRITE);
-        kb_incr(&statCounter.server.all.kbytes_out, io.size);
+        kb_incr(&(statCounter.server.all.kbytes_out), io.size);
         // kids should increment their counters
     }
 
@@ -384,9 +377,9 @@ ServerStateData::sentRequestBody(const CommIoCbParams &io)
     }
 
     if (io.flag) {
-        debugs(11, 1, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(io.xerrno));
+        debugs(11, DBG_IMPORTANT, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(io.xerrno));
         ErrorState *err;
-        err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request);
+        err = new ErrorState(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request);
         err->xerrno = io.xerrno;
         fwd->fail(err);
         abortTransaction("I/O error while sending request body");
@@ -710,22 +703,63 @@ ServerStateData::handleAdaptedHeader(HttpMsg *msg)
     }
 }
 
-// more adapted response body is available
 void
-ServerStateData::handleMoreAdaptedBodyAvailable()
+ServerStateData::resumeBodyStorage()
 {
-    const size_t contentSize = adaptedBodySource->buf().contentSize();
+    if (abortOnBadEntry("store entry aborted while kick producer callback"))
+        return;
 
-    debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " <<
-           "response body at offset " << adaptedBodySource->consumedSize());
+    if (!adaptedBodySource)
+        return;
+
+    handleMoreAdaptedBodyAvailable();
+
+    if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+        endAdaptedBodyConsumption();
+}
 
+// more adapted response body is available
+void
+ServerStateData::handleMoreAdaptedBodyAvailable()
+{
     if (abortOnBadEntry("entry refuses adapted body"))
         return;
 
     assert(entry);
+
+    size_t contentSize = adaptedBodySource->buf().contentSize();
+
+    if (!contentSize)
+        return; // XXX: bytesWanted asserts on zero-size ranges
+
+    const size_t spaceAvailable = entry->bytesWanted(Range<size_t>(0, contentSize), true);
+
+    if (spaceAvailable < contentSize ) {
+        // No or partial body data consuming
+        typedef NullaryMemFunT<ServerStateData> Dialer;
+        AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage",
+                                            Dialer(this, &ServerStateData::resumeBodyStorage));
+        entry->deferProducer(call);
+    }
+
+    if (!spaceAvailable)  {
+        debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
+               "response body at offset " << adaptedBodySource->consumedSize());
+        return;
+    }
+
+    if (spaceAvailable < contentSize ) {
+        debugs(11, 5, HERE << "postponing storage of " <<
+               (contentSize - spaceAvailable) << " body bytes");
+        contentSize = spaceAvailable;
+    }
+
+    debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
+           "response body at offset " << adaptedBodySource->consumedSize());
+
     BodyPipeCheckout bpc(*adaptedBodySource);
-    const StoreIOBuffer ioBuf(&bpc.buf, currentOffset);
-    currentOffset += bpc.buf.size;
+    const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
+    currentOffset += ioBuf.length;
     entry->write(ioBuf);
     bpc.buf.consume(contentSize);
     bpc.checkIn();
@@ -735,11 +769,19 @@ ServerStateData::handleMoreAdaptedBodyAvailable()
 void
 ServerStateData::handleAdaptedBodyProductionEnded()
 {
-    stopConsumingFrom(adaptedBodySource);
-
     if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
         return;
 
+    // end consumption if we consumed everything
+    if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+        endAdaptedBodyConsumption();
+    // else resumeBodyStorage() will eventually consume the rest
+}
+
+void
+ServerStateData::endAdaptedBodyConsumption()
+{
+    stopConsumingFrom(adaptedBodySource);
     handleAdaptationCompleted();
 }
 
@@ -766,10 +808,8 @@ ServerStateData::handleAdaptationCompleted()
     }
 
     completeForwarding();
-    quitIfAllDone();
 }
 
-
 // common part of noteAdaptation*Aborted and noteBodyConsumerAborted methods
 void
 ServerStateData::handleAdaptationAborted(bool bypassable)
@@ -784,8 +824,8 @@ ServerStateData::handleAdaptationAborted(bool bypassable)
 
     if (entry->isEmpty()) {
         debugs(11,9, HERE << "creating ICAP error entry after ICAP failure");
-        ErrorState *err = errorCon(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request);
-        err->xerrno = ERR_DETAIL_ICAP_RESPMOD_EARLY;
+        ErrorState *err = new ErrorState(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request);
+        err->detailError(ERR_DETAIL_ICAP_RESPMOD_EARLY);
         fwd->fail(err);
         fwd->dontRetry(true);
     } else if (request) { // update logged info directly
@@ -818,8 +858,8 @@ ServerStateData::handleAdaptationBlocked(const Adaptation::Answer &answer)
     if (page_id == ERR_NONE)
         page_id = ERR_ACCESS_DENIED;
 
-    ErrorState *err = errorCon(page_id, HTTP_FORBIDDEN, request);
-    err->xerrno = ERR_DETAIL_RESPMOD_BLOCK_EARLY;
+    ErrorState *err = new ErrorState(page_id, HTTP_FORBIDDEN, request);
+    err->detailError(ERR_DETAIL_RESPMOD_BLOCK_EARLY);
     fwd->fail(err);
     fwd->dontRetry(true);
 
@@ -827,7 +867,7 @@ ServerStateData::handleAdaptationBlocked(const Adaptation::Answer &answer)
 }
 
 void
-ServerStateData::adaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
+ServerStateData::noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
 {
     adaptationAccessCheckPending = false;
 
@@ -852,20 +892,12 @@ ServerStateData::adaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
     startAdaptation(group, originalRequest());
     processReplyBody();
 }
-
-void
-ServerStateData::adaptationAclCheckDoneWrapper(Adaptation::ServiceGroupPointer group, void *data)
-{
-    ServerStateData *state = (ServerStateData *)data;
-    state->adaptationAclCheckDone(group);
-}
 #endif
 
 void
 ServerStateData::sendBodyIsTooLargeError()
 {
-    ErrorState *err = errorCon(ERR_TOO_BIG, HTTP_FORBIDDEN, request);
-    err->xerrno = errno;
+    ErrorState *err = new ErrorState(ERR_TOO_BIG, HTTP_FORBIDDEN, request);
     fwd->fail(err);
     fwd->dontRetry(true);
     abortTransaction("Virgin body too large.");
@@ -881,7 +913,7 @@ ServerStateData::adaptOrFinalizeReply()
     // The callback can be called with a NULL service if adaptation is off.
     adaptationAccessCheckPending = Adaptation::AccessCheck::Start(
                                        Adaptation::methodRespmod, Adaptation::pointPreCache,
-                                       originalRequest(), virginReply(), adaptationAclCheckDoneWrapper, this);
+                                       originalRequest(), virginReply(), this);
     debugs(11,5, HERE << "adaptationAccessCheckPending=" << adaptationAccessCheckPending);
     if (adaptationAccessCheckPending)
         return;