]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/BodyPipe.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / BodyPipe.cc
index 0cae9446c1436359dda6bbc1121e6f2f4d6e30d1..e8962eb13fc4bf504e7e410f97d34d27b007a16a 100644 (file)
@@ -1,7 +1,8 @@
 
 #include "squid.h"
+#include "base/AsyncJobCalls.h"
+#include "base/TextException.h"
 #include "BodyPipe.h"
-#include "TextException.h"
 
 CBDATA_CLASS_INIT(BodyPipe);
 
@@ -9,24 +10,25 @@ CBDATA_CLASS_INIT(BodyPipe);
 // data from a BodyPipe
 class BodySink: public BodyConsumer
 {
-    bool done;
 public:
-    BodySink():AsyncJob("BodySink"), done(false) {}
-    virtual ~BodySink() {}
+    BodySink(const BodyPipe::Pointer &bp): AsyncJob("BodySink"), body_pipe(bp) {}
+    virtual ~BodySink() { assert(!body_pipe); }
 
     virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
         size_t contentSize = bp->buf().contentSize();
         bp->consume(contentSize);
     }
     virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
-        stopConsumingFrom(bp);
-        done = true;
+        stopConsumingFrom(body_pipe);
     }
     virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
-        stopConsumingFrom(bp);
-        done = true;
+        stopConsumingFrom(body_pipe);
     }
-    bool doneAll() const {return done && AsyncJob::doneAll();}
+    bool doneAll() const {return !body_pipe && AsyncJob::doneAll();}
+
+private:
+    BodyPipe::Pointer body_pipe; ///< the pipe we are consuming from
+
     CBDATA_CLASS2(BodySink);
 };
 
@@ -40,8 +42,9 @@ class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
 public:
     typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
 
-    BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler,
-                       BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {}
+    BodyProducerDialer(const BodyProducer::Pointer &aProducer,
+                       Parent::Method aHandler, BodyPipe::Pointer bp):
+            Parent(aProducer, aHandler, bp) {}
 
     virtual bool canDial(AsyncCall &call);
 };
@@ -54,8 +57,9 @@ class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
 public:
     typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
 
-    BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler,
-                       BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {}
+    BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer,
+                       Parent::Method aHandler, BodyPipe::Pointer bp):
+            Parent(aConsumer, aHandler, bp) {}
 
     virtual bool canDial(AsyncCall &call);
 };
@@ -66,7 +70,7 @@ BodyProducerDialer::canDial(AsyncCall &call)
     if (!Parent::canDial(call))
         return false;
 
-    BodyProducer *producer = object;
+    const BodyProducer::Pointer &producer = job;
     BodyPipe::Pointer pipe = arg1;
     if (!pipe->stillProducing(producer)) {
         debugs(call.debugSection, call.debugLevel, HERE << producer <<
@@ -83,7 +87,7 @@ BodyConsumerDialer::canDial(AsyncCall &call)
     if (!Parent::canDial(call))
         return false;
 
-    BodyConsumer *consumer = object;
+    const BodyConsumer::Pointer &consumer = job;
     BodyPipe::Pointer pipe = arg1;
     if (!pipe->stillConsuming(consumer)) {
         debugs(call.debugSection, call.debugLevel, HERE << consumer <<
@@ -94,7 +98,6 @@ BodyConsumerDialer::canDial(AsyncCall &call)
     return true;
 }
 
-
 /* BodyProducer */
 
 // inform the pipe that we are done and clear the Pointer
@@ -107,8 +110,6 @@ void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
     pipe = NULL;
 }
 
-
-
 /* BodyConsumer */
 
 // inform the pipe that we are done and clear the Pointer
@@ -120,13 +121,12 @@ void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
     pipe = NULL;
 }
 
-
 /* BodyPipe */
 
 BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
         theProducer(aProducer), theConsumer(0),
         thePutSize(0), theGetSize(0),
-        mustAutoConsume(false), isCheckedOut(false)
+        mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
 {
     // TODO: teach MemBuf to start with zero minSize
     // TODO: limit maxSize by theBodySize, when known?
@@ -145,7 +145,6 @@ BodyPipe::~BodyPipe()
 void BodyPipe::setBodySize(uint64_t aBodySize)
 {
     assert(!bodySizeKnown());
-    assert(aBodySize >= 0);
     assert(thePutSize <= aBodySize);
 
     // If this assert fails, we need to add code to check for eof and inform
@@ -180,18 +179,26 @@ uint64_t BodyPipe::unproducedSize() const
     return bodySize() - thePutSize; // bodySize() asserts that size is known
 }
 
+void BodyPipe::expectProductionEndAfter(uint64_t size)
+{
+    const uint64_t expectedSize = thePutSize + size;
+    if (bodySizeKnown())
+        Must(bodySize() == expectedSize);
+    else
+        theBodySize = expectedSize;
+}
+
 void
 BodyPipe::clearProducer(bool atEof)
 {
-    if (theProducer) {
+    if (theProducer.set()) {
         debugs(91,7, HERE << "clearing BodyPipe producer" << status());
-        theProducer = NULL;
+        theProducer.clear();
         if (atEof) {
             if (!bodySizeKnown())
                 theBodySize = thePutSize;
-            else
-                if (bodySize() != thePutSize)
-                    debugs(91,3, HERE << "aborting on premature eof" << status());
+            else if (bodySize() != thePutSize)
+                debugs(91,3, HERE << "aborting on premature eof" << status());
         } else {
             // asserta that we can detect the abort if the consumer joins later
             assert(!bodySizeKnown() || bodySize() != thePutSize);
@@ -201,14 +208,14 @@ BodyPipe::clearProducer(bool atEof)
 }
 
 size_t
-BodyPipe::putMoreData(const char *buf, size_t size)
+BodyPipe::putMoreData(const char *aBuffer, size_t size)
 {
     if (bodySizeKnown())
         size = min((uint64_t)size, unproducedSize());
 
     const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
     if ((size = min(size, spaceSize))) {
-        theBuf.append(buf, size);
+        theBuf.append(aBuffer, size);
         postAppend(size);
         return size;
     }
@@ -216,10 +223,10 @@ BodyPipe::putMoreData(const char *buf, size_t size)
 }
 
 bool
-BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
+BodyPipe::setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
 {
     assert(!theConsumer);
-    assert(aConsumer);
+    assert(aConsumer.set()); // but might be invalid
 
     // TODO: convert this into an exception and remove IfNotLate suffix
     // If there is something consumed already, we are in an auto-consuming mode
@@ -229,6 +236,8 @@ BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
         return false;
     }
 
+    Must(!abortedConsumption); // did not promise to never consume
+
     theConsumer = aConsumer;
     debugs(91,7, HERE << "set consumer" << status());
     if (theBuf.hasContent())
@@ -239,38 +248,57 @@ BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
     return true;
 }
 
-// When BodyPipe consumer is gone, all events for that consumer must not
-// reach the new consumer (if any). Otherwise, the calls may go out of order
-// (if _some_ calls are dropped due to the ultimate destination being
-// temporary NULL). The code keeps track of the number of outstanding
-// events and skips that number if consumer leaves. TODO: when AscyncCall
-// support is improved, should we just schedule calls directly to consumer?
 void
 BodyPipe::clearConsumer()
 {
-    if (theConsumer) {
+    if (theConsumer.set()) {
         debugs(91,7, HERE << "clearing consumer" << status());
-        theConsumer = NULL;
-        if (consumedSize() && !exhausted()) {
-            AsyncCall::Pointer call= asyncCall(91, 7,
-                                               "BodyProducer::noteBodyConsumerAborted",
-                                               BodyProducerDialer(theProducer,
-                                                                  &BodyProducer::noteBodyConsumerAborted, this));
-            ScheduleCallHere(call);
-        }
+        theConsumer.clear();
+        // do not abort if we have not consumed so that HTTP or ICAP can retry
+        // benign xaction failures due to persistent connection race conditions
+        if (consumedSize())
+            expectNoConsumption();
+    }
+}
+
+void
+BodyPipe::expectNoConsumption()
+{
+    // We may be called multiple times because multiple jobs on the consumption
+    // chain may realize that there will be no more setConsumer() calls (e.g.,
+    // consuming code and retrying code). It is both difficult and not really
+    // necessary for them to coordinate their expectNoConsumption() calls.
+
+    // As a consequence, we may be called when we are auto-consuming already.
+
+    if (!abortedConsumption && !exhausted()) {
+        // Before we abort, any regular consumption should be over and auto
+        // consumption must not be started.
+        Must(!theConsumer);
+
+        AsyncCall::Pointer call= asyncCall(91, 7,
+                                           "BodyProducer::noteBodyConsumerAborted",
+                                           BodyProducerDialer(theProducer,
+                                                              &BodyProducer::noteBodyConsumerAborted, this));
+        ScheduleCallHere(call);
+        abortedConsumption = true;
+
+        // in case somebody enabled auto-consumption before regular one aborted
+        if (mustAutoConsume)
+            startAutoConsumption();
     }
 }
 
 size_t
-BodyPipe::getMoreData(MemBuf &buf)
+BodyPipe::getMoreData(MemBuf &aMemBuffer)
 {
     if (!theBuf.hasContent())
         return 0; // did not touch the possibly uninitialized buf
 
-    if (buf.isNull())
-        buf.init();
-    const size_t size = min(theBuf.contentSize(), buf.potentialSpaceSize());
-    buf.append(theBuf.content(), size);
+    if (aMemBuffer.isNull())
+        aMemBuffer.init();
+    const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize());
+    aMemBuffer.append(theBuf.content(), size);
     theBuf.consume(size);
     postConsume(size);
     return size; // cannot be zero if we called buf.init above
@@ -300,7 +328,7 @@ BodyPipe::startAutoConsumption()
 {
     Must(mustAutoConsume);
     Must(!theConsumer);
-    theConsumer = new BodySink;
+    theConsumer = new BodySink(this);
     debugs(91,7, HERE << "starting auto consumption" << status());
     scheduleBodyDataNotification();
 }
@@ -321,9 +349,8 @@ BodyPipe::checkIn(Checkout &checkout)
     const size_t currentSize = theBuf.contentSize();
     if (checkout.checkedOutSize > currentSize)
         postConsume(checkout.checkedOutSize - currentSize);
-    else
-        if (checkout.checkedOutSize < currentSize)
-            postAppend(currentSize - checkout.checkedOutSize);
+    else if (checkout.checkedOutSize < currentSize)
+        postAppend(currentSize - checkout.checkedOutSize);
 }
 
 void
@@ -336,7 +363,7 @@ BodyPipe::undoCheckOut(Checkout &checkout)
     // raw buffers should always check them in (possibly unchanged)
     // instead of relying on the automated undo mechanism of Checkout.
     // The code can always use a temporary buffer to accomplish that.
-    assert(checkout.checkedOutSize == currentSize);
+    Must(checkout.checkedOutSize == currentSize);
 }
 
 // TODO: Optimize: inform consumer/producer about more data/space only if
@@ -375,11 +402,10 @@ BodyPipe::postAppend(size_t size)
         clearProducer(true); // reached end-of-body
 }
 
-
 void
 BodyPipe::scheduleBodyDataNotification()
 {
-    if (theConsumer) {
+    if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
         AsyncCall::Pointer call = asyncCall(91, 7,
                                             "BodyConsumer::noteMoreBodyDataAvailable",
                                             BodyConsumerDialer(theConsumer,
@@ -391,7 +417,7 @@ BodyPipe::scheduleBodyDataNotification()
 void
 BodyPipe::scheduleBodyEndNotification()
 {
-    if (theConsumer) {
+    if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
         if (bodySizeKnown() && bodySize() == thePutSize) {
             AsyncCall::Pointer call = asyncCall(91, 7,
                                                 "BodyConsumer::noteBodyProductionEnded",
@@ -411,38 +437,39 @@ BodyPipe::scheduleBodyEndNotification()
 // a short temporary string describing buffer status for debugging
 const char *BodyPipe::status() const
 {
-    static MemBuf buf;
-    buf.reset();
+    static MemBuf outputBuffer;
+    outputBuffer.reset();
 
-    buf.append(" [", 2);
+    outputBuffer.append(" [", 2);
 
-    buf.Printf("%"PRIu64"<=%"PRIu64, theGetSize, thePutSize);
+    outputBuffer.Printf("%" PRIu64 "<=%" PRIu64, theGetSize, thePutSize);
     if (theBodySize >= 0)
-        buf.Printf("<=%"PRId64, theBodySize);
+        outputBuffer.Printf("<=%" PRId64, theBodySize);
     else
-        buf.append("<=?", 3);
+        outputBuffer.append("<=?", 3);
 
-    buf.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
+    outputBuffer.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
 
-    buf.Printf(" pipe%p", this);
-    if (theProducer)
-        buf.Printf(" prod%p", theProducer);
-    if (theConsumer)
-        buf.Printf(" cons%p", theConsumer);
+    outputBuffer.Printf(" pipe%p", this);
+    if (theProducer.set())
+        outputBuffer.Printf(" prod%p", theProducer.get());
+    if (theConsumer.set())
+        outputBuffer.Printf(" cons%p", theConsumer.get());
 
     if (mustAutoConsume)
-        buf.append(" A", 2);
+        outputBuffer.append(" A", 2);
+    if (abortedConsumption)
+        outputBuffer.append(" !C", 3);
     if (isCheckedOut)
-        buf.append(" L", 2); // Locked
+        outputBuffer.append(" L", 2); // Locked
 
-    buf.append("]", 1);
+    outputBuffer.append("]", 1);
 
-    buf.terminate();
+    outputBuffer.terminate();
 
-    return buf.content();
+    return outputBuffer.content();
 }
 
-
 /* BodyPipeCheckout */
 
 BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
@@ -453,8 +480,13 @@ BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
 
 BodyPipeCheckout::~BodyPipeCheckout()
 {
-    if (!checkedIn)
-        pipe.undoCheckOut(*this);
+    if (!checkedIn) {
+        // Do not pipe.undoCheckOut(*this) because it asserts or throws
+        // TODO: consider implementing the long-term solution discussed at
+        // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
+        debugs(91,2, HERE << "Warning: cannot undo BodyPipeCheckout");
+        pipe.checkIn(*this);
+    }
 }
 
 void
@@ -465,7 +497,6 @@ BodyPipeCheckout::checkIn()
     checkedIn = true;
 }
 
-
 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
         buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
         checkedIn(c.checkedIn)