2 * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
10 #include "base/AsyncJobCalls.h"
11 #include "base/TextException.h"
14 // BodySink is a BodyConsumer that simply consumes and discards all
15 // data arriving from a BodyPipe.
16 class BodySink
: public BodyConsumer
18 CBDATA_CLASS(BodySink
);
// Remembers the pipe to drain; the job is registered under the name "BodySink".
21 BodySink(const BodyPipe::Pointer
&bp
): AsyncJob("BodySink"), body_pipe(bp
) {}
// The sink must have released its pipe pointer before it is destroyed.
22 virtual ~BodySink() { assert(!body_pipe
); }
// Discards everything currently buffered in the notifying pipe by
// consuming the buffer's full content size.
24 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp
) {
25 size_t contentSize
= bp
->buf().contentSize();
26 bp
->consume(contentSize
);
// Production ended: stop consuming from our pipe.
28 virtual void noteBodyProductionEnded(BodyPipe::Pointer
) {
29 stopConsumingFrom(body_pipe
);
// Producer aborted: handled the same way, by stopping consumption.
31 virtual void noteBodyProducerAborted(BodyPipe::Pointer
) {
32 stopConsumingFrom(body_pipe
);
// The job is finished once the pipe pointer is unset and AsyncJob agrees.
34 bool doneAll() const {return !body_pipe
&& AsyncJob::doneAll();}
37 BodyPipe::Pointer body_pipe
; ///< the pipe we are consuming from
// Companion to the CBDATA_CLASS(BodySink) declaration inside the class.
40 CBDATA_CLASS_INIT(BodySink
);
42 // The BodyProducerDialer is an AsyncCall dialer class which is used to schedule
43 // BodyProducer calls. In addition to the normal AsyncCall checks, it verifies that
44 // the BodyProducer is still the producer of the BodyPipe passed as the argument.
45 class BodyProducerDialer
: public UnaryMemFunT
<BodyProducer
, BodyPipe::Pointer
>
48 typedef UnaryMemFunT
<BodyProducer
, BodyPipe::Pointer
> Parent
;
// Forwards the producer job, the member function to call, and the pipe
// argument to the Parent dialer unchanged.
50 BodyProducerDialer(const BodyProducer::Pointer
&aProducer
,
51 Parent::Method aHandler
, BodyPipe::Pointer bp
):
52 Parent(aProducer
, aHandler
, bp
) {}
// Adds the "still producing" check on top of Parent::canDial().
54 virtual bool canDial(AsyncCall
&call
);
57 // The BodyConsumerDialer is an AsyncCall dialer class which is used to schedule
58 // BodyConsumer calls. In addition to the normal AsyncCall checks, it verifies that
59 // the BodyConsumer is still the recipient of the BodyPipe passed as the argument.
60 class BodyConsumerDialer
: public UnaryMemFunT
<BodyConsumer
, BodyPipe::Pointer
>
63 typedef UnaryMemFunT
<BodyConsumer
, BodyPipe::Pointer
> Parent
;
// Forwards the consumer job, the member function to call, and the pipe
// argument to the Parent dialer unchanged.
65 BodyConsumerDialer(const BodyConsumer::Pointer
&aConsumer
,
66 Parent::Method aHandler
, BodyPipe::Pointer bp
):
67 Parent(aConsumer
, aHandler
, bp
) {}
// Adds the "still consuming" check on top of Parent::canDial().
69 virtual bool canDial(AsyncCall
&call
);
// Decides whether a scheduled BodyProducer call may still be delivered.
// The Parent dialer's check runs first; afterwards the call is cancelled
// with the reason "no longer producing" when the pipe reports that the
// targeted job is not producing for it anymore.
73 BodyProducerDialer::canDial(AsyncCall
&call
)
75 if (!Parent::canDial(call
))
// job is the dialer's target object; arg1 is the pipe stored as the call argument
78 const BodyProducer::Pointer
&producer
= job
;
79 BodyPipe::Pointer aPipe
= arg1
;
80 if (!aPipe
->stillProducing(producer
)) {
// report using the debugging section and level of the call itself
81 debugs(call
.debugSection
, call
.debugLevel
, producer
<< " no longer producing for " << aPipe
->status());
82 return call
.cancel("no longer producing");
// Decides whether a scheduled BodyConsumer call may still be delivered.
// The Parent dialer's check runs first; afterwards the call is cancelled
// with the reason "no longer consuming" when the pipe reports that the
// targeted job is not consuming from it anymore.
89 BodyConsumerDialer::canDial(AsyncCall
&call
)
91 if (!Parent::canDial(call
))
// job is the dialer's target object; arg1 is the pipe stored as the call argument
94 const BodyConsumer::Pointer
&consumer
= job
;
95 BodyPipe::Pointer aPipe
= arg1
;
96 if (!aPipe
->stillConsuming(consumer
)) {
// report using the debugging section and level of the call itself
97 debugs(call
.debugSection
, call
.debugLevel
, consumer
<< " no longer consuming from " << aPipe
->status());
98 return call
.cancel("no longer consuming");
106 // inform the pipe that we are done and clear the Pointer
// p must be a non-nil pipe pointer (asserted below).
// atEof is passed through to BodyPipe::clearProducer().
107 void BodyProducer::stopProducingFor(RefCount
<BodyPipe
> &p
, bool atEof
)
109 debugs(91,7, this << " will not produce for " << p
<< "; atEof: " << atEof
);
110 assert(p
!= NULL
); // be strict: the caller state may depend on this
111 p
->clearProducer(atEof
);
117 // inform the pipe that we are done and clear the Pointer
// p must be a non-nil pipe pointer (asserted below).
118 void BodyConsumer::stopConsumingFrom(RefCount
<BodyPipe
> &p
)
120 debugs(91,7, this << " will not consume from " << p
);
121 assert(p
!= NULL
); // be strict: the caller state may depend on this
// Creates a pipe fed by aProducer. The pipe starts with theBodySize set
// to -1, no consumer, zero put/get byte counters, and all of the
// auto-consumption, aborted-consumption, and checked-out flags cleared.
128 BodyPipe::BodyPipe(Producer
*aProducer
): theBodySize(-1),
129 theProducer(aProducer
), theConsumer(0),
130 thePutSize(0), theGetSize(0),
131 mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
133 // TODO: teach MemBuf to start with zero minSize
134 // TODO: limit maxSize by theBodySize, when known?
135 theBuf
.init(2*1024, MaxCapacity
);
136 debugs(91,7, HERE
<< "created BodyPipe" << status());
// Logs the destruction and insists that both the producer and the
// consumer have already been detached from the pipe.
139 BodyPipe::~BodyPipe()
141 debugs(91,7, HERE
<< "destroying BodyPipe" << status());
142 assert(!theProducer
);
143 assert(!theConsumer
);
// Records the total body size. The size must not be known yet and must
// not be smaller than the number of bytes already put into the pipe.
147 void BodyPipe::setBodySize(uint64_t aBodySize
)
149 assert(!bodySizeKnown());
150 assert(thePutSize
<= aBodySize
);
152 // If this assert fails, we need to add code to check for eof and inform
153 // the consumer about the eof condition via scheduleBodyEndNotification,
154 // because just setting a body size limit may trigger the eof condition.
155 assert(!theConsumer
);
157 theBodySize
= aBodySize
;
158 debugs(91,7, HERE
<< "set body size" << status());
// Returns the body size as an unsigned value; asserts that the size is known.
161 uint64_t BodyPipe::bodySize() const
163 assert(bodySizeKnown());
164 return static_cast<uint64_t>(theBodySize
);
// Tells whether body bytes beyond the given offset exist or may still
// arrive. The offset must not be smaller than theGetSize (asserted).
167 bool BodyPipe::expectMoreAfter(uint64_t offset
) const
169 assert(theGetSize
<= offset
);
170 return offset
< thePutSize
|| // buffer has more now or
171 (!productionEnded() && mayNeedMoreData()); // buffer will have more
// True when nothing more is expected after the current theGetSize offset.
174 bool BodyPipe::exhausted() const
176 return !expectMoreAfter(theGetSize
);
// Returns how many body bytes have not been put into the pipe yet:
// the known body size minus thePutSize.
179 uint64_t BodyPipe::unproducedSize() const
181 return bodySize() - thePutSize
; // bodySize() asserts that size is known
// Works with the total body size implied by `size` additional bytes on
// top of what has been put so far (thePutSize + size): that total is
// checked against bodySize() with Must() and stored in theBodySize.
184 void BodyPipe::expectProductionEndAfter(uint64_t size
)
186 const uint64_t expectedSize
= thePutSize
+ size
;
188 Must(bodySize() == expectedSize
);
190 theBodySize
= expectedSize
;
// Detaches the producer from the pipe, if one is set, and then schedules
// the body-end notification for the consumer.
// atEof: value supplied by the caller (BodyProducer::stopProducingFor
// passes its own atEof argument through).
194 BodyPipe::clearProducer(bool atEof
)
196 if (theProducer
.set()) {
197 debugs(91,7, HERE
<< "clearing BodyPipe producer" << status());
// an unknown body size becomes the number of bytes put so far
200 if (!bodySizeKnown())
201 theBodySize
= thePutSize
;
// a known size that differs from the bytes put is logged as a premature eof
202 else if (bodySize() != thePutSize
)
203 debugs(91,3, HERE
<< "aborting on premature eof" << status());
205 // assert that we can detect the abort if the consumer joins later
206 assert(!bodySizeKnown() || bodySize() != thePutSize
);
208 scheduleBodyEndNotification();
// Appends up to `size` bytes from aBuffer to the pipe buffer.
// The amount is limited by unproducedSize() and by the space the buffer
// can potentially provide.
213 BodyPipe::putMoreData(const char *aBuffer
, size_t size
)
216 size
= min((uint64_t)size
, unproducedSize());
218 const size_t spaceSize
= static_cast<size_t>(theBuf
.potentialSpaceSize());
// intentional assignment inside the condition: clamp size to the
// available space and append only when something is left
219 if ((size
= min(size
, spaceSize
))) {
220 theBuf
.append(aBuffer
, size
);
// Attaches aConsumer to the pipe unless consumption has already begun.
// Requires that no consumer is currently attached and that aConsumer is
// a set (though possibly invalid) pointer.
228 BodyPipe::setConsumerIfNotLate(const Consumer::Pointer
&aConsumer
)
230 assert(!theConsumer
);
231 assert(aConsumer
.set()); // but might be invalid
233 // TODO: convert this into an exception and remove IfNotLate suffix
234 // If there is something consumed already, we are in an auto-consuming mode
235 // and it is too late to attach a real consumer to the pipe.
236 if (theGetSize
> 0) {
237 assert(mustAutoConsume
);
241 Must(!abortedConsumption
); // did not promise to never consume
243 theConsumer
= aConsumer
;
244 debugs(91,7, HERE
<< "set consumer" << status());
// let the new consumer know about data that is already buffered
245 if (theBuf
.hasContent())
246 scheduleBodyDataNotification();
248 scheduleBodyEndNotification();
// Handles the departure of the current consumer, if one is set.
254 BodyPipe::clearConsumer()
256 if (theConsumer
.set()) {
257 debugs(91,7, HERE
<< "clearing consumer" << status());
259 // do not abort if we have not consumed so that HTTP or ICAP can retry
260 // benign xaction failures due to persistent connection race conditions
262 expectNoConsumption();
// Called when no (further) regular consumption is expected. If consumption
// has not been marked aborted yet and the pipe is not exhausted, the
// producer is notified asynchronously via noteBodyConsumerAborted and
// abortedConsumption is set.
267 BodyPipe::expectNoConsumption()
269 // We may be called multiple times because multiple jobs on the consumption
270 // chain may realize that there will be no more setConsumer() calls (e.g.,
271 // consuming code and retrying code). It is both difficult and not really
272 // necessary for them to coordinate their expectNoConsumption() calls.
274 // As a consequence, we may be called when we are auto-consuming already.
276 if (!abortedConsumption
&& !exhausted()) {
277 // Before we abort, any regular consumption should be over and auto
278 // consumption must not be started.
// BodyProducerDialer drops the call if theProducer stops producing first
281 AsyncCall::Pointer call
= asyncCall(91, 7,
282 "BodyProducer::noteBodyConsumerAborted",
283 BodyProducerDialer(theProducer
,
284 &BodyProducer::noteBodyConsumerAborted
, this));
285 ScheduleCallHere(call
);
286 abortedConsumption
= true;
288 // in case somebody enabled auto-consumption before regular one aborted
290 startAutoConsumption();
// Moves buffered body bytes from the pipe into aMemBuffer and returns
// the number of bytes moved; returns 0 when the pipe buffer is empty.
295 BodyPipe::getMoreData(MemBuf
&aMemBuffer
)
297 if (!theBuf
.hasContent())
298 return 0; // did not touch the possibly uninitialized buf
300 if (aMemBuffer
.isNull())
// move no more than we have and no more than the destination can take
302 const size_t size
= min(theBuf
.contentSize(), aMemBuffer
.potentialSpaceSize());
303 aMemBuffer
.append(theBuf
.content(), size
);
304 theBuf
.consume(size
);
306 return size
; // cannot be zero if we called buf.init above
// Removes `size` bytes from the pipe buffer.
310 BodyPipe::consume(size_t size
)
312 theBuf
.consume(size
);
316 // In the AutoConsumption mode the consumer has gone but the producer continues
317 // producing data. We are using a BodySink BodyConsumer which just discards the produced data.
// Sets the mustAutoConsume flag and, when no consumer is attached and
// data is already buffered, starts auto-consumption right away.
319 BodyPipe::enableAutoConsumption()
321 mustAutoConsume
= true;
322 debugs(91,5, HERE
<< "enabled auto consumption" << status());
323 if (!theConsumer
&& theBuf
.hasContent())
324 startAutoConsumption();
327 // start auto consumption by creating body sink
// Requires mustAutoConsume (checked with Must). The new BodySink becomes
// the pipe's consumer and is then told about available body data.
329 BodyPipe::startAutoConsumption()
331 Must(mustAutoConsume
);
333 theConsumer
= new BodySink(this);
334 debugs(91,7, HERE
<< "starting auto consumption" << status());
335 scheduleBodyDataNotification();
// NOTE(review): the enclosing function header is not visible at this point;
// presumably this is the check-out path -- confirm against the full file.
// Precondition: the buffer must not be checked out already.
341 assert(!isCheckedOut
);
// Ends a buffer checkout. The current buffer content size is compared
// with the size recorded in the checkout: a shrunken buffer is reported
// through postConsume() and a grown buffer through postAppend(), each
// with the size difference.
347 BodyPipe::checkIn(Checkout
&checkout
)
349 assert(isCheckedOut
);
350 isCheckedOut
= false;
351 const size_t currentSize
= theBuf
.contentSize();
352 if (checkout
.checkedOutSize
> currentSize
)
353 postConsume(checkout
.checkedOutSize
- currentSize
);
354 else if (checkout
.checkedOutSize
< currentSize
)
355 postAppend(currentSize
- checkout
.checkedOutSize
);
// Cancels a buffer checkout. Requires that the buffer is checked out
// (asserted) and that its content size still equals the size recorded
// in the checkout (checked with Must).
359 BodyPipe::undoCheckOut(Checkout
&checkout
)
361 assert(isCheckedOut
);
362 const size_t currentSize
= theBuf
.contentSize();
363 // We can only undo if size did not change, and even that carries
364 // some risk. If this becomes a problem, the code checking out
365 // raw buffers should always check them in (possibly unchanged)
366 // instead of relying on the automated undo mechanism of Checkout.
367 // The code can always use a temporary buffer to accomplish that.
368 Must(checkout
.checkedOutSize
== currentSize
);
371 // TODO: Optimize: inform consumer/producer about more data/space only if
372 // they used the data/space since we notified them last time.
// Bookkeeping after `size` bytes left the buffer. Must not run while the
// buffer is checked out. When the pipe may need more data, the producer
// is notified asynchronously that more body space is available.
375 BodyPipe::postConsume(size_t size
)
377 assert(!isCheckedOut
);
379 debugs(91,7, HERE
<< "consumed " << size
<< " bytes" << status());
380 if (mayNeedMoreData()) {
// BodyProducerDialer drops the call if theProducer stops producing first
381 AsyncCall::Pointer call
= asyncCall(91, 7,
382 "BodyProducer::noteMoreBodySpaceAvailable",
383 BodyProducerDialer(theProducer
,
384 &BodyProducer::noteMoreBodySpaceAvailable
, this));
385 ScheduleCallHere(call
);
// Bookkeeping after `size` bytes entered the buffer. Must not run while
// the buffer is checked out.
390 BodyPipe::postAppend(size_t size
)
392 assert(!isCheckedOut
);
394 debugs(91,7, HERE
<< "added " << size
<< " bytes" << status());
// auto-consumption was requested but nobody is consuming yet
396 if (mustAutoConsume
&& !theConsumer
&& size
> 0)
397 startAutoConsumption();
399 // We should not consume here even if mustAutoConsume because the
400 // caller may not be ready for the data to be consumed during this call.
401 scheduleBodyDataNotification();
403 if (!mayNeedMoreData())
404 clearProducer(true); // reached end-of-body
// Schedules an asynchronous noteMoreBodyDataAvailable call to the
// consumer; does nothing when the consumer pointer is not valid.
408 BodyPipe::scheduleBodyDataNotification()
410 if (theConsumer
.valid()) { // TODO: allow asyncCall() to check this instead
// BodyConsumerDialer drops the call if theConsumer stops consuming first
411 AsyncCall::Pointer call
= asyncCall(91, 7,
412 "BodyConsumer::noteMoreBodyDataAvailable",
413 BodyConsumerDialer(theConsumer
,
414 &BodyConsumer::noteMoreBodyDataAvailable
, this));
415 ScheduleCallHere(call
);
// Schedules the asynchronous end-of-body notification for the consumer;
// does nothing when the consumer pointer is not valid.
420 BodyPipe::scheduleBodyEndNotification()
422 if (theConsumer
.valid()) { // TODO: allow asyncCall() to check this instead
// the whole body was produced: the known size equals the bytes put
423 if (bodySizeKnown() && bodySize() == thePutSize
) {
424 AsyncCall::Pointer call
= asyncCall(91, 7,
425 "BodyConsumer::noteBodyProductionEnded",
426 BodyConsumerDialer(theConsumer
,
427 &BodyConsumer::noteBodyProductionEnded
, this));
428 ScheduleCallHere(call
);
// notification that the producer aborted
430 AsyncCall::Pointer call
= asyncCall(91, 7,
431 "BodyConsumer::noteBodyProducerAborted",
432 BodyConsumerDialer(theConsumer
,
433 &BodyConsumer::noteBodyProducerAborted
, this));
434 ScheduleCallHere(call
);
439 // a short temporary string describing buffer status for debugging
// The text is built in a function-local static MemBuf that is reset on
// every call, so a returned pointer refers to content that the next
// status() call overwrites.
440 const char *BodyPipe::status() const
442 static MemBuf outputBuffer
;
443 outputBuffer
.reset();
445 outputBuffer
.append(" [", 2);
// consumed (get) size, then produced (put) size
447 outputBuffer
.appendf("%" PRIu64
"<=%" PRIu64
, theGetSize
, thePutSize
);
// the body size is printed only when it is non-negative
448 if (theBodySize
>= 0)
449 outputBuffer
.appendf("<=%" PRId64
, theBodySize
);
451 outputBuffer
.append("<=?", 3);
// buffer content size and buffer space size
453 outputBuffer
.appendf(" %" PRId64
"+%" PRId64
, static_cast<int64_t>(theBuf
.contentSize()), static_cast<int64_t>(theBuf
.spaceSize()));
// addresses of the pipe and of the producer/consumer, when those are set
455 outputBuffer
.appendf(" pipe%p", this);
456 if (theProducer
.set())
457 outputBuffer
.appendf(" prod%p", theProducer
.get());
458 if (theConsumer
.set())
459 outputBuffer
.appendf(" cons%p", theConsumer
.get());
// state flags; NOTE(review): " A" presumably marks auto-consumption -- confirm
462 outputBuffer
.append(" A", 2);
463 if (abortedConsumption
)
464 outputBuffer
.append(" !C", 3);
466 outputBuffer
.append(" L", 2); // Locked
468 outputBuffer
.append("]", 1);
470 outputBuffer
.terminate();
472 return outputBuffer
.content();
475 /* BodyPipeCheckout */
// Checks out the buffer of aPipe and records the pipe's consumed size
// as the offset and the buffer's content size as the checked-out size.
// The checkout starts in the "not checked in" state.
477 BodyPipeCheckout::BodyPipeCheckout(BodyPipe
&aPipe
): thePipe(aPipe
),
478 buf(aPipe
.checkOut()), offset(aPipe
.consumedSize()),
479 checkedOutSize(buf
.contentSize()), checkedIn(false)
// Cleanup path of a checkout: logs a warning and checks the buffer back
// into the pipe instead of undoing the checkout.
483 BodyPipeCheckout::~BodyPipeCheckout()
486 // Do not pipe.undoCheckOut(*this) because it asserts or throws
487 // TODO: consider implementing the long-term solution discussed at
488 // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
489 debugs(91,2, HERE
<< "Warning: cannot undo BodyPipeCheckout");
490 thePipe
.checkIn(*this);
// Returns the checked-out buffer to the pipe via BodyPipe::checkIn().
495 BodyPipeCheckout::checkIn()
498 thePipe
.checkIn(*this);
// Copy constructor that exists only to reject copying: it copies every
// member from c and then fails the assert(false) below.
502 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout
&c
): thePipe(c
.thePipe
),
503 buf(c
.buf
), offset(c
.offset
), checkedOutSize(c
.checkedOutSize
),
504 checkedIn(c
.checkedIn
)
506 assert(false); // prevent copying
// Assignment operator that exists only to reject assignment via assert(false).
510 BodyPipeCheckout::operator =(const BodyPipeCheckout
&)
512 assert(false); // prevent assignment