]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
author: Martin Huter <mhuter@barracuda.com>, Alex Rousskov <rousskov@measurement...
authorChristos Tsantilas <chtsanti@users.sourceforge.net>
Fri, 18 Nov 2011 16:53:45 +0000 (18:53 +0200)
committerChristos Tsantilas <chtsanti@users.sourceforge.net>
Fri, 18 Nov 2011 16:53:45 +0000 (18:53 +0200)
Bug 2619: Excessive RAM growth due to unlimited adapted body data consumption

If the client does not read from the open connection (i.e. the user does not
confirm the browsers download-message-box in microsofts IE), squid keeps on
reading data from the ICAP server into the store entry, while no more data
can be delivered to the client.
Thus the store entry in memory is growing and squid may - in worst case -
consume memory up to the size of the users download.

This patch add API to StoreEntry to call the producer back when released
memory/space from the StoreEntry and add code to the ICAP client code to not
consume body data comes from the ICAP server when there is not available space
in the store entry.

src/Server.cc
src/Server.h
src/Store.h
src/StoreIOBuffer.h
src/client_side_request.cc
src/client_side_request.h
src/store.cc
src/store_client.cc

index 754c49a525ff75d1113ea904556ce0688fbe8100..68c500815acf5976f4e331a62cb035c1e64bc7c9 100644 (file)
@@ -50,6 +50,7 @@
 #include "adaptation/AccessCheck.h"
 #include "adaptation/Answer.h"
 #include "adaptation/Iterator.h"
+#include "base/AsyncCall.h"
 #endif
 
 // implemented in client_side_reply.cc until sides have a common parent
@@ -63,7 +64,9 @@ ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateDa
         adaptationAccessCheckPending(false),
         startedAdaptation(false),
 #endif
-        receivedWholeRequestBody(false)
+        receivedWholeRequestBody(false),
+        theVirginReply(NULL),
+        theFinalReply(NULL)
 {
     fwd = theFwdState;
     entry = fwd->entry;
@@ -273,7 +276,8 @@ ServerStateData::noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
         return;
     }
 #endif
-    handleMoreRequestBodyAvailable();
+    if (requestBodySource == bp)
+        handleMoreRequestBodyAvailable();
 }
 
 // the entire request or adapted response body was provided, successfully
@@ -286,7 +290,8 @@ ServerStateData::noteBodyProductionEnded(BodyPipe::Pointer bp)
         return;
     }
 #endif
-    handleRequestBodyProductionEnded();
+    if (requestBodySource == bp)
+        handleRequestBodyProductionEnded();
 }
 
 // premature end of the request or adapted response body production
@@ -299,7 +304,8 @@ ServerStateData::noteBodyProducerAborted(BodyPipe::Pointer bp)
         return;
     }
 #endif
-    handleRequestBodyProducerAborted();
+    if (requestBodySource == bp)
+        handleRequestBodyProducerAborted();
 }
 
 
@@ -698,22 +704,69 @@ ServerStateData::handleAdaptedHeader(HttpMsg *msg)
     }
 }
 
-// more adapted response body is available
 void
-ServerStateData::handleMoreAdaptedBodyAvailable()
+ServerStateData::resumeBodyStorage()
 {
-    const size_t contentSize = adaptedBodySource->buf().contentSize();
+    if (abortOnBadEntry("store entry aborted while kick producer callback"))
+        return;
 
-    debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " <<
-           "response body at offset " << adaptedBodySource->consumedSize());
+    if(!adaptedBodySource)
+        return;
+
+    handleMoreAdaptedBodyAvailable();
+
+    if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+        endAdaptedBodyConsumption();
+}
 
+// more adapted response body is available
+void
+ServerStateData::handleMoreAdaptedBodyAvailable()
+{
     if (abortOnBadEntry("entry refuses adapted body"))
         return;
 
     assert(entry);
+
+    size_t contentSize = adaptedBodySource->buf().contentSize();
+    bool consumedPartially = false;
+
+    if (!contentSize)
+        return; // XXX: bytesWanted asserts on zero-size ranges
+
+    // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data.
+    // We have to add 1 to avoid suspending forever.
+    const size_t bytesWanted = entry->bytesWanted(Range<size_t>(0, contentSize));
+    const size_t spaceAvailable = bytesWanted >  0 ? (bytesWanted + 1) : 0;
+
+    if (spaceAvailable < contentSize ) { 
+        // No or partial body data consuming
+        typedef NullaryMemFunT<ServerStateData> Dialer;
+        AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage",
+                                            Dialer(this, &ServerStateData::resumeBodyStorage));
+        entry->deferProducer(call);
+    }
+
+    // XXX: bytesWanted API does not allow us to write just one byte!
+    if (!spaceAvailable && contentSize > 1)  {
+        debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
+               "response body at offset " << adaptedBodySource->consumedSize());
+        return;
+    }
+    
+    if (spaceAvailable < contentSize ) {
+        debugs(11, 5, HERE << "postponing storage of " <<
+               (contentSize - spaceAvailable) << " body bytes");
+        contentSize = spaceAvailable;
+        consumedPartially=true;
+    }
+    
+    debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
+           "response body at offset " << adaptedBodySource->consumedSize());
+    
     BodyPipeCheckout bpc(*adaptedBodySource);
-    const StoreIOBuffer ioBuf(&bpc.buf, currentOffset);
-    currentOffset += bpc.buf.size;
+    const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
+    currentOffset += ioBuf.length;
     entry->write(ioBuf);
     bpc.buf.consume(contentSize);
     bpc.checkIn();
@@ -723,11 +776,19 @@ ServerStateData::handleMoreAdaptedBodyAvailable()
 void
 ServerStateData::handleAdaptedBodyProductionEnded()
 {
-    stopConsumingFrom(adaptedBodySource);
-
     if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
         return;
+     // end consumption if we consumed everything
+    if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
+        endAdaptedBodyConsumption();
+    // else resumeBodyStorage() will eventually consume the rest
+}
 
+void
+ServerStateData::endAdaptedBodyConsumption()
+{
+    stopConsumingFrom(adaptedBodySource);
     handleAdaptationCompleted();
 }
 
index e20c9213a2f2770ec73c700ad706ff8eed3820af..c3c8c3342d50f31400adf718484b9e14a997bea2 100644 (file)
@@ -147,6 +147,11 @@ protected:
     void handleAdaptationCompleted();
     void handleAdaptationBlocked(const Adaptation::Answer &answer);
     void handleAdaptationAborted(bool bypassable = false);
+
+    /// called by StoreEntry when it has more buffer space available
+    void resumeBodyStorage();
+    /// called when the entire adapted response body is consumed
+    void endAdaptedBodyConsumption();
 #endif
 
 protected:
index 9621d1a7b891db144dac9614342175015a212f49..78481647e5b1d1f0cf6857018532b4c356698b02 100644 (file)
@@ -201,9 +201,21 @@ public:
     virtual void lock();
     virtual void release();
 
+#if USE_ADAPTATION
+    /// call back producer when more buffer space is available
+    void deferProducer(const AsyncCall::Pointer &producer);
+    /// calls back producer registered with deferProducer
+    void kickProducer();
+#endif
+
 private:
     static MemAllocator *pool;
 
+#if USE_ADAPTATION
+    /// producer callback registered with deferProducer
+    AsyncCall::Pointer deferredProducer;
+#endif
+
     bool validLength() const;
     bool hasOneOfEtags(const String &reqETags, const bool allowWeakMatch) const;
 };
index eb08cce431f3708787e54903500267e367038d87..7c7959e16105c72a7ad7599285c026622eb15183 100644 (file)
@@ -59,6 +59,13 @@ public:
         flags.error = 0;
     }
 
+    StoreIOBuffer(MemBuf *aMemBuf, int64_t anOffset, size_t anLength) :
+            length(anLength),
+            offset (anOffset),
+            data(aMemBuf->content()) {
+        flags.error = 0;
+    }
+
     Range<int64_t> range() const {
         return Range<int64_t>(offset, offset + length);
     }
index c8f93c45b698288f4ac57c2db8003329011e4c4a..96a643fa4a821fbbedbb639fa75c3df5e391f7da 100644 (file)
@@ -1680,18 +1680,47 @@ ClientHttpRequest::handleAdaptationBlock(const Adaptation::Answer &answer)
     AclMatchedName = NULL;
 }
 
+void
+ClientHttpRequest::resumeBodyStorage()
+{
+    if(!adaptedBodySource)
+        return;
+
+    noteMoreBodyDataAvailable(adaptedBodySource);
+}
+
 void
 ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe::Pointer)
 {
     assert(request_satisfaction_mode);
     assert(adaptedBodySource != NULL);
 
-    if (const size_t contentSize = adaptedBodySource->buf().contentSize()) {
+    if (size_t contentSize = adaptedBodySource->buf().contentSize()) {
+        // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data.
+        // We have to add 1 to avoid suspending forever.
+        const size_t bytesWanted = storeEntry()->bytesWanted(Range<size_t>(0,contentSize));
+        const size_t spaceAvailable = bytesWanted >  0 ? (bytesWanted + 1) : 0;
+
+        if (spaceAvailable < contentSize ) {
+            // No or partial body data consuming
+            typedef NullaryMemFunT<ClientHttpRequest> Dialer;
+            AsyncCall::Pointer call = asyncCall(93, 5, "ClientHttpRequest::resumeBodyStorage",
+                                                Dialer(this, &ClientHttpRequest::resumeBodyStorage));
+            storeEntry()->deferProducer(call);
+        }
+
+        // XXX: bytesWanted API does not allow us to write just one byte!
+        if (!spaceAvailable && contentSize > 1)
+            return;
+
+        if (spaceAvailable < contentSize )
+            contentSize = spaceAvailable;
+
         BodyPipeCheckout bpc(*adaptedBodySource);
-        const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset);
+        const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset, contentSize);
         storeEntry()->write(ioBuf);
-        // assume can write everything
-        request_satisfaction_offset += contentSize;
+        // assume StoreEntry::write() writes the entire ioBuf
+        request_satisfaction_offset += ioBuf.length;
         bpc.buf.consume(contentSize);
         bpc.checkIn();
     }
@@ -1705,13 +1734,9 @@ void
 ClientHttpRequest::noteBodyProductionEnded(BodyPipe::Pointer)
 {
     assert(!virginHeadSource);
-    if (adaptedBodySource != NULL) { // did not end request satisfaction yet
-        // We do not expect more because noteMoreBodyDataAvailable always
-        // consumes everything. We do not even have a mechanism to consume
-        // leftovers after noteMoreBodyDataAvailable notifications seize.
-        assert(adaptedBodySource->exhausted());
+    // should we end request satisfaction now?
+    if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
         endRequestSatisfaction();
-    }
 }
 
 void
index 693e26effac0429f70d8a650607aa5eb1fadb389..c50c1e5c45ae7ffab1fde07146d14daf806b1614 100644 (file)
@@ -188,6 +188,8 @@ private:
     virtual void noteBodyProducerAborted(BodyPipe::Pointer);
 
     void endRequestSatisfaction();
+    /// called by StoreEntry when it has more buffer space available
+    void resumeBodyStorage();
 
 private:
     CbcPointer<Adaptation::Initiate> virginHeadSource;
index 3be12fcfa4af0d0e1fa780cca7871635290d8e07..cd1f684f02d867e8d11c94d2c0a1f49aa22087d0 100644 (file)
@@ -410,6 +410,27 @@ StoreEntry::~StoreEntry()
     delete hidden_mem_obj;
 }
 
+#if USE_ADAPTATION
+void
+StoreEntry::deferProducer(const AsyncCall::Pointer &producer)
+{
+    if (!deferredProducer)
+        deferredProducer = producer;
+    else
+        debugs(20, 5, HERE << "Deferred producer call is allready set to: " << 
+               *deferredProducer << ", requested call: " << *producer);
+}
+
+void
+StoreEntry::kickProducer()
+{
+    if(deferredProducer != NULL){
+        ScheduleCallHere(deferredProducer);
+        deferredProducer = NULL;
+    }
+}
+#endif
+
 void
 StoreEntry::destroyMemObject()
 {
index f0294bd58b0007c1632a99181a3848a23abb5bf3..55ed6a2dfed75b77b4c29811f5dc2849d35230e8 100644 (file)
@@ -262,6 +262,11 @@ store_client::copy(StoreEntry * anEntry,
     copying = false;
 
     storeClientCopy2(entry, this);
+
+#if USE_ADAPTATION
+    if (entry)
+        entry->kickProducer();
+#endif
 }
 
 /*
@@ -726,6 +731,10 @@ storeUnregister(store_client * sc, StoreEntry * e, void *data)
     else
         mem->kickReads();
 
+#if USE_ADAPTATION
+    e->kickProducer();
+#endif
+
     return 1;
 }