BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
theProducer(aProducer), theConsumer(0),
thePutSize(0), theGetSize(0),
- mustAutoConsume(false), isCheckedOut(false)
+ mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
{
// TODO: teach MemBuf to start with zero minSize
// TODO: limit maxSize by theBodySize, when known?
return false;
}
+ Must(!abortedConsumption); // did not promise to never consume
+
theConsumer = aConsumer;
debugs(91,7, HERE << "set consumer" << status());
if (theBuf.hasContent())
return true;
}
-// When BodyPipe consumer is gone, all events for that consumer must not
-// reach the new consumer (if any). Otherwise, the calls may go out of order
-// (if _some_ calls are dropped due to the ultimate destination being
-// temporary NULL). The code keeps track of the number of outstanding
-// events and skips that number if consumer leaves. TODO: when AscyncCall
-// support is improved, should we just schedule calls directly to consumer?
void
BodyPipe::clearConsumer()
{
if (theConsumer.set()) {
debugs(91,7, HERE << "clearing consumer" << status());
theConsumer.clear();
- if (consumedSize() && !exhausted()) {
+ // do not abort if we have not consumed so that HTTP or ICAP can retry
+ // benign xaction failures due to persistent connection race conditions
+ if (consumedSize())
+ expectNoConsumption();
+ }
+}
+
+void
+BodyPipe::expectNoConsumption()
+{
+ Must(!theConsumer);
+ if (!abortedConsumption && !exhausted()) {
AsyncCall::Pointer call= asyncCall(91, 7,
"BodyProducer::noteBodyConsumerAborted",
BodyProducerDialer(theProducer,
&BodyProducer::noteBodyConsumerAborted, this));
ScheduleCallHere(call);
- }
+ abortedConsumption = true;
}
}
if (mustAutoConsume)
outputBuffer.append(" A", 2);
+ if (abortedConsumption)
+ outputBuffer.append(" !C", 3);
if (isCheckedOut)
outputBuffer.append(" L", 2); // Locked
// called by consumers
bool setConsumerIfNotLate(const Consumer::Pointer &aConsumer);
void clearConsumer(); // aborts if still piping
+ void expectNoConsumption(); ///< there will be no more setConsumer() calls
size_t getMoreData(MemBuf &buf);
void consume(size_t size);
bool expectMoreAfter(uint64_t offset) const;
MemBuf theBuf; // produced but not yet consumed content, if any
bool mustAutoConsume; // consume when there is no consumer
+ bool abortedConsumption; ///< called BodyProducer::noteBodyConsumerAborted
bool isCheckedOut; // to keep track of checkout violations
CBDATA_CLASS2(BodyPipe);