#include "squid.h"
+#include "base/AsyncJobCalls.h"
+#include "base/TextException.h"
#include "BodyPipe.h"
-#include "TextException.h"
CBDATA_CLASS_INIT(BodyPipe);
// data from a BodyPipe
class BodySink: public BodyConsumer
{
- bool done;
public:
- BodySink():AsyncJob("BodySink"), done(false) {}
- virtual ~BodySink() {}
+ BodySink(const BodyPipe::Pointer &bp): AsyncJob("BodySink"), body_pipe(bp) {}
+ virtual ~BodySink() { assert(!body_pipe); }
virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
size_t contentSize = bp->buf().contentSize();
bp->consume(contentSize);
}
virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
- stopConsumingFrom(bp);
- done = true;
+ stopConsumingFrom(body_pipe);
}
virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
- stopConsumingFrom(bp);
- done = true;
+ stopConsumingFrom(body_pipe);
}
- bool doneAll() const {return done && AsyncJob::doneAll();}
+ bool doneAll() const {return !body_pipe && AsyncJob::doneAll();}
+
+private:
+ BodyPipe::Pointer body_pipe; ///< the pipe we are consuming from
+
CBDATA_CLASS2(BodySink);
};
public:
typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
- BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler,
- BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {}
+ BodyProducerDialer(const BodyProducer::Pointer &aProducer,
+ Parent::Method aHandler, BodyPipe::Pointer bp):
+ Parent(aProducer, aHandler, bp) {}
virtual bool canDial(AsyncCall &call);
};
public:
typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
- BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler,
- BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {}
+ BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer,
+ Parent::Method aHandler, BodyPipe::Pointer bp):
+ Parent(aConsumer, aHandler, bp) {}
virtual bool canDial(AsyncCall &call);
};
if (!Parent::canDial(call))
return false;
- BodyProducer *producer = object;
+ const BodyProducer::Pointer &producer = job;
BodyPipe::Pointer pipe = arg1;
if (!pipe->stillProducing(producer)) {
debugs(call.debugSection, call.debugLevel, HERE << producer <<
if (!Parent::canDial(call))
return false;
- BodyConsumer *consumer = object;
+ const BodyConsumer::Pointer &consumer = job;
BodyPipe::Pointer pipe = arg1;
if (!pipe->stillConsuming(consumer)) {
debugs(call.debugSection, call.debugLevel, HERE << consumer <<
return true;
}
-
/* BodyProducer */
// inform the pipe that we are done and clear the Pointer
pipe = NULL;
}
-
-
/* BodyConsumer */
// inform the pipe that we are done and clear the Pointer
pipe = NULL;
}
-
/* BodyPipe */
BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
theProducer(aProducer), theConsumer(0),
thePutSize(0), theGetSize(0),
- mustAutoConsume(false), isCheckedOut(false)
+ mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
{
// TODO: teach MemBuf to start with zero minSize
// TODO: limit maxSize by theBodySize, when known?
void BodyPipe::setBodySize(uint64_t aBodySize)
{
assert(!bodySizeKnown());
- assert(aBodySize >= 0);
assert(thePutSize <= aBodySize);
// If this assert fails, we need to add code to check for eof and inform
return bodySize() - thePutSize; // bodySize() asserts that size is known
}
+void BodyPipe::expectProductionEndAfter(uint64_t size)
+{
+ const uint64_t expectedSize = thePutSize + size;
+ if (bodySizeKnown())
+ Must(bodySize() == expectedSize);
+ else
+ theBodySize = expectedSize;
+}
+
void
BodyPipe::clearProducer(bool atEof)
{
- if (theProducer) {
+ if (theProducer.set()) {
debugs(91,7, HERE << "clearing BodyPipe producer" << status());
- theProducer = NULL;
+ theProducer.clear();
if (atEof) {
if (!bodySizeKnown())
theBodySize = thePutSize;
- else
- if (bodySize() != thePutSize)
- debugs(91,3, HERE << "aborting on premature eof" << status());
+ else if (bodySize() != thePutSize)
+ debugs(91,3, HERE << "aborting on premature eof" << status());
} else {
// asserta that we can detect the abort if the consumer joins later
assert(!bodySizeKnown() || bodySize() != thePutSize);
}
size_t
-BodyPipe::putMoreData(const char *buf, size_t size)
+BodyPipe::putMoreData(const char *aBuffer, size_t size)
{
if (bodySizeKnown())
size = min((uint64_t)size, unproducedSize());
const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
if ((size = min(size, spaceSize))) {
- theBuf.append(buf, size);
+ theBuf.append(aBuffer, size);
postAppend(size);
return size;
}
}
bool
-BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
+BodyPipe::setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
{
assert(!theConsumer);
- assert(aConsumer);
+ assert(aConsumer.set()); // but might be invalid
// TODO: convert this into an exception and remove IfNotLate suffix
// If there is something consumed already, we are in an auto-consuming mode
return false;
}
+ Must(!abortedConsumption); // did not promise to never consume
+
theConsumer = aConsumer;
debugs(91,7, HERE << "set consumer" << status());
if (theBuf.hasContent())
return true;
}
-// When BodyPipe consumer is gone, all events for that consumer must not
-// reach the new consumer (if any). Otherwise, the calls may go out of order
-// (if _some_ calls are dropped due to the ultimate destination being
-// temporary NULL). The code keeps track of the number of outstanding
-// events and skips that number if consumer leaves. TODO: when AscyncCall
-// support is improved, should we just schedule calls directly to consumer?
void
BodyPipe::clearConsumer()
{
- if (theConsumer) {
+ if (theConsumer.set()) {
debugs(91,7, HERE << "clearing consumer" << status());
- theConsumer = NULL;
- if (consumedSize() && !exhausted()) {
- AsyncCall::Pointer call= asyncCall(91, 7,
- "BodyProducer::noteBodyConsumerAborted",
- BodyProducerDialer(theProducer,
- &BodyProducer::noteBodyConsumerAborted, this));
- ScheduleCallHere(call);
- }
+ theConsumer.clear();
+ // do not abort if we have not consumed so that HTTP or ICAP can retry
+ // benign xaction failures due to persistent connection race conditions
+ if (consumedSize())
+ expectNoConsumption();
+ }
+}
+
+void
+BodyPipe::expectNoConsumption()
+{
+ // We may be called multiple times because multiple jobs on the consumption
+ // chain may realize that there will be no more setConsumer() calls (e.g.,
+ // consuming code and retrying code). It is both difficult and not really
+ // necessary for them to coordinate their expectNoConsumption() calls.
+
+ // As a consequence, we may be called when we are auto-consuming already.
+
+ if (!abortedConsumption && !exhausted()) {
+ // Before we abort, any regular consumption should be over and auto
+ // consumption must not be started.
+ Must(!theConsumer);
+
+ AsyncCall::Pointer call= asyncCall(91, 7,
+ "BodyProducer::noteBodyConsumerAborted",
+ BodyProducerDialer(theProducer,
+ &BodyProducer::noteBodyConsumerAborted, this));
+ ScheduleCallHere(call);
+ abortedConsumption = true;
+
+ // in case somebody enabled auto-consumption before regular one aborted
+ if (mustAutoConsume)
+ startAutoConsumption();
}
}
size_t
-BodyPipe::getMoreData(MemBuf &buf)
+BodyPipe::getMoreData(MemBuf &aMemBuffer)
{
if (!theBuf.hasContent())
return 0; // did not touch the possibly uninitialized buf
- if (buf.isNull())
- buf.init();
- const size_t size = min(theBuf.contentSize(), buf.potentialSpaceSize());
- buf.append(theBuf.content(), size);
+ if (aMemBuffer.isNull())
+ aMemBuffer.init();
+ const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize());
+ aMemBuffer.append(theBuf.content(), size);
theBuf.consume(size);
postConsume(size);
return size; // cannot be zero if we called buf.init above
{
Must(mustAutoConsume);
Must(!theConsumer);
- theConsumer = new BodySink;
+ theConsumer = new BodySink(this);
debugs(91,7, HERE << "starting auto consumption" << status());
scheduleBodyDataNotification();
}
const size_t currentSize = theBuf.contentSize();
if (checkout.checkedOutSize > currentSize)
postConsume(checkout.checkedOutSize - currentSize);
- else
- if (checkout.checkedOutSize < currentSize)
- postAppend(currentSize - checkout.checkedOutSize);
+ else if (checkout.checkedOutSize < currentSize)
+ postAppend(currentSize - checkout.checkedOutSize);
}
void
// raw buffers should always check them in (possibly unchanged)
// instead of relying on the automated undo mechanism of Checkout.
// The code can always use a temporary buffer to accomplish that.
- assert(checkout.checkedOutSize == currentSize);
+ Must(checkout.checkedOutSize == currentSize);
}
// TODO: Optimize: inform consumer/producer about more data/space only if
clearProducer(true); // reached end-of-body
}
-
void
BodyPipe::scheduleBodyDataNotification()
{
- if (theConsumer) {
+ if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
AsyncCall::Pointer call = asyncCall(91, 7,
"BodyConsumer::noteMoreBodyDataAvailable",
BodyConsumerDialer(theConsumer,
void
BodyPipe::scheduleBodyEndNotification()
{
- if (theConsumer) {
+ if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
if (bodySizeKnown() && bodySize() == thePutSize) {
AsyncCall::Pointer call = asyncCall(91, 7,
"BodyConsumer::noteBodyProductionEnded",
// a short temporary string describing buffer status for debugging
const char *BodyPipe::status() const
{
- static MemBuf buf;
- buf.reset();
+ static MemBuf outputBuffer;
+ outputBuffer.reset();
- buf.append(" [", 2);
+ outputBuffer.append(" [", 2);
- buf.Printf("%"PRIu64"<=%"PRIu64, theGetSize, thePutSize);
+ outputBuffer.Printf("%" PRIu64 "<=%" PRIu64, theGetSize, thePutSize);
if (theBodySize >= 0)
- buf.Printf("<=%"PRId64, theBodySize);
+ outputBuffer.Printf("<=%" PRId64, theBodySize);
else
- buf.append("<=?", 3);
+ outputBuffer.append("<=?", 3);
- buf.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
+ outputBuffer.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
- buf.Printf(" pipe%p", this);
- if (theProducer)
- buf.Printf(" prod%p", theProducer);
- if (theConsumer)
- buf.Printf(" cons%p", theConsumer);
+ outputBuffer.Printf(" pipe%p", this);
+ if (theProducer.set())
+ outputBuffer.Printf(" prod%p", theProducer.get());
+ if (theConsumer.set())
+ outputBuffer.Printf(" cons%p", theConsumer.get());
if (mustAutoConsume)
- buf.append(" A", 2);
+ outputBuffer.append(" A", 2);
+ if (abortedConsumption)
+ outputBuffer.append(" !C", 3);
if (isCheckedOut)
- buf.append(" L", 2); // Locked
+ outputBuffer.append(" L", 2); // Locked
- buf.append("]", 1);
+ outputBuffer.append("]", 1);
- buf.terminate();
+ outputBuffer.terminate();
- return buf.content();
+ return outputBuffer.content();
}
-
/* BodyPipeCheckout */
BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
BodyPipeCheckout::~BodyPipeCheckout()
{
- if (!checkedIn)
- pipe.undoCheckOut(*this);
+ if (!checkedIn) {
+ // Do not pipe.undoCheckOut(*this) because it asserts or throws
+ // TODO: consider implementing the long-term solution discussed at
+ // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
+ debugs(91,2, HERE << "Warning: cannot undo BodyPipeCheckout");
+ pipe.checkIn(*this);
+ }
}
void
checkedIn = true;
}
-
BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
checkedIn(c.checkedIn)