From b599471b7536024b6bf083a78769ae351f27adea Mon Sep 17 00:00:00 2001 From: Alex Rousskov Date: Tue, 8 Jan 2019 15:14:18 +0000 Subject: [PATCH] Fix BodyPipe/Sink memory leaks associated with auto-consumption (#348) Auto-consumption happens (and could probably leak memory) in many cases, but this leak was exposed by an eCAP service that blocked or replaced virgin messages. The BodySink job termination algorithm relies on body production notifications. A BodySink job created after the body production had ended can never stop and, hence, leaks (leaking the associated BodyPipe object with it). Such a job is also useless: If production is over, there is no need to free space for more body data! This change avoids creating such leaking and useless jobs. --- src/BodyPipe.cc | 31 +++++++++++++++++-------------- src/BodyPipe.h | 6 +++--- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/src/BodyPipe.cc b/src/BodyPipe.cc index 0fdf0dc5e0..59477bdc79 100644 --- a/src/BodyPipe.cc +++ b/src/BodyPipe.cc @@ -286,8 +286,7 @@ BodyPipe::expectNoConsumption() abortedConsumption = true; // in case somebody enabled auto-consumption before regular one aborted - if (mustAutoConsume) - startAutoConsumption(); + startAutoConsumptionIfNeeded(); } } @@ -313,23 +312,28 @@ BodyPipe::consume(size_t size) postConsume(size); } -// In the AutoConsumption mode the consumer has gone but the producer continues -// producing data. We are using a BodySink BodyConsumer which just discards the produced data. void BodyPipe::enableAutoConsumption() { mustAutoConsume = true; debugs(91,5, HERE << "enabled auto consumption" << status()); - if (!theConsumer && theBuf.hasContent()) - startAutoConsumption(); + startAutoConsumptionIfNeeded(); } -// start auto consumption by creating body sink +/// Check the current need and, if needed, start auto consumption. In auto +/// consumption mode, the consumer is gone, but the producer continues to +/// produce data. We use a BodySink BodyConsumer to discard that data. void -BodyPipe::startAutoConsumption() +BodyPipe::startAutoConsumptionIfNeeded() { - Must(mustAutoConsume); - Must(!theConsumer); + const auto startNow = + mustAutoConsume && // was enabled + !theConsumer && // has not started yet + theProducer.valid() && // still useful (and will eventually stop) + theBuf.hasContent(); // has something to consume right now + if (!startNow) + return; + theConsumer = new BodySink(this); debugs(91,7, HERE << "starting auto consumption" << status()); scheduleBodyDataNotification(); @@ -393,15 +397,14 @@ BodyPipe::postAppend(size_t size) thePutSize += size; debugs(91,7, HERE << "added " << size << " bytes" << status()); - if (mustAutoConsume && !theConsumer && size > 0) - startAutoConsumption(); + if (!mayNeedMoreData()) + clearProducer(true); // reached end-of-body // We should not consume here even if mustAutoConsume because the // caller may not be ready for the data to be consumed during this call. scheduleBodyDataNotification(); - if (!mayNeedMoreData()) - clearProducer(true); // reached end-of-body + startAutoConsumptionIfNeeded(); } void diff --git a/src/BodyPipe.h b/src/BodyPipe.h index 2bf3220bf6..7061c78a36 100644 --- a/src/BodyPipe.h +++ b/src/BodyPipe.h @@ -131,7 +131,7 @@ public: bool exhausted() const; // saw eof/abort and all data consumed bool stillConsuming(const Consumer::Pointer &consumer) const { return theConsumer == consumer; } - // start or continue consuming when there is no consumer + /// start or continue consuming when producing without consumer void enableAutoConsumption(); const MemBuf &buf() const { return theBuf; } @@ -151,7 +151,7 @@ protected: void postConsume(size_t size); void postAppend(size_t size); - void startAutoConsumption(); // delayed start of enabled consumption + void startAutoConsumptionIfNeeded(); private: int64_t theBodySize; // expected total content length, if known @@ -163,7 +163,7 @@ private: MemBuf theBuf; // produced but not yet consumed content, if any - bool mustAutoConsume; // consume when there is no consumer + bool mustAutoConsume; ///< keep theBuf empty when producing without consumer bool abortedConsumption; ///< called BodyProducer::noteBodyConsumerAborted bool isCheckedOut; // to keep track of checkout violations }; -- 2.47.2