From: Martin Huter Date: Fri, 2 Dec 2011 12:17:07 +0000 (-0700) Subject: Bug 2619: Excessive RAM growth due to unlimited adapted body data consumption X-Git-Tag: SQUID_3_1_17~9 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=516added00821547e9426da2a4e09d677379c2d1;p=thirdparty%2Fsquid.git Bug 2619: Excessive RAM growth due to unlimited adapted body data consumption If the client does not read from the open connection (i.e. the user does not confirm the browsers download-message-box in microsofts IE), squid keeps on reading data from the ICAP server into the store entry, while no more data can be delivered to the client. Thus the store entry in memory is growing and squid may - in worst case - consume memory up to the size of the users download. This patch add API to StoreEntry to call the producer back when released memory/space from the StoreEntry and add code to the ICAP client code to not consume body data comes from the ICAP server when there is not available space in the store entry. --- diff --git a/src/Server.cc b/src/Server.cc index 8cd2678f07..92c02ba5f8 100644 --- a/src/Server.cc +++ b/src/Server.cc @@ -45,6 +45,7 @@ #if USE_ADAPTATION #include "adaptation/AccessCheck.h" #include "adaptation/Iterator.h" +#include "base/AsyncCall.h" #endif // implemented in client_side_reply.cc until sides have a common parent @@ -57,6 +58,8 @@ ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateDa , adaptationAccessCheckPending(false) , startedAdaptation(false) #endif + ,theVirginReply(NULL), + theFinalReply(NULL) { fwd = theFwdState; entry = fwd->entry; @@ -276,7 +279,8 @@ ServerStateData::noteMoreBodyDataAvailable(BodyPipe::Pointer bp) return; } #endif - handleMoreRequestBodyAvailable(); + if (requestBodySource == bp) + handleMoreRequestBodyAvailable(); } // the entire request or adapted response body was provided, successfully @@ -289,7 +293,8 @@ ServerStateData::noteBodyProductionEnded(BodyPipe::Pointer bp) return; } #endif - handleRequestBodyProductionEnded(); + if (requestBodySource == bp) + handleRequestBodyProductionEnded(); } // premature end of the request or adapted response body production @@ -302,7 +307,8 @@ ServerStateData::noteBodyProducerAborted(BodyPipe::Pointer bp) return; } #endif - handleRequestBodyProducerAborted(); + if (requestBodySource == bp) + handleRequestBodyProducerAborted(); } @@ -686,22 +692,67 @@ ServerStateData::noteAdaptationQueryAbort(bool final) handleAdaptationAborted(!final); } -// more adapted response body is available void -ServerStateData::handleMoreAdaptedBodyAvailable() +ServerStateData::resumeBodyStorage() { - const size_t contentSize = adaptedBodySource->buf().contentSize(); + if (abortOnBadEntry("store entry aborted while kick producer callback")) + return; - debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " << - "response body at offset " << adaptedBodySource->consumedSize()); + if (!adaptedBodySource) + return; + + handleMoreAdaptedBodyAvailable(); + + if (adaptedBodySource != NULL && adaptedBodySource->exhausted()) + endAdaptedBodyConsumption(); +} +// more adapted response body is available +void +ServerStateData::handleMoreAdaptedBodyAvailable() +{ if (abortOnBadEntry("entry refuses adapted body")) return; assert(entry); + + size_t contentSize = adaptedBodySource->buf().contentSize(); + + if (!contentSize) + return; // XXX: bytesWanted asserts on zero-size ranges + + // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data. + // We have to add 1 to avoid suspending forever. + const size_t bytesWanted = entry->bytesWanted(Range(0, contentSize)); + const size_t spaceAvailable = bytesWanted > 0 ? (bytesWanted + 1) : 0; + + if (spaceAvailable < contentSize ) { + // No or partial body data consuming + typedef NullaryMemFunT Dialer; + AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage", + Dialer(this, &ServerStateData::resumeBodyStorage)); + entry->deferProducer(call); + } + + // XXX: bytesWanted API does not allow us to write just one byte! + if (!spaceAvailable && contentSize > 1) { + debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " << + "response body at offset " << adaptedBodySource->consumedSize()); + return; + } + + if (spaceAvailable < contentSize ) { + debugs(11, 5, HERE << "postponing storage of " << + (contentSize - spaceAvailable) << " body bytes"); + contentSize = spaceAvailable; + } + + debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " << + "response body at offset " << adaptedBodySource->consumedSize()); + BodyPipeCheckout bpc(*adaptedBodySource); - const StoreIOBuffer ioBuf(&bpc.buf, currentOffset); - currentOffset += bpc.buf.size; + const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize); + currentOffset += ioBuf.length; entry->write(ioBuf); bpc.buf.consume(contentSize); bpc.checkIn(); @@ -711,11 +762,19 @@ ServerStateData::handleMoreAdaptedBodyAvailable() void ServerStateData::handleAdaptedBodyProductionEnded() { - stopConsumingFrom(adaptedBodySource); - if (abortOnBadEntry("entry went bad while waiting for adapted body eof")) return; + // end consumption if we consumed everything + if (adaptedBodySource != NULL && adaptedBodySource->exhausted()) + endAdaptedBodyConsumption(); + // else resumeBodyStorage() will eventually consume the rest +} + +void +ServerStateData::endAdaptedBodyConsumption() +{ + stopConsumingFrom(adaptedBodySource); handleAdaptationCompleted(); } diff --git a/src/Server.h b/src/Server.h index d49e770784..26a1b546ca 100644 --- a/src/Server.h +++ b/src/Server.h @@ -155,6 +155,11 @@ protected: void handleAdaptationCompleted(); void handleAdaptationAborted(bool bypassable = false); + + /// called by StoreEntry when it has more buffer space available + void resumeBodyStorage(); + /// called when the entire adapted response body is consumed + void endAdaptedBodyConsumption(); #endif protected: diff --git a/src/Store.h b/src/Store.h index 516745ab36..7f6a1d9a3b 100644 --- a/src/Store.h +++ b/src/Store.h @@ -189,9 +189,21 @@ public: virtual void lock(); virtual void release(); +#if USE_ADAPTATION + /// call back producer when more buffer space is available + void deferProducer(const AsyncCall::Pointer &producer); + /// calls back producer registered with deferProducer + void kickProducer(); +#endif + private: static MemAllocator *pool; +#if USE_ADAPTATION + /// producer callback registered with deferProducer + AsyncCall::Pointer deferredProducer; +#endif + bool validLength() const; bool hasOneOfEtags(const String &reqETags, const bool allowWeakMatch) const; }; diff --git a/src/StoreIOBuffer.h b/src/StoreIOBuffer.h index d526789835..ba46ed0b0c 100644 --- a/src/StoreIOBuffer.h +++ b/src/StoreIOBuffer.h @@ -59,6 +59,13 @@ public: flags.error = 0; } + StoreIOBuffer(MemBuf *aMemBuf, int64_t anOffset, size_t anLength) : + length(anLength), + offset (anOffset), + data(aMemBuf->content()) { + flags.error = 0; + } + Range range() const { return Range(offset, offset + length); } diff --git a/src/client_side_request.cc b/src/client_side_request.cc index a690743f66..6fdcde9b6b 100644 --- a/src/client_side_request.cc +++ b/src/client_side_request.cc @@ -1472,18 +1472,47 @@ ClientHttpRequest::noteAdaptationQueryAbort(bool final) handleAdaptationFailure(!final); } +void +ClientHttpRequest::resumeBodyStorage() +{ + if (!adaptedBodySource) + return; + + noteMoreBodyDataAvailable(adaptedBodySource); +} + void ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe::Pointer) { assert(request_satisfaction_mode); assert(adaptedBodySource != NULL); - if (const size_t contentSize = adaptedBodySource->buf().contentSize()) { + if (size_t contentSize = adaptedBodySource->buf().contentSize()) { + // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data. + // We have to add 1 to avoid suspending forever. + const size_t bytesWanted = storeEntry()->bytesWanted(Range(0,contentSize)); + const size_t spaceAvailable = bytesWanted > 0 ? (bytesWanted + 1) : 0; + + if (spaceAvailable < contentSize ) { + // No or partial body data consuming + typedef NullaryMemFunT Dialer; + AsyncCall::Pointer call = asyncCall(93, 5, "ClientHttpRequest::resumeBodyStorage", + Dialer(this, &ClientHttpRequest::resumeBodyStorage)); + storeEntry()->deferProducer(call); + } + + // XXX: bytesWanted API does not allow us to write just one byte! + if (!spaceAvailable && contentSize > 1) + return; + + if (spaceAvailable < contentSize ) + contentSize = spaceAvailable; + BodyPipeCheckout bpc(*adaptedBodySource); - const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset); + const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset, contentSize); storeEntry()->write(ioBuf); - // assume can write everything - request_satisfaction_offset += contentSize; + // assume StoreEntry::write() writes the entire ioBuf + request_satisfaction_offset += ioBuf.length; bpc.buf.consume(contentSize); bpc.checkIn(); } @@ -1497,13 +1526,9 @@ void ClientHttpRequest::noteBodyProductionEnded(BodyPipe::Pointer) { assert(!virginHeadSource); - if (adaptedBodySource != NULL) { // did not end request satisfaction yet - // We do not expect more because noteMoreBodyDataAvailable always - // consumes everything. We do not even have a mechanism to consume - // leftovers after noteMoreBodyDataAvailable notifications seize. - assert(adaptedBodySource->exhausted()); + // should we end request satisfaction now? + if (adaptedBodySource != NULL && adaptedBodySource->exhausted()) endRequestSatisfaction(); - } } void diff --git a/src/client_side_request.h b/src/client_side_request.h index 601e700c95..5efc1fac1f 100644 --- a/src/client_side_request.h +++ b/src/client_side_request.h @@ -180,6 +180,8 @@ private: virtual void noteBodyProducerAborted(BodyPipe::Pointer); void endRequestSatisfaction(); + /// called by StoreEntry when it has more buffer space available + void resumeBodyStorage(); private: CbcPointer virginHeadSource; diff --git a/src/store.cc b/src/store.cc index c1c2b7b232..d24081e1e7 100644 --- a/src/store.cc +++ b/src/store.cc @@ -366,6 +366,27 @@ StoreEntry::StoreEntry(const char *aUrl, const char *aLogUrl) swap_dirn = -1; } +#if USE_ADAPTATION +void +StoreEntry::deferProducer(const AsyncCall::Pointer &producer) +{ + if (!deferredProducer) + deferredProducer = producer; + else + debugs(20, 5, HERE << "Deferred producer call is allready set to: " << + *deferredProducer << ", requested call: " << *producer); +} + +void +StoreEntry::kickProducer() +{ + if (deferredProducer != NULL) { + ScheduleCallHere(deferredProducer); + deferredProducer = NULL; + } +} +#endif + void StoreEntry::destroyMemObject() { diff --git a/src/store_client.cc b/src/store_client.cc index 4b24ae651f..2057112620 100644 --- a/src/store_client.cc +++ b/src/store_client.cc @@ -262,6 +262,11 @@ store_client::copy(StoreEntry * anEntry, copying = false; storeClientCopy2(entry, this); + +#if USE_ADAPTATION + if (entry) + entry->kickProducer(); +#endif } /* @@ -694,6 +699,10 @@ storeUnregister(store_client * sc, StoreEntry * e, void *data) else mem->kickReads(); +#if USE_ADAPTATION + e->kickProducer(); +#endif + return 1; } diff --git a/test-suite/mem_hdr_test.cc b/test-suite/mem_hdr_test.cc index 98b8a7f6f7..80c5738b4a 100644 --- a/test-suite/mem_hdr_test.cc +++ b/test-suite/mem_hdr_test.cc @@ -50,7 +50,7 @@ testLowAndHigh() assert (aHeader.lowestOffset() == 0); assert (aHeader.write (StoreIOBuffer())); assert (aHeader.lowestOffset() == 0); - assert (aHeader.write (StoreIOBuffer(0, 1, NULL))); + assert (aHeader.write (StoreIOBuffer(0, 1, (char *)NULL))); assert (aHeader.lowestOffset() == 0); char * sampleData = xstrdup ("A"); assert (aHeader.write (StoreIOBuffer(1, 100, sampleData)));