3 #include "base/AsyncJobCalls.h"
4 #include "base/TextException.h"
// Expands the CBDATA_CLASS_INIT macro for BodyPipe. The macro is defined
// outside this listing; presumably it emits the cbdata bookkeeping for the class.
7 CBDATA_CLASS_INIT(BodyPipe
);
9 // BodySink is a BodyConsumer class which just consumes and drops
10 // data from a BodyPipe.
// NOTE(review): the embedded line numbers show gaps (12-14, 17, 21, 24-25,
// 28-29), so the class braces, any access specifiers, the declaration of
// `done`, and possibly other statements are missing from this listing.
11 class BodySink
: public BodyConsumer
// Passes the name "BodySink" to the AsyncJob constructor; `done` starts false.
15 BodySink():AsyncJob("BodySink"), done(false) {}
16 virtual ~BodySink() {}
// Drops everything currently buffered: asks the pipe to consume exactly
// buf().contentSize() bytes.
18 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp
) {
19 size_t contentSize
= bp
->buf().contentSize();
20 bp
->consume(contentSize
);
// The producer finished: stop consuming from the pipe.
22 virtual void noteBodyProductionEnded(BodyPipe::Pointer bp
) {
23 stopConsumingFrom(bp
);
// The producer aborted: stop consuming from the pipe as well.
26 virtual void noteBodyProducerAborted(BodyPipe::Pointer bp
) {
27 stopConsumingFrom(bp
);
// True only when `done` is set and AsyncJob::doneAll() also agrees.
30 bool doneAll() const {return done
&& AsyncJob::doneAll();}
31 CBDATA_CLASS2(BodySink
);
// Expands the CBDATA_CLASS_INIT macro for BodySink. The macro is defined
// outside this listing; presumably it pairs with CBDATA_CLASS2 in the class.
34 CBDATA_CLASS_INIT(BodySink
);
36 // The BodyProducerDialer is an AsyncCall class which is used to schedule BodyProducer calls.
37 // In addition to the normal AsyncCall checks, it verifies that the BodyProducer is still
38 // the producer of the BodyPipe passed as argument.
// NOTE(review): the embedded line numbers show gaps (40-41, 43, 47, 49), so
// the class braces and any access specifier are missing from this listing.
39 class BodyProducerDialer
: public UnaryMemFunT
<BodyProducer
, BodyPipe::Pointer
>
// Shorthand for the base template instantiation.
42 typedef UnaryMemFunT
<BodyProducer
, BodyPipe::Pointer
> Parent
;
// Forwards the producer job, the member-function handler and the pipe
// argument to Parent unchanged.
44 BodyProducerDialer(const BodyProducer::Pointer
&aProducer
,
45 Parent::Method aHandler
, BodyPipe::Pointer bp
):
46 Parent(aProducer
, aHandler
, bp
) {}
// Defined out of line; adds a stillProducing() check on top of Parent::canDial().
48 virtual bool canDial(AsyncCall
&call
);
51 // The BodyConsumerDialer is an AsyncCall class which is used to schedule BodyConsumer calls.
52 // In addition to the normal AsyncCall checks, it verifies that the BodyConsumer is still
53 // the recipient of the BodyPipe passed as argument.
// NOTE(review): the embedded line numbers show gaps (55-56, 58, 62, 64), so
// the class braces and any access specifier are missing from this listing.
54 class BodyConsumerDialer
: public UnaryMemFunT
<BodyConsumer
, BodyPipe::Pointer
>
// Shorthand for the base template instantiation.
57 typedef UnaryMemFunT
<BodyConsumer
, BodyPipe::Pointer
> Parent
;
// Forwards the consumer job, the member-function handler and the pipe
// argument to Parent unchanged.
59 BodyConsumerDialer(const BodyConsumer::Pointer
&aConsumer
,
60 Parent::Method aHandler
, BodyPipe::Pointer bp
):
61 Parent(aConsumer
, aHandler
, bp
) {}
// Defined out of line; adds a stillConsuming() check on top of Parent::canDial().
63 virtual bool canDial(AsyncCall
&call
);
// Decides whether a scheduled BodyProducer call may still be dialed.
// Visible logic: consult Parent::canDial() first; then, if the pipe held in
// arg1 no longer reports stillProducing() for this job, log that at the
// call's own debug section/level and return the result of call.cancel().
// NOTE(review): the embedded line numbers show gaps (66, 68, 70-71, 78-83),
// so the return type, the braces and presumably the other `return`
// statements are missing from this listing; confirm against the full file.
67 BodyProducerDialer::canDial(AsyncCall
&call
)
69 if (!Parent::canDial(call
))
// `job` and `arg1` are not declared in this block; presumably Parent members.
72 const BodyProducer::Pointer
&producer
= job
;
73 BodyPipe::Pointer pipe
= arg1
;
74 if (!pipe
->stillProducing(producer
)) {
75 debugs(call
.debugSection
, call
.debugLevel
, HERE
<< producer
<<
76 " no longer producing for " << pipe
->status());
77 return call
.cancel("no longer producing");
// Decides whether a scheduled BodyConsumer call may still be dialed.
// Visible logic: consult Parent::canDial() first; then, if the pipe held in
// arg1 no longer reports stillConsuming() for this job, log that at the
// call's own debug section/level and return the result of call.cancel().
// NOTE(review): the embedded line numbers show gaps (83, 85, 87-88, 95-102),
// so the return type, the braces and presumably the other `return`
// statements are missing from this listing; confirm against the full file.
84 BodyConsumerDialer::canDial(AsyncCall
&call
)
86 if (!Parent::canDial(call
))
// `job` and `arg1` are not declared in this block; presumably Parent members.
89 const BodyConsumer::Pointer
&consumer
= job
;
90 BodyPipe::Pointer pipe
= arg1
;
91 if (!pipe
->stillConsuming(consumer
)) {
92 debugs(call
.debugSection
, call
.debugLevel
, HERE
<< consumer
<<
93 " no longer consuming from " << pipe
->status());
94 return call
.cancel("no longer consuming");
103 // inform the pipe that we are done and clear the Pointer
// pipe:  must be non-null (asserted); taken by non-const reference.
// atEof: forwarded unchanged to BodyPipe::clearProducer().
// NOTE(review): no visible line resets `pipe`, although the comment above
// promises it; original line 110 is missing from this listing, so the reset
// presumably lives there. Confirm against the full file.
104 void BodyProducer::stopProducingFor(RefCount
<BodyPipe
> &pipe
, bool atEof
)
106 debugs(91,7, HERE
<< this << " will not produce for " << pipe
<<
107 "; atEof: " << atEof
);
108 assert(pipe
!= NULL
); // be strict: the caller state may depend on this
109 pipe
->clearProducer(atEof
);
117 // inform the pipe that we are done and clear the Pointer
// pipe: must be non-null (asserted); taken by non-const reference.
// NOTE(review): no visible line resets `pipe`, although the comment above
// promises it; original line 123 is missing from this listing, so the reset
// presumably lives there. Confirm against the full file.
118 void BodyConsumer::stopConsumingFrom(RefCount
<BodyPipe
> &pipe
)
120 debugs(91,7, HERE
<< this << " will not consume from " << pipe
);
121 assert(pipe
!= NULL
); // be strict: the caller state may depend on this
122 pipe
->clearConsumer();
// Creates a pipe attached to aProducer with no consumer yet.
// Initial state: theBodySize is -1 (status() prints "<=?" instead of a size
// in that case), both byte counters are 0, and all three flags are false.
// The buffer is initialised with 2*1024 and MaxCapacity; presumably these are
// the initial and maximum capacity (see the TODOs below) - confirm in MemBuf.
129 BodyPipe::BodyPipe(Producer
*aProducer
): theBodySize(-1),
130 theProducer(aProducer
), theConsumer(0),
131 thePutSize(0), theGetSize(0),
132 mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
134 // TODO: teach MemBuf to start with zero minSize
135 // TODO: limit maxSize by theBodySize, when known?
136 theBuf
.init(2*1024, MaxCapacity
);
137 debugs(91,7, HERE
<< "created BodyPipe" << status());
// Logs the destruction and asserts that both the producer and the consumer
// have already been detached.
// NOTE(review): original lines 141 and 145-146 are missing from this listing;
// a cleanup statement may live on line 145 - confirm against the full file.
140 BodyPipe::~BodyPipe()
142 debugs(91,7, HERE
<< "destroying BodyPipe" << status());
143 assert(!theProducer
);
144 assert(!theConsumer
);
// Records the total body size.
// Preconditions (all asserted): the size is not known yet, the bytes already
// put do not exceed aBodySize, and no consumer is attached.
148 void BodyPipe::setBodySize(uint64_t aBodySize
)
150 assert(!bodySizeKnown());
151 assert(thePutSize
<= aBodySize
);
153 // If this assert fails, we need to add code to check for eof and inform
154 // the consumer about the eof condition via scheduleBodyEndNotification,
155 // because just setting a body size limit may trigger the eof condition.
156 assert(!theConsumer
);
158 theBodySize
= aBodySize
;
159 debugs(91,7, HERE
<< "set body size" << status());
// Returns the body size as an unsigned value.
// Asserts that the size is known, so callers must check bodySizeKnown() first.
162 uint64_t BodyPipe::bodySize() const
164 assert(bodySizeKnown());
165 return static_cast<uint64_t>(theBodySize
);
// Tells whether body bytes beyond `offset` exist now or may still arrive.
// offset: must not be smaller than theGetSize (asserted).
// Returns true if bytes past offset were already put, or if production has
// not ended and mayNeedMoreData() holds.
168 bool BodyPipe::expectMoreAfter(uint64_t offset
) const
170 assert(theGetSize
<= offset
);
171 return offset
< thePutSize
|| // buffer has more now or
172 (!productionEnded() && mayNeedMoreData()); // buffer will have more
// True when nothing more is expected past the bytes already consumed, i.e.
// expectMoreAfter(theGetSize) is false.
175 bool BodyPipe::exhausted() const
177 return !expectMoreAfter(theGetSize
);
// Returns how many body bytes have not been put into the pipe yet
// (bodySize() minus thePutSize). Only valid once the body size is known.
180 uint64_t BodyPipe::unproducedSize() const
182 return bodySize() - thePutSize
; // bodySize() asserts that size is known
// Declares that production will end after `size` more bytes, i.e. that the
// final body size is thePutSize + size.
// Visible statements: a Must() that bodySize() equals that total, and an
// assignment of that total to theBodySize.
// NOTE(review): original lines 188 and 190 are missing from this listing;
// presumably they make the two statements alternatives (known vs. unknown
// size), since bodySize() asserts a known size. Confirm against the full file.
185 void BodyPipe::expectProductionEndAfter(uint64_t size
)
187 const uint64_t expectedSize
= thePutSize
+ size
;
189 Must(bodySize() == expectedSize
);
191 theBodySize
= expectedSize
;
// Handles the producer leaving the pipe.
// Visible logic, all under "theProducer is set": log; adopt thePutSize as the
// body size when the size was unknown, or log a premature eof when the known
// size was not reached; assert that an abort stays detectable; finally
// schedule the end-of-body notification.
// atEof: not referenced by any visible line.
// NOTE(review): original lines 194, 196, 199-200, 205, 208 and 210-211 are
// missing from this listing. Presumably the return type, the atEof branching
// and the actual producer reset live there; confirm against the full file.
195 BodyPipe::clearProducer(bool atEof
)
197 if (theProducer
.set()) {
198 debugs(91,7, HERE
<< "clearing BodyPipe producer" << status());
201 if (!bodySizeKnown())
202 theBodySize
= thePutSize
;
203 else if (bodySize() != thePutSize
)
204 debugs(91,3, HERE
<< "aborting on premature eof" << status());
206 // assert that we can detect the abort if the consumer joins later
207 assert(!bodySizeKnown() || bodySize() != thePutSize
);
209 scheduleBodyEndNotification();
// Appends up to `size` bytes from aBuffer to the pipe buffer.
// Visible logic: clamp size to unproducedSize(), clamp again to the buffer's
// potentialSpaceSize(), and append only when the result is non-zero.
// NOTE(review): original lines 213, 215-216, 218 and 222-227 are missing
// from this listing (return type, braces, return statements and any
// follow-up bookkeeping). unproducedSize() asserts a known body size, so the
// first clamp is presumably guarded by a check on line 216; confirm.
214 BodyPipe::putMoreData(const char *aBuffer
, size_t size
)
217 size
= min((uint64_t)size
, unproducedSize());
219 const size_t spaceSize
= static_cast<size_t>(theBuf
.potentialSpaceSize());
// Assignment inside the condition is intentional: size becomes the clamped amount.
220 if ((size
= min(size
, spaceSize
))) {
221 theBuf
.append(aBuffer
, size
);
// Attaches aConsumer to the pipe unless it is too late to do so.
// Preconditions (asserted): no consumer is attached and aConsumer is set.
// Visible logic: if bytes were already consumed, assert auto-consumption mode
// (the "too late" case, per the comment below); otherwise require that
// consumption was not aborted, store the consumer, schedule a data
// notification when the buffer has content, and schedule an end notification.
// NOTE(review): original lines 228, 230, 233, 239-241, 243, 248 and 250-253
// are missing from this listing (return type, braces, return statements and
// possibly a condition guarding the end notification); confirm.
229 BodyPipe::setConsumerIfNotLate(const Consumer::Pointer
&aConsumer
)
231 assert(!theConsumer
);
232 assert(aConsumer
.set()); // but might be invalid
234 // TODO: convert this into an exception and remove IfNotLate suffix
235 // If there is something consumed already, we are in an auto-consuming mode
236 // and it is too late to attach a real consumer to the pipe.
237 if (theGetSize
> 0) {
238 assert(mustAutoConsume
);
242 Must(!abortedConsumption
); // did not promise to never consume
244 theConsumer
= aConsumer
;
245 debugs(91,7, HERE
<< "set consumer" << status());
246 if (theBuf
.hasContent())
247 scheduleBodyDataNotification();
249 scheduleBodyEndNotification();
// Handles the consumer leaving the pipe.
// Visible logic, under "theConsumer is set": log, then call
// expectNoConsumption().
// NOTE(review): original lines 254, 256, 259, 262 and 264-266 are missing
// from this listing. Presumably the return type, the actual consumer reset
// and a condition tied to the comment below live there; confirm.
255 BodyPipe::clearConsumer()
257 if (theConsumer
.set()) {
258 debugs(91,7, HERE
<< "clearing consumer" << status());
260 // do not abort if we have not consumed so that HTTP or ICAP can retry
261 // benign xaction failures due to persistent connection race conditions
263 expectNoConsumption();
// Marks the pipe as one that will never be consumed.
// Visible logic: unless consumption was already aborted or the pipe is
// exhausted, schedule an asynchronous noteBodyConsumerAborted call for
// theProducer and set abortedConsumption.
// NOTE(review): original lines 267, 269-270 and 278-280 are missing from
// this listing (return type, braces, possibly more statements); confirm.
268 BodyPipe::expectNoConsumption()
271 if (!abortedConsumption
&& !exhausted()) {
272 AsyncCall::Pointer call
= asyncCall(91, 7,
273 "BodyProducer::noteBodyConsumerAborted",
274 BodyProducerDialer(theProducer
,
275 &BodyProducer::noteBodyConsumerAborted
, this));
276 ScheduleCallHere(call
);
277 abortedConsumption
= true;
// Moves buffered body bytes into aMemBuffer.
// Visible logic: return 0 without touching aMemBuffer when the pipe buffer is
// empty; otherwise copy min(buffered bytes, aMemBuffer's potential space),
// remove that many bytes from the pipe buffer, and return the count.
// NOTE(review): original lines 281, 283, 286, 288, 292 and 294 are missing
// from this listing. The isNull() test has no visible body; judging by the
// last comment, line 288 presumably initialises aMemBuffer. Line 292 may hold
// follow-up bookkeeping. Confirm against the full file.
282 BodyPipe::getMoreData(MemBuf
&aMemBuffer
)
284 if (!theBuf
.hasContent())
285 return 0; // did not touch the possibly uninitialized buf
287 if (aMemBuffer
.isNull())
289 const size_t size
= min(theBuf
.contentSize(), aMemBuffer
.potentialSpaceSize());
290 aMemBuffer
.append(theBuf
.content(), size
);
291 theBuf
.consume(size
);
293 return size
; // cannot be zero if we called buf.init above
// Discards `size` bytes from the front of the pipe buffer via theBuf.consume().
// NOTE(review): original lines 296, 298 and 300-301 are missing from this
// listing (return type, braces and possibly follow-up bookkeeping); confirm.
297 BodyPipe::consume(size_t size
)
299 theBuf
.consume(size
);
303 // In the AutoConsumption mode the consumer has gone but the producer continues
304 // producing data. We are using a BodySink BodyConsumer which just discards the produced data.
// Sets mustAutoConsume and, when no consumer is attached and data is already
// buffered, starts auto consumption immediately.
// NOTE(review): the return type line (305) and braces are missing from this listing.
306 BodyPipe::enableAutoConsumption()
308 mustAutoConsume
= true;
309 debugs(91,5, HERE
<< "enabled auto consumption" << status());
310 if (!theConsumer
&& theBuf
.hasContent())
311 startAutoConsumption();
314 // start auto consumption by creating body sink
// Requires mustAutoConsume (checked with Must). Installs a newly allocated
// BodySink as theConsumer, then schedules a data notification for it.
// NOTE(review): original lines 315, 317, 319 and 323 are missing from this
// listing (return type, braces and possibly another precondition check).
316 BodyPipe::startAutoConsumption()
318 Must(mustAutoConsume
);
320 theConsumer
= new BodySink
;
321 debugs(91,7, HERE
<< "starting auto consumption" << status());
322 scheduleBodyDataNotification();
// NOTE(review): orphaned statement. The header and remaining body of its
// enclosing function are missing from this listing (original lines 324-327
// and 329-333). Presumably this is BodyPipe::checkOut(), which the
// BodyPipeCheckout constructor calls; confirm against the full file.
// The visible check asserts that the buffer is not already checked out.
328 assert(!isCheckedOut
);
// Ends a buffer checkout and accounts for what changed while it was out.
// Precondition (asserted): the buffer is checked out.
// Compares the size recorded in `checkout` with the current content size: a
// smaller buffer is reported through postConsume(difference), a larger one
// through postAppend(difference), and an unchanged size triggers neither.
// NOTE(review): the return type line (333) and braces are missing from this listing.
334 BodyPipe::checkIn(Checkout
&checkout
)
336 assert(isCheckedOut
);
337 isCheckedOut
= false;
338 const size_t currentSize
= theBuf
.contentSize();
339 if (checkout
.checkedOutSize
> currentSize
)
340 postConsume(checkout
.checkedOutSize
- currentSize
);
341 else if (checkout
.checkedOutSize
< currentSize
)
342 postAppend(currentSize
- checkout
.checkedOutSize
);
// Cancels a buffer checkout without accounting for any change.
// Precondition (asserted): the buffer is checked out. Requires, via Must(),
// that the content size equals the size recorded at checkout time.
// NOTE(review): no visible line resets isCheckedOut. Original lines 345, 347
// and 356-357 are missing from this listing, so the reset may live there.
346 BodyPipe::undoCheckOut(Checkout
&checkout
)
348 assert(isCheckedOut
);
349 const size_t currentSize
= theBuf
.contentSize();
350 // We can only undo if size did not change, and even that carries
351 // some risk. If this becomes a problem, the code checking out
352 // raw buffers should always check them in (possibly unchanged)
353 // instead of relying on the automated undo mechanism of Checkout.
354 // The code can always use a temporary buffer to accomplish that.
355 Must(checkout
.checkedOutSize
== currentSize
);
358 // TODO: Optimize: inform consumer/producer about more data/space only if
359 // they used the data/space since we notified them last time.
// Bookkeeping after `size` bytes were removed from the buffer.
// Precondition (asserted): the buffer is not checked out.
// Visible logic: log, then, if mayNeedMoreData(), schedule an asynchronous
// noteMoreBodySpaceAvailable call for theProducer.
// NOTE(review): original lines 361, 363, 365 and 373-375 are missing from
// this listing. Line 365 presumably updates the consumed-bytes counter,
// because no visible line changes theGetSize; confirm against the full file.
362 BodyPipe::postConsume(size_t size
)
364 assert(!isCheckedOut
);
366 debugs(91,7, HERE
<< "consumed " << size
<< " bytes" << status());
367 if (mayNeedMoreData()) {
368 AsyncCall::Pointer call
= asyncCall(91, 7,
369 "BodyProducer::noteMoreBodySpaceAvailable",
370 BodyProducerDialer(theProducer
,
371 &BodyProducer::noteMoreBodySpaceAvailable
, this));
372 ScheduleCallHere(call
);
// Bookkeeping after `size` bytes were added to the buffer.
// Precondition (asserted): the buffer is not checked out.
// Visible logic: log; start auto consumption when it is required, no consumer
// is attached and size > 0; schedule a data notification; and, when no more
// data may be needed, call clearProducer(true) to signal end-of-body.
// NOTE(review): original lines 376, 378, 380 and 392 are missing from this
// listing. Line 380 presumably updates the produced-bytes counter, because
// no visible line changes thePutSize; confirm against the full file.
377 BodyPipe::postAppend(size_t size
)
379 assert(!isCheckedOut
);
381 debugs(91,7, HERE
<< "added " << size
<< " bytes" << status());
383 if (mustAutoConsume
&& !theConsumer
&& size
> 0)
384 startAutoConsumption();
386 // We should not consume here even if mustAutoConsume because the
387 // caller may not be ready for the data to be consumed during this call.
388 scheduleBodyDataNotification();
390 if (!mayNeedMoreData())
391 clearProducer(true); // reached end-of-body
// Schedules an asynchronous noteMoreBodyDataAvailable call for the consumer.
// Does nothing when theConsumer.valid() is false.
// NOTE(review): the return type line (395) and braces are missing from this listing.
396 BodyPipe::scheduleBodyDataNotification()
398 if (theConsumer
.valid()) { // TODO: allow asyncCall() to check this instead
399 AsyncCall::Pointer call
= asyncCall(91, 7,
400 "BodyConsumer::noteMoreBodyDataAvailable",
401 BodyConsumerDialer(theConsumer
,
402 &BodyConsumer::noteMoreBodyDataAvailable
, this));
403 ScheduleCallHere(call
);
// Tells the consumer asynchronously how body production ended.
// Does nothing when theConsumer.valid() is false. When the body size is known
// and fully produced, schedules noteBodyProductionEnded; the second visible
// call schedules noteBodyProducerAborted.
// NOTE(review): original line 417 is missing from this listing. Presumably it
// is the `else` that makes the two notifications mutually exclusive; confirm
// against the full file. The return type (407) and braces are missing too.
408 BodyPipe::scheduleBodyEndNotification()
410 if (theConsumer
.valid()) { // TODO: allow asyncCall() to check this instead
411 if (bodySizeKnown() && bodySize() == thePutSize
) {
412 AsyncCall::Pointer call
= asyncCall(91, 7,
413 "BodyConsumer::noteBodyProductionEnded",
414 BodyConsumerDialer(theConsumer
,
415 &BodyConsumer::noteBodyProductionEnded
, this));
416 ScheduleCallHere(call
);
418 AsyncCall::Pointer call
= asyncCall(91, 7,
419 "BodyConsumer::noteBodyProducerAborted",
420 BodyConsumerDialer(theConsumer
,
421 &BodyConsumer::noteBodyProducerAborted
, this));
422 ScheduleCallHere(call
);
427 // a short temporary string describing buffer status for debugging
// Output layout, per the visible format strings:
//   " [" get "<=" put ("<=" bodySize | "<=?") " " content "+" space
//   " pipe" ptr [" prod" ptr] [" cons" ptr] flags "]"
// The result comes from a function-local static MemBuf that is reset at the
// start of every call, so presumably the text is only valid until the next
// status() call and the function is not reentrant.
// NOTE(review): original lines 438, 448-449 and 453 are missing from this
// listing. Presumably they hold the `else` before "<=?" and the conditions
// guarding the " A" and " L" flags; confirm against the full file.
// NOTE(review): `"%"PRIu64` has no space between the literal and the macro.
// C++11 and later parse that as a user-defined literal suffix; `"%" PRIu64`
// avoids the problem.
428 const char *BodyPipe::status() const
430 static MemBuf outputBuffer
;
431 outputBuffer
.reset();
433 outputBuffer
.append(" [", 2);
// consumed <= produced
435 outputBuffer
.Printf("%"PRIu64
"<=%"PRIu64
, theGetSize
, thePutSize
);
// A negative theBodySize is the constructor's initial value (-1).
436 if (theBodySize
>= 0)
437 outputBuffer
.Printf("<=%"PRId64
, theBodySize
);
439 outputBuffer
.append("<=?", 3);
// buffered bytes + free space
441 outputBuffer
.Printf(" %d+%d", (int)theBuf
.contentSize(), (int)theBuf
.spaceSize());
443 outputBuffer
.Printf(" pipe%p", this);
444 if (theProducer
.set())
445 outputBuffer
.Printf(" prod%p", theProducer
.get());
446 if (theConsumer
.set())
447 outputBuffer
.Printf(" cons%p", theConsumer
.get());
450 outputBuffer
.append(" A", 2);
451 if (abortedConsumption
)
452 outputBuffer
.append(" !C", 3);
454 outputBuffer
.append(" L", 2); // Locked
456 outputBuffer
.append("]", 1);
458 outputBuffer
.terminate();
460 return outputBuffer
.content();
464 /* BodyPipeCheckout */
// Checks the pipe's buffer out for direct access.
// Records the pipe, the buffer returned by aPipe.checkOut(), the pipe's
// consumedSize() as `offset`, and the buffer's content size at checkout time;
// checkedIn starts false.
// NOTE(review): checkedOutSize is initialised from `buf`, so `buf` must be
// declared before checkedOutSize in the class. The declaration order is not
// visible here; confirm in the header.
466 BodyPipeCheckout::BodyPipeCheckout(BodyPipe
&aPipe
): pipe(aPipe
),
467 buf(aPipe
.checkOut()), offset(aPipe
.consumedSize()),
468 checkedOutSize(buf
.contentSize()), checkedIn(false)
// The only visible statement logs a warning that the checkout cannot be
// undone; per the comment below, pipe.undoCheckOut() is deliberately not called.
// NOTE(review): original lines 473-474 and 479-482 are missing from this
// listing. Presumably the warning is conditional on the checkout not having
// been checked in; confirm against the full file.
472 BodyPipeCheckout::~BodyPipeCheckout()
475 // Do not pipe.undoCheckOut(*this) because it asserts or throws
476 // TODO: consider implementing the long-term solution discussed at
477 // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
478 debugs(91,2, HERE
<< "Warning: cannot undo BodyPipeCheckout");
// NOTE(review): only the header of this function is visible. Its return type
// and whole body (original lines 483 and 485-490) are missing from this
// listing. Presumably it hands the buffer back through BodyPipe::checkIn();
// confirm against the full file.
484 BodyPipeCheckout::checkIn()
// Copy constructor that exists only to forbid copying. It copies every member
// from `c`, then hits assert(false) at run time.
// NOTE(review): if assert() is compiled out, copying silently succeeds.
492 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout
&c
): pipe(c
.pipe
),
493 buf(c
.buf
), offset(c
.offset
), checkedOutSize(c
.checkedOutSize
),
494 checkedIn(c
.checkedIn
)
496 assert(false); // prevent copying
// Assignment operator that exists only to forbid assignment: the visible body
// is a single assert(false).
// NOTE(review): the return type line (499) and any return statement are
// missing from this listing.
500 BodyPipeCheckout::operator =(const BodyPipeCheckout
&)
502 assert(false); // prevent assignment