]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Bug 2619: Excessive RAM growth due to unlimited adapted body data consumption
authorMartin Huter <mhuter@barracuda.com>
Fri, 2 Dec 2011 12:17:07 +0000 (05:17 -0700)
committerAmos Jeffries <squid3@treenet.co.nz>
Fri, 2 Dec 2011 12:17:07 +0000 (05:17 -0700)
If the client does not read from the open connection (i.e. the user does not
confirm the browsers download-message-box in microsofts IE), squid keeps on
reading data from the ICAP server into the store entry, while no more data
can be delivered to the client.
Thus the store entry in memory is growing and squid may - in worst case -
consume memory up to the size of the users download.

This patch add API to StoreEntry to call the producer back when released
memory/space from the StoreEntry and add code to the ICAP client code to not
consume body data comes from the ICAP server when there is not available space
in the store entry.

src/Server.cc
src/Server.h
src/Store.h
src/StoreIOBuffer.h
src/client_side_request.cc
src/client_side_request.h
src/store.cc
src/store_client.cc
test-suite/mem_hdr_test.cc

index 8cd2678f07db59d1f99e4722c8df60a2bf6df536..92c02ba5f8ad7b9676830bb21960c3034cd022e9 100644 (file)
@@ -45,6 +45,7 @@
 #if USE_ADAPTATION
 #include "adaptation/AccessCheck.h"
 #include "adaptation/Iterator.h"
+#include "base/AsyncCall.h"
 #endif
 
 // implemented in client_side_reply.cc until sides have a common parent
@@ -57,6 +58,8 @@ ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateDa
         , adaptationAccessCheckPending(false)
         , startedAdaptation(false)
 #endif
+        ,theVirginReply(NULL),
+        theFinalReply(NULL)
 {
     fwd = theFwdState;
     entry = fwd->entry;
@@ -276,7 +279,8 @@ ServerStateData::noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
         return;
     }
 #endif
-    handleMoreRequestBodyAvailable();
+    if (requestBodySource == bp)
+        handleMoreRequestBodyAvailable();
 }
 
 // the entire request or adapted response body was provided, successfully
@@ -289,7 +293,8 @@ ServerStateData::noteBodyProductionEnded(BodyPipe::Pointer bp)
         return;
     }
 #endif
-    handleRequestBodyProductionEnded();
+    if (requestBodySource == bp)
+        handleRequestBodyProductionEnded();
 }
 
 // premature end of the request or adapted response body production
@@ -302,7 +307,8 @@ ServerStateData::noteBodyProducerAborted(BodyPipe::Pointer bp)
         return;
     }
 #endif
-    handleRequestBodyProducerAborted();
+    if (requestBodySource == bp)
+        handleRequestBodyProducerAborted();
 }
 
 
@@ -686,22 +692,67 @@ ServerStateData::noteAdaptationQueryAbort(bool final)
     handleAdaptationAborted(!final);
 }
 
-// more adapted response body is available
 void
-ServerStateData::handleMoreAdaptedBodyAvailable()
+ServerStateData::resumeBodyStorage()
 {
-    const size_t contentSize = adaptedBodySource->buf().contentSize();
+    if (abortOnBadEntry("store entry aborted while kick producer callback"))
+        return;
 
-    debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " <<
-           "response body at offset " << adaptedBodySource->consumedSize());
+    if (!adaptedBodySource)
+        return;
+
+    handleMoreAdaptedBodyAvailable();
+
+    if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+        endAdaptedBodyConsumption();
+}
 
+// more adapted response body is available
+void
+ServerStateData::handleMoreAdaptedBodyAvailable()
+{
     if (abortOnBadEntry("entry refuses adapted body"))
         return;
 
     assert(entry);
+
+    size_t contentSize = adaptedBodySource->buf().contentSize();
+
+    if (!contentSize)
+        return; // XXX: bytesWanted asserts on zero-size ranges
+
+    // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data.
+    // We have to add 1 to avoid suspending forever.
+    const size_t bytesWanted = entry->bytesWanted(Range<size_t>(0, contentSize));
+    const size_t spaceAvailable = bytesWanted >  0 ? (bytesWanted + 1) : 0;
+
+    if (spaceAvailable < contentSize ) {
+        // No or partial body data consuming
+        typedef NullaryMemFunT<ServerStateData> Dialer;
+        AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage",
+                                            Dialer(this, &ServerStateData::resumeBodyStorage));
+        entry->deferProducer(call);
+    }
+
+    // XXX: bytesWanted API does not allow us to write just one byte!
+    if (!spaceAvailable && contentSize > 1)  {
+        debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
+               "response body at offset " << adaptedBodySource->consumedSize());
+        return;
+    }
+
+    if (spaceAvailable < contentSize ) {
+        debugs(11, 5, HERE << "postponing storage of " <<
+               (contentSize - spaceAvailable) << " body bytes");
+        contentSize = spaceAvailable;
+    }
+
+    debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
+           "response body at offset " << adaptedBodySource->consumedSize());
+
     BodyPipeCheckout bpc(*adaptedBodySource);
-    const StoreIOBuffer ioBuf(&bpc.buf, currentOffset);
-    currentOffset += bpc.buf.size;
+    const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
+    currentOffset += ioBuf.length;
     entry->write(ioBuf);
     bpc.buf.consume(contentSize);
     bpc.checkIn();
@@ -711,11 +762,19 @@ ServerStateData::handleMoreAdaptedBodyAvailable()
 void
 ServerStateData::handleAdaptedBodyProductionEnded()
 {
-    stopConsumingFrom(adaptedBodySource);
-
     if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
         return;
 
+    // end consumption if we consumed everything
+    if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+        endAdaptedBodyConsumption();
+    // else resumeBodyStorage() will eventually consume the rest
+}
+
+void
+ServerStateData::endAdaptedBodyConsumption()
+{
+    stopConsumingFrom(adaptedBodySource);
     handleAdaptationCompleted();
 }
 
index d49e770784c836b56fa21db5f8ae0cd9ddcb8bdc..26a1b546ca0fb6717fa41687a2ffe21a41f629ab 100644 (file)
@@ -155,6 +155,11 @@ protected:
 
     void handleAdaptationCompleted();
     void handleAdaptationAborted(bool bypassable = false);
+
+    /// called by StoreEntry when it has more buffer space available
+    void resumeBodyStorage();
+    /// called when the entire adapted response body is consumed
+    void endAdaptedBodyConsumption();
 #endif
 
 protected:
index 516745ab36b228136026eaf1636f4e00a6430805..7f6a1d9a3b448babc1d6cf92b07ae54a1a1c335f 100644 (file)
@@ -189,9 +189,21 @@ public:
     virtual void lock();
     virtual void release();
 
+#if USE_ADAPTATION
+    /// call back producer when more buffer space is available
+    void deferProducer(const AsyncCall::Pointer &producer);
+    /// calls back producer registered with deferProducer
+    void kickProducer();
+#endif
+
 private:
     static MemAllocator *pool;
 
+#if USE_ADAPTATION
+    /// producer callback registered with deferProducer
+    AsyncCall::Pointer deferredProducer;
+#endif
+
     bool validLength() const;
     bool hasOneOfEtags(const String &reqETags, const bool allowWeakMatch) const;
 };
index d5267898350058b7e58b36cb00c6b05a1c87b579..ba46ed0b0c2ca4a54ea929c5c439806cc789dab2 100644 (file)
@@ -59,6 +59,13 @@ public:
         flags.error = 0;
     }
 
+    StoreIOBuffer(MemBuf *aMemBuf, int64_t anOffset, size_t anLength) :
+            length(anLength),
+            offset (anOffset),
+            data(aMemBuf->content()) {
+        flags.error = 0;
+    }
+
     Range<int64_t> range() const {
         return Range<int64_t>(offset, offset + length);
     }
index a690743f66e7a9cc73bf94e78804f06cdbaaa514..6fdcde9b6b7a26ee300bc907370a16d184b39951 100644 (file)
@@ -1472,18 +1472,47 @@ ClientHttpRequest::noteAdaptationQueryAbort(bool final)
     handleAdaptationFailure(!final);
 }
 
+void
+ClientHttpRequest::resumeBodyStorage()
+{
+    if (!adaptedBodySource)
+        return;
+
+    noteMoreBodyDataAvailable(adaptedBodySource);
+}
+
 void
 ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe::Pointer)
 {
     assert(request_satisfaction_mode);
     assert(adaptedBodySource != NULL);
 
-    if (const size_t contentSize = adaptedBodySource->buf().contentSize()) {
+    if (size_t contentSize = adaptedBodySource->buf().contentSize()) {
+        // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data.
+        // We have to add 1 to avoid suspending forever.
+        const size_t bytesWanted = storeEntry()->bytesWanted(Range<size_t>(0,contentSize));
+        const size_t spaceAvailable = bytesWanted >  0 ? (bytesWanted + 1) : 0;
+
+        if (spaceAvailable < contentSize ) {
+            // No or partial body data consuming
+            typedef NullaryMemFunT<ClientHttpRequest> Dialer;
+            AsyncCall::Pointer call = asyncCall(93, 5, "ClientHttpRequest::resumeBodyStorage",
+                                                Dialer(this, &ClientHttpRequest::resumeBodyStorage));
+            storeEntry()->deferProducer(call);
+        }
+
+        // XXX: bytesWanted API does not allow us to write just one byte!
+        if (!spaceAvailable && contentSize > 1)
+            return;
+
+        if (spaceAvailable < contentSize )
+            contentSize = spaceAvailable;
+
         BodyPipeCheckout bpc(*adaptedBodySource);
-        const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset);
+        const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset, contentSize);
         storeEntry()->write(ioBuf);
-        // assume can write everything
-        request_satisfaction_offset += contentSize;
+        // assume StoreEntry::write() writes the entire ioBuf
+        request_satisfaction_offset += ioBuf.length;
         bpc.buf.consume(contentSize);
         bpc.checkIn();
     }
@@ -1497,13 +1526,9 @@ void
 ClientHttpRequest::noteBodyProductionEnded(BodyPipe::Pointer)
 {
     assert(!virginHeadSource);
-    if (adaptedBodySource != NULL) { // did not end request satisfaction yet
-        // We do not expect more because noteMoreBodyDataAvailable always
-        // consumes everything. We do not even have a mechanism to consume
-        // leftovers after noteMoreBodyDataAvailable notifications seize.
-        assert(adaptedBodySource->exhausted());
+    // should we end request satisfaction now?
+    if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
         endRequestSatisfaction();
-    }
 }
 
 void
index 601e700c95cf7d11c1322289c5db42469ef1c7de..5efc1fac1fbe811d9c1540c03e7723d1d97152a3 100644 (file)
@@ -180,6 +180,8 @@ private:
     virtual void noteBodyProducerAborted(BodyPipe::Pointer);
 
     void endRequestSatisfaction();
+    /// called by StoreEntry when it has more buffer space available
+    void resumeBodyStorage();
 
 private:
     CbcPointer<Adaptation::Initiate> virginHeadSource;
index c1c2b7b232f8163171d11f0d65b41dedbc9c89b0..d24081e1e731a3d5ec577c9c6e3b801a0e954985 100644 (file)
@@ -366,6 +366,27 @@ StoreEntry::StoreEntry(const char *aUrl, const char *aLogUrl)
     swap_dirn = -1;
 }
 
+#if USE_ADAPTATION
+void
+StoreEntry::deferProducer(const AsyncCall::Pointer &producer)
+{
+    if (!deferredProducer)
+        deferredProducer = producer;
+    else
+        debugs(20, 5, HERE << "Deferred producer call is allready set to: " <<
+               *deferredProducer << ", requested call: " << *producer);
+}
+
+void
+StoreEntry::kickProducer()
+{
+    if (deferredProducer != NULL) {
+        ScheduleCallHere(deferredProducer);
+        deferredProducer = NULL;
+    }
+}
+#endif
+
 void
 StoreEntry::destroyMemObject()
 {
index 4b24ae651f5c405d4904195fbcc5d8153a320a40..2057112620db320f4dc28f6345447043468068e2 100644 (file)
@@ -262,6 +262,11 @@ store_client::copy(StoreEntry * anEntry,
     copying = false;
 
     storeClientCopy2(entry, this);
+
+#if USE_ADAPTATION
+    if (entry)
+        entry->kickProducer();
+#endif
 }
 
 /*
@@ -694,6 +699,10 @@ storeUnregister(store_client * sc, StoreEntry * e, void *data)
     else
         mem->kickReads();
 
+#if USE_ADAPTATION
+    e->kickProducer();
+#endif
+
     return 1;
 }
 
index 98b8a7f6f70e44f1fad9e01b4a6981d66c48fa56..80c5738b4a44695fd327c174b50ed586b1a6f0ae 100644 (file)
@@ -50,7 +50,7 @@ testLowAndHigh()
     assert (aHeader.lowestOffset() == 0);
     assert (aHeader.write (StoreIOBuffer()));
     assert (aHeader.lowestOffset() == 0);
-    assert (aHeader.write (StoreIOBuffer(0, 1, NULL)));
+    assert (aHeader.write (StoreIOBuffer(0, 1, (char *)NULL)));
     assert (aHeader.lowestOffset() == 0);
     char * sampleData = xstrdup ("A");
     assert (aHeader.write (StoreIOBuffer(1, 100, sampleData)));