]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/Server.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / Server.cc
index a1eeee04dcdcdf8e137469eb27dd1d77ffa6fa97..927245b4ae0121248372ae25b6138be649576166 100644 (file)
  */
 
 #include "squid.h"
+#include "acl/Gadgets.h"
 #include "base/TextException.h"
-#include "Server.h"
-#include "Store.h"
-#include "fde.h" /* for fd_table[fd].closing */
-#include "HttpRequest.h"
-#include "HttpReply.h"
+#include "comm/Connection.h"
+#include "comm/forward.h"
+#include "comm/Write.h"
+#include "err_detail_type.h"
 #include "errorpage.h"
+#include "HttpReply.h"
+#include "HttpRequest.h"
+#include "protos.h"
+#include "Server.h"
 #include "SquidTime.h"
+#include "StatCounters.h"
+#include "Store.h"
 
 #if USE_ADAPTATION
 #include "adaptation/AccessCheck.h"
+#include "adaptation/Answer.h"
 #include "adaptation/Iterator.h"
+#include "base/AsyncCall.h"
 #endif
 
 // implemented in client_side_reply.cc until sides have a common parent
 extern void purgeEntriesByUrl(HttpRequest * req, const char *url);
 
-
-ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"),requestSender(NULL)
+ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"),
+        requestSender(NULL),
 #if USE_ADAPTATION
-        , adaptedHeadSource(NULL)
-        , adaptationAccessCheckPending(false)
-        , startedAdaptation(false)
+        adaptedHeadSource(NULL),
+        adaptationAccessCheckPending(false),
+        startedAdaptation(false),
 #endif
+        receivedWholeRequestBody(false),
+        theVirginReply(NULL),
+        theFinalReply(NULL)
 {
     fwd = theFwdState;
     entry = fwd->entry;
@@ -115,7 +126,6 @@ ServerStateData::swanSong()
 #endif
 }
 
-
 HttpReply *
 ServerStateData::virginReply()
 {
@@ -156,8 +166,10 @@ ServerStateData::setFinalReply(HttpReply *rep)
     assert(rep);
     theFinalReply = HTTPMSGLOCK(rep);
 
-    entry->replaceHttpReply(theFinalReply);
-    haveParsedReplyHeaders();
+    // give entry the reply because haveParsedReplyHeaders() expects it there
+    entry->replaceHttpReply(theFinalReply, false); // but do not write yet
+    haveParsedReplyHeaders(); // update the entry/reply (e.g., set timestamps)
+    entry->startWriting(); // write the updated entry to store
 
     return theFinalReply;
 }
@@ -202,29 +214,17 @@ ServerStateData::serverComplete2()
 #endif
 
     completeForwarding();
-    quitIfAllDone();
 }
 
-// When we are done talking to the primary server, we may be still talking
-// to the ICAP service. And vice versa. Here, we quit only if we are done
-// talking to both.
-void ServerStateData::quitIfAllDone()
+bool ServerStateData::doneAll() const
 {
+    return  doneWithServer() &&
 #if USE_ADAPTATION
-    if (!doneWithAdaptation()) {
-        debugs(11,5, HERE << "transaction not done: still talking to ICAP");
-        return;
-    }
+            doneWithAdaptation() &&
+            Adaptation::Initiator::doneAll() &&
+            BodyProducer::doneAll() &&
 #endif
-
-    if (!doneWithServer()) {
-        debugs(11,5, HERE << "transaction not done: still talking to server");
-        return;
-    }
-
-    debugs(11,3, HERE << "transaction done");
-
-    deleteThis("ServerStateData::quitIfAllDone");
+            BodyConsumer::doneAll();
 }
 
 // FTP side overloads this to work around multiple calls to fwd->complete
@@ -276,7 +276,8 @@ ServerStateData::noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
         return;
     }
 #endif
-    handleMoreRequestBodyAvailable();
+    if (requestBodySource == bp)
+        handleMoreRequestBodyAvailable();
 }
 
 // the entire request or adapted response body was provided, successfully
@@ -289,7 +290,8 @@ ServerStateData::noteBodyProductionEnded(BodyPipe::Pointer bp)
         return;
     }
 #endif
-    handleRequestBodyProductionEnded();
+    if (requestBodySource == bp)
+        handleRequestBodyProductionEnded();
 }
 
 // premature end of the request or adapted response body production
@@ -302,10 +304,10 @@ ServerStateData::noteBodyProducerAborted(BodyPipe::Pointer bp)
         return;
     }
 #endif
-    handleRequestBodyProducerAborted();
+    if (requestBodySource == bp)
+        handleRequestBodyProducerAborted();
 }
 
-
 // more origin request body data is available
 void
 ServerStateData::handleMoreRequestBodyAvailable()
@@ -320,6 +322,7 @@ ServerStateData::handleMoreRequestBodyAvailable()
 void
 ServerStateData::handleRequestBodyProductionEnded()
 {
+    receivedWholeRequestBody = true;
     if (!requestSender)
         doneSendingRequestBody();
     else
@@ -361,7 +364,7 @@ ServerStateData::sentRequestBody(const CommIoCbParams &io)
 
     if (io.size > 0) {
         fd_bytes(io.fd, io.size, FD_WRITE);
-        kb_incr(&statCounter.server.all.kbytes_out, io.size);
+        kb_incr(&(statCounter.server.all.kbytes_out), io.size);
         // kids should increment their counters
     }
 
@@ -374,10 +377,10 @@ ServerStateData::sentRequestBody(const CommIoCbParams &io)
     }
 
     if (io.flag) {
-        debugs(11, 1, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(errno));
+        debugs(11, DBG_IMPORTANT, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(io.xerrno));
         ErrorState *err;
-        err = errorCon(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request);
-        err->xerrno = errno;
+        err = new ErrorState(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request);
+        err->xerrno = io.xerrno;
         fwd->fail(err);
         abortTransaction("I/O error while sending request body");
         return;
@@ -388,16 +391,12 @@ ServerStateData::sentRequestBody(const CommIoCbParams &io)
         return;
     }
 
-    if (requestBodySource->exhausted())
+    if (!requestBodySource->exhausted())
+        sendMoreRequestBody();
+    else if (receivedWholeRequestBody)
         doneSendingRequestBody();
     else
-        sendMoreRequestBody();
-}
-
-bool
-ServerStateData::canSend(int fd) const
-{
-    return fd >= 0 && !fd_table[fd].closing();
+        debugs(9,3, HERE << "waiting for body production end or abort");
 }
 
 void
@@ -406,26 +405,34 @@ ServerStateData::sendMoreRequestBody()
     assert(requestBodySource != NULL);
     assert(!requestSender);
 
-    const int fd = dataDescriptor();
+    const Comm::ConnectionPointer conn = dataConnection();
 
-    if (!canSend(fd)) {
-        debugs(9,3, HERE << "cannot send request body to closing FD " << fd);
+    if (!Comm::IsConnOpen(conn)) {
+        debugs(9,3, HERE << "cannot send request body to closing " << conn);
         return; // wait for the kid's close handler; TODO: assert(closer);
     }
 
     MemBuf buf;
-    if (requestBodySource->getMoreData(buf)) {
+    if (getMoreRequestBody(buf) && buf.contentSize() > 0) {
         debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes");
         typedef CommCbMemFunT<ServerStateData, CommIoCbParams> Dialer;
-        requestSender = JobCallback(93,3,
-            Dialer, this, ServerStateData::sentRequestBody);
-        comm_write_mbuf(fd, &buf, requestSender);
+        requestSender = JobCallback(93,3, Dialer, this, ServerStateData::sentRequestBody);
+        Comm::Write(conn, &buf, requestSender);
     } else {
         debugs(9,3, HERE << "will wait for more request body bytes or eof");
         requestSender = NULL;
     }
 }
 
+/// either fill buf with available [encoded] request body bytes or return false
+bool
+ServerStateData::getMoreRequestBody(MemBuf &buf)
+{
+    // default implementation does not encode request body content
+    Must(requestBodySource != NULL);
+    return requestBodySource->getMoreData(buf);
+}
+
 // Compares hosts in urls, returns false if different, no sheme, or no host.
 static bool
 sameUrlHosts(const char *url1, const char *url2)
@@ -508,7 +515,7 @@ ServerStateData::maybePurgeOthers()
     purgeEntriesByHeader(request, reqUrl, theFinalReply, HDR_CONTENT_LOCATION);
 }
 
-// called (usually by kids) when we have final (possibly adapted) reply headers
+/// called when we have final (possibly adapted) reply headers; kids extend
 void
 ServerStateData::haveParsedReplyHeaders()
 {
@@ -544,7 +551,7 @@ ServerStateData::startAdaptation(const Adaptation::ServiceGroupPointer &group, H
     }
 
     adaptedHeadSource = initiateAdaptation(
-        new Adaptation::Iterator(vrep, cause, group));
+                            new Adaptation::Iterator(vrep, cause, group));
     startedAdaptation = initiated(adaptedHeadSource);
     Must(startedAdaptation);
 }
@@ -644,12 +651,39 @@ ServerStateData::noteBodyConsumerAborted(BodyPipe::Pointer)
 
 // received adapted response headers (body may follow)
 void
-ServerStateData::noteAdaptationAnswer(HttpMsg *msg)
+ServerStateData::noteAdaptationAnswer(const Adaptation::Answer &answer)
 {
     clearAdaptation(adaptedHeadSource); // we do not expect more messages
 
-    if (abortOnBadEntry("entry went bad while waiting for adapted headers"))
+    switch (answer.kind) {
+    case Adaptation::Answer::akForward:
+        handleAdaptedHeader(answer.message);
+        break;
+
+    case Adaptation::Answer::akBlock:
+        handleAdaptationBlocked(answer);
+        break;
+
+    case Adaptation::Answer::akError:
+        handleAdaptationAborted(!answer.final);
+        break;
+    }
+}
+
+void
+ServerStateData::handleAdaptedHeader(HttpMsg *msg)
+{
+    if (abortOnBadEntry("entry went bad while waiting for adapted headers")) {
+        // If the adapted response has a body, the ICAP side needs to know
+        // that nobody will consume that body. We will be destroyed upon
+        // return. Tell the ICAP side that it is on its own.
+        HttpReply *rep = dynamic_cast<HttpReply*>(msg);
+        assert(rep);
+        if (rep->body_pipe != NULL)
+            rep->body_pipe->expectNoConsumption();
+
         return;
+    }
 
     HttpReply *rep = dynamic_cast<HttpReply*>(msg);
     assert(rep);
@@ -669,30 +703,63 @@ ServerStateData::noteAdaptationAnswer(HttpMsg *msg)
     }
 }
 
-// will not receive adapted response headers (and, hence, body)
 void
-ServerStateData::noteAdaptationQueryAbort(bool final)
+ServerStateData::resumeBodyStorage()
 {
-    clearAdaptation(adaptedHeadSource);
-    handleAdaptationAborted(!final);
+    if (abortOnBadEntry("store entry aborted while kick producer callback"))
+        return;
+
+    if (!adaptedBodySource)
+        return;
+
+    handleMoreAdaptedBodyAvailable();
+
+    if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+        endAdaptedBodyConsumption();
 }
 
 // more adapted response body is available
 void
 ServerStateData::handleMoreAdaptedBodyAvailable()
 {
-    const size_t contentSize = adaptedBodySource->buf().contentSize();
-
-    debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " <<
-           "response body at offset " << adaptedBodySource->consumedSize());
-
     if (abortOnBadEntry("entry refuses adapted body"))
         return;
 
     assert(entry);
+
+    size_t contentSize = adaptedBodySource->buf().contentSize();
+
+    if (!contentSize)
+        return; // XXX: bytesWanted asserts on zero-size ranges
+
+    const size_t spaceAvailable = entry->bytesWanted(Range<size_t>(0, contentSize), true);
+
+    if (spaceAvailable < contentSize ) {
+        // No or partial body data consuming
+        typedef NullaryMemFunT<ServerStateData> Dialer;
+        AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage",
+                                            Dialer(this, &ServerStateData::resumeBodyStorage));
+        entry->deferProducer(call);
+    }
+
+    if (!spaceAvailable)  {
+        debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
+               "response body at offset " << adaptedBodySource->consumedSize());
+        return;
+    }
+
+    if (spaceAvailable < contentSize ) {
+        debugs(11, 5, HERE << "postponing storage of " <<
+               (contentSize - spaceAvailable) << " body bytes");
+        contentSize = spaceAvailable;
+    }
+
+    debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
+           "response body at offset " << adaptedBodySource->consumedSize());
+
     BodyPipeCheckout bpc(*adaptedBodySource);
-    const StoreIOBuffer ioBuf(&bpc.buf, currentOffset);
-    currentOffset += bpc.buf.size;
+    const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
+    currentOffset += ioBuf.length;
     entry->write(ioBuf);
     bpc.buf.consume(contentSize);
     bpc.checkIn();
@@ -702,11 +769,19 @@ ServerStateData::handleMoreAdaptedBodyAvailable()
 void
 ServerStateData::handleAdaptedBodyProductionEnded()
 {
-    stopConsumingFrom(adaptedBodySource);
-
     if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
         return;
 
+    // end consumption if we consumed everything
+    if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+        endAdaptedBodyConsumption();
+    // else resumeBodyStorage() will eventually consume the rest
+}
+
+void
+ServerStateData::endAdaptedBodyConsumption()
+{
+    stopConsumingFrom(adaptedBodySource);
     handleAdaptationCompleted();
 }
 
@@ -733,10 +808,8 @@ ServerStateData::handleAdaptationCompleted()
     }
 
     completeForwarding();
-    quitIfAllDone();
 }
 
-
 // common part of noteAdaptation*Aborted and noteBodyConsumerAborted methods
 void
 ServerStateData::handleAdaptationAborted(bool bypassable)
@@ -751,17 +824,50 @@ ServerStateData::handleAdaptationAborted(bool bypassable)
 
     if (entry->isEmpty()) {
         debugs(11,9, HERE << "creating ICAP error entry after ICAP failure");
-        ErrorState *err = errorCon(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request);
-        err->xerrno = errno;
+        ErrorState *err = new ErrorState(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request);
+        err->detailError(ERR_DETAIL_ICAP_RESPMOD_EARLY);
         fwd->fail(err);
         fwd->dontRetry(true);
+    } else if (request) { // update logged info directly
+        request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_ICAP_RESPMOD_LATE);
     }
 
     abortTransaction("ICAP failure");
 }
 
+// adaptation service wants us to deny HTTP client access to this response
 void
-ServerStateData::adaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
+ServerStateData::handleAdaptationBlocked(const Adaptation::Answer &answer)
+{
+    debugs(11,5, HERE << answer.ruleId);
+
+    if (abortOnBadEntry("entry went bad while ICAP aborted"))
+        return;
+
+    if (!entry->isEmpty()) { // too late to block (should not really happen)
+        if (request)
+            request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_RESPMOD_BLOCK_LATE);
+        abortTransaction("late adaptation block");
+        return;
+    }
+
+    debugs(11,7, HERE << "creating adaptation block response");
+
+    err_type page_id =
+        aclGetDenyInfoPage(&Config.denyInfoList, answer.ruleId.termedBuf(), 1);
+    if (page_id == ERR_NONE)
+        page_id = ERR_ACCESS_DENIED;
+
+    ErrorState *err = new ErrorState(page_id, HTTP_FORBIDDEN, request);
+    err->detailError(ERR_DETAIL_RESPMOD_BLOCK_EARLY);
+    fwd->fail(err);
+    fwd->dontRetry(true);
+
+    abortTransaction("timely adaptation block");
+}
+
+void
+ServerStateData::noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
 {
     adaptationAccessCheckPending = false;
 
@@ -786,20 +892,12 @@ ServerStateData::adaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
     startAdaptation(group, originalRequest());
     processReplyBody();
 }
-
-void
-ServerStateData::adaptationAclCheckDoneWrapper(Adaptation::ServiceGroupPointer group, void *data)
-{
-    ServerStateData *state = (ServerStateData *)data;
-    state->adaptationAclCheckDone(group);
-}
 #endif
 
 void
 ServerStateData::sendBodyIsTooLargeError()
 {
-    ErrorState *err = errorCon(ERR_TOO_BIG, HTTP_FORBIDDEN, request);
-    err->xerrno = errno;
+    ErrorState *err = new ErrorState(ERR_TOO_BIG, HTTP_FORBIDDEN, request);
     fwd->fail(err);
     fwd->dontRetry(true);
     abortTransaction("Virgin body too large.");
@@ -815,7 +913,7 @@ ServerStateData::adaptOrFinalizeReply()
     // The callback can be called with a NULL service if adaptation is off.
     adaptationAccessCheckPending = Adaptation::AccessCheck::Start(
                                        Adaptation::methodRespmod, Adaptation::pointPreCache,
-                                       request, virginReply(), adaptationAclCheckDoneWrapper, this);
+                                       originalRequest(), virginReply(), this);
     debugs(11,5, HERE << "adaptationAccessCheckPending=" << adaptationAccessCheckPending);
     if (adaptationAccessCheckPending)
         return;