From 0ad2b63bf4586b507e351b853424ac13f81d3404 Mon Sep 17 00:00:00 2001 From: Christos Tsantilas Date: Fri, 18 Nov 2011 18:53:45 +0200 Subject: [PATCH] author: Martin Huter , Alex Rousskov , Christos Tsantilas Bug 2619: Excessive RAM growth due to unlimited adapted body data consumption If the client does not read from the open connection (i.e. the user does not confirm the browsers download-message-box in microsofts IE), squid keeps on reading data from the ICAP server into the store entry, while no more data can be delivered to the client. Thus the store entry in memory is growing and squid may - in worst case - consume memory up to the size of the users download. This patch add API to StoreEntry to call the producer back when released memory/space from the StoreEntry and add code to the ICAP client code to not consume body data comes from the ICAP server when there is not available space in the store entry. --- src/Server.cc | 87 ++++++++++++++++++++++++++++++++------ src/Server.h | 5 +++ src/Store.h | 12 ++++++ src/StoreIOBuffer.h | 7 +++ src/client_side_request.cc | 45 +++++++++++++++----- src/client_side_request.h | 2 + src/store.cc | 21 +++++++++ src/store_client.cc | 9 ++++ 8 files changed, 165 insertions(+), 23 deletions(-) diff --git a/src/Server.cc b/src/Server.cc index 754c49a525..68c500815a 100644 --- a/src/Server.cc +++ b/src/Server.cc @@ -50,6 +50,7 @@ #include "adaptation/AccessCheck.h" #include "adaptation/Answer.h" #include "adaptation/Iterator.h" +#include "base/AsyncCall.h" #endif // implemented in client_side_reply.cc until sides have a common parent @@ -63,7 +64,9 @@ ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateDa adaptationAccessCheckPending(false), startedAdaptation(false), #endif - receivedWholeRequestBody(false) + receivedWholeRequestBody(false), + theVirginReply(NULL), + theFinalReply(NULL) { fwd = theFwdState; entry = fwd->entry; @@ -273,7 +276,8 @@ ServerStateData::noteMoreBodyDataAvailable(BodyPipe::Pointer bp) return; } #endif - handleMoreRequestBodyAvailable(); + if (requestBodySource == bp) + handleMoreRequestBodyAvailable(); } // the entire request or adapted response body was provided, successfully @@ -286,7 +290,8 @@ ServerStateData::noteBodyProductionEnded(BodyPipe::Pointer bp) return; } #endif - handleRequestBodyProductionEnded(); + if (requestBodySource == bp) + handleRequestBodyProductionEnded(); } // premature end of the request or adapted response body production @@ -299,7 +304,8 @@ ServerStateData::noteBodyProducerAborted(BodyPipe::Pointer bp) return; } #endif - handleRequestBodyProducerAborted(); + if (requestBodySource == bp) + handleRequestBodyProducerAborted(); } @@ -698,22 +704,69 @@ ServerStateData::handleAdaptedHeader(HttpMsg *msg) } } -// more adapted response body is available void -ServerStateData::handleMoreAdaptedBodyAvailable() +ServerStateData::resumeBodyStorage() { - const size_t contentSize = adaptedBodySource->buf().contentSize(); + if (abortOnBadEntry("store entry aborted while kick producer callback")) + return; - debugs(11,5, HERE << "consuming " << contentSize << " bytes of adapted " << - "response body at offset " << adaptedBodySource->consumedSize()); + if(!adaptedBodySource) + return; + + handleMoreAdaptedBodyAvailable(); + + if (adaptedBodySource != NULL && adaptedBodySource->exhausted()) + endAdaptedBodyConsumption(); +} +// more adapted response body is available +void +ServerStateData::handleMoreAdaptedBodyAvailable() +{ if (abortOnBadEntry("entry refuses adapted body")) return; assert(entry); + + size_t contentSize = adaptedBodySource->buf().contentSize(); + bool consumedPartially = false; + + if (!contentSize) + return; // XXX: bytesWanted asserts on zero-size ranges + + // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data. + // We have to add 1 to avoid suspending forever. + const size_t bytesWanted = entry->bytesWanted(Range(0, contentSize)); + const size_t spaceAvailable = bytesWanted > 0 ? (bytesWanted + 1) : 0; + + if (spaceAvailable < contentSize ) { + // No or partial body data consuming + typedef NullaryMemFunT Dialer; + AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage", + Dialer(this, &ServerStateData::resumeBodyStorage)); + entry->deferProducer(call); + } + + // XXX: bytesWanted API does not allow us to write just one byte! + if (!spaceAvailable && contentSize > 1) { + debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " << + "response body at offset " << adaptedBodySource->consumedSize()); + return; + } + + if (spaceAvailable < contentSize ) { + debugs(11, 5, HERE << "postponing storage of " << + (contentSize - spaceAvailable) << " body bytes"); + contentSize = spaceAvailable; + consumedPartially=true; + } + + debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " << + "response body at offset " << adaptedBodySource->consumedSize()); + BodyPipeCheckout bpc(*adaptedBodySource); - const StoreIOBuffer ioBuf(&bpc.buf, currentOffset); - currentOffset += bpc.buf.size; + const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize); + currentOffset += ioBuf.length; entry->write(ioBuf); bpc.buf.consume(contentSize); bpc.checkIn(); @@ -723,11 +776,19 @@ ServerStateData::handleMoreAdaptedBodyAvailable() void ServerStateData::handleAdaptedBodyProductionEnded() { - stopConsumingFrom(adaptedBodySource); - if (abortOnBadEntry("entry went bad while waiting for adapted body eof")) return; + + // end consumption if we consumed everything + if (adaptedBodySource != NULL && adaptedBodySource->exhausted()) + endAdaptedBodyConsumption(); + // else resumeBodyStorage() will eventually consume the rest +} +void +ServerStateData::endAdaptedBodyConsumption() +{ + stopConsumingFrom(adaptedBodySource); handleAdaptationCompleted(); } diff --git a/src/Server.h b/src/Server.h index e20c9213a2..c3c8c3342d 100644 --- a/src/Server.h +++ b/src/Server.h @@ -147,6 +147,11 @@ protected: void handleAdaptationCompleted(); void handleAdaptationBlocked(const Adaptation::Answer &answer); void handleAdaptationAborted(bool bypassable = false); + + /// called by StoreEntry when it has more buffer space available + void resumeBodyStorage(); + /// called when the entire adapted response body is consumed + void endAdaptedBodyConsumption(); #endif protected: diff --git a/src/Store.h b/src/Store.h index 9621d1a7b8..78481647e5 100644 --- a/src/Store.h +++ b/src/Store.h @@ -201,9 +201,21 @@ public: virtual void lock(); virtual void release(); +#if USE_ADAPTATION + /// call back producer when more buffer space is available + void deferProducer(const AsyncCall::Pointer &producer); + /// calls back producer registered with deferProducer + void kickProducer(); +#endif + private: static MemAllocator *pool; +#if USE_ADAPTATION + /// producer callback registered with deferProducer + AsyncCall::Pointer deferredProducer; +#endif + bool validLength() const; bool hasOneOfEtags(const String &reqETags, const bool allowWeakMatch) const; }; diff --git a/src/StoreIOBuffer.h b/src/StoreIOBuffer.h index eb08cce431..7c7959e161 100644 --- a/src/StoreIOBuffer.h +++ b/src/StoreIOBuffer.h @@ -59,6 +59,13 @@ public: flags.error = 0; } + StoreIOBuffer(MemBuf *aMemBuf, int64_t anOffset, size_t anLength) : + length(anLength), + offset (anOffset), + data(aMemBuf->content()) { + flags.error = 0; + } + Range range() const { return Range(offset, offset + length); } diff --git a/src/client_side_request.cc b/src/client_side_request.cc index c8f93c45b6..96a643fa4a 100644 --- a/src/client_side_request.cc +++ b/src/client_side_request.cc @@ -1680,18 +1680,47 @@ ClientHttpRequest::handleAdaptationBlock(const Adaptation::Answer &answer) AclMatchedName = NULL; } +void +ClientHttpRequest::resumeBodyStorage() +{ + if(!adaptedBodySource) + return; + + noteMoreBodyDataAvailable(adaptedBodySource); +} + void ClientHttpRequest::noteMoreBodyDataAvailable(BodyPipe::Pointer) { assert(request_satisfaction_mode); assert(adaptedBodySource != NULL); - if (const size_t contentSize = adaptedBodySource->buf().contentSize()) { + if (size_t contentSize = adaptedBodySource->buf().contentSize()) { + // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data. + // We have to add 1 to avoid suspending forever. + const size_t bytesWanted = storeEntry()->bytesWanted(Range(0,contentSize)); + const size_t spaceAvailable = bytesWanted > 0 ? (bytesWanted + 1) : 0; + + if (spaceAvailable < contentSize ) { + // No or partial body data consuming + typedef NullaryMemFunT Dialer; + AsyncCall::Pointer call = asyncCall(93, 5, "ClientHttpRequest::resumeBodyStorage", + Dialer(this, &ClientHttpRequest::resumeBodyStorage)); + storeEntry()->deferProducer(call); + } + + // XXX: bytesWanted API does not allow us to write just one byte! + if (!spaceAvailable && contentSize > 1) + return; + + if (spaceAvailable < contentSize ) + contentSize = spaceAvailable; + BodyPipeCheckout bpc(*adaptedBodySource); - const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset); + const StoreIOBuffer ioBuf(&bpc.buf, request_satisfaction_offset, contentSize); storeEntry()->write(ioBuf); - // assume can write everything - request_satisfaction_offset += contentSize; + // assume StoreEntry::write() writes the entire ioBuf + request_satisfaction_offset += ioBuf.length; bpc.buf.consume(contentSize); bpc.checkIn(); } @@ -1705,13 +1734,9 @@ void ClientHttpRequest::noteBodyProductionEnded(BodyPipe::Pointer) { assert(!virginHeadSource); - if (adaptedBodySource != NULL) { // did not end request satisfaction yet - // We do not expect more because noteMoreBodyDataAvailable always - // consumes everything. We do not even have a mechanism to consume - // leftovers after noteMoreBodyDataAvailable notifications seize. - assert(adaptedBodySource->exhausted()); + // should we end request satisfaction now? + if (adaptedBodySource != NULL && adaptedBodySource->exhausted()) endRequestSatisfaction(); - } } void diff --git a/src/client_side_request.h b/src/client_side_request.h index 693e26effa..c50c1e5c45 100644 --- a/src/client_side_request.h +++ b/src/client_side_request.h @@ -188,6 +188,8 @@ private: virtual void noteBodyProducerAborted(BodyPipe::Pointer); void endRequestSatisfaction(); + /// called by StoreEntry when it has more buffer space available + void resumeBodyStorage(); private: CbcPointer virginHeadSource; diff --git a/src/store.cc b/src/store.cc index 3be12fcfa4..cd1f684f02 100644 --- a/src/store.cc +++ b/src/store.cc @@ -410,6 +410,27 @@ StoreEntry::~StoreEntry() delete hidden_mem_obj; } +#if USE_ADAPTATION +void +StoreEntry::deferProducer(const AsyncCall::Pointer &producer) +{ + if (!deferredProducer) + deferredProducer = producer; + else + debugs(20, 5, HERE << "Deferred producer call is allready set to: " << + *deferredProducer << ", requested call: " << *producer); +} + +void +StoreEntry::kickProducer() +{ + if(deferredProducer != NULL){ + ScheduleCallHere(deferredProducer); + deferredProducer = NULL; + } +} +#endif + void StoreEntry::destroyMemObject() { diff --git a/src/store_client.cc b/src/store_client.cc index f0294bd58b..55ed6a2dfe 100644 --- a/src/store_client.cc +++ b/src/store_client.cc @@ -262,6 +262,11 @@ store_client::copy(StoreEntry * anEntry, copying = false; storeClientCopy2(entry, this); + +#if USE_ADAPTATION + if (entry) + entry->kickProducer(); +#endif } /* @@ -726,6 +731,10 @@ storeUnregister(store_client * sc, StoreEntry * e, void *data) else mem->kickReads(); +#if USE_ADAPTATION + e->kickProducer(); +#endif + return 1; } -- 2.39.5