2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
10 #include "base/AsyncJobCalls.h"
11 #include "base/TextException.h"
14 // BodySink is a BodyConsumer class which just consume and drops
15 // data from a BodyPipe
16 class BodySink
: public BodyConsumer
18 CBDATA_CLASS(BodySink
);
21 BodySink(const BodyPipe::Pointer
&bp
): AsyncJob("BodySink"), body_pipe(bp
) {}
22 virtual ~BodySink() { assert(!body_pipe
); }
24 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp
) {
25 size_t contentSize
= bp
->buf().contentSize();
26 bp
->consume(contentSize
);
28 virtual void noteBodyProductionEnded(BodyPipe::Pointer
) {
29 stopConsumingFrom(body_pipe
);
31 virtual void noteBodyProducerAborted(BodyPipe::Pointer
) {
32 stopConsumingFrom(body_pipe
);
34 bool doneAll() const {return !body_pipe
&& AsyncJob::doneAll();}
37 BodyPipe::Pointer body_pipe
; ///< the pipe we are consuming from
40 CBDATA_CLASS_INIT(BodySink
);
42 // The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
43 // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
44 // the BodyPipe passed as argument
45 class BodyProducerDialer
: public UnaryMemFunT
<BodyProducer
, BodyPipe::Pointer
>
48 typedef UnaryMemFunT
<BodyProducer
, BodyPipe::Pointer
> Parent
;
50 BodyProducerDialer(const BodyProducer::Pointer
&aProducer
,
51 Parent::Method aHandler
, BodyPipe::Pointer bp
):
52 Parent(aProducer
, aHandler
, bp
) {}
54 virtual bool canDial(AsyncCall
&call
);
57 // The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
58 // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
59 // of the BodyPipe passed as argument
60 class BodyConsumerDialer
: public UnaryMemFunT
<BodyConsumer
, BodyPipe::Pointer
>
63 typedef UnaryMemFunT
<BodyConsumer
, BodyPipe::Pointer
> Parent
;
65 BodyConsumerDialer(const BodyConsumer::Pointer
&aConsumer
,
66 Parent::Method aHandler
, BodyPipe::Pointer bp
):
67 Parent(aConsumer
, aHandler
, bp
) {}
69 virtual bool canDial(AsyncCall
&call
);
73 BodyProducerDialer::canDial(AsyncCall
&call
)
75 if (!Parent::canDial(call
))
78 const BodyProducer::Pointer
&producer
= job
;
79 BodyPipe::Pointer aPipe
= arg1
;
80 if (!aPipe
->stillProducing(producer
)) {
81 debugs(call
.debugSection
, call
.debugLevel
, producer
<< " no longer producing for " << aPipe
->status());
82 return call
.cancel("no longer producing");
89 BodyConsumerDialer::canDial(AsyncCall
&call
)
91 if (!Parent::canDial(call
))
94 const BodyConsumer::Pointer
&consumer
= job
;
95 BodyPipe::Pointer aPipe
= arg1
;
96 if (!aPipe
->stillConsuming(consumer
)) {
97 debugs(call
.debugSection
, call
.debugLevel
, consumer
<< " no longer consuming from " << aPipe
->status());
98 return call
.cancel("no longer consuming");
106 // inform the pipe that we are done and clear the Pointer
107 void BodyProducer::stopProducingFor(RefCount
<BodyPipe
> &p
, bool atEof
)
109 debugs(91,7, this << " will not produce for " << p
<< "; atEof: " << atEof
);
110 assert(p
!= NULL
); // be strict: the caller state may depend on this
111 p
->clearProducer(atEof
);
117 // inform the pipe that we are done and clear the Pointer
118 void BodyConsumer::stopConsumingFrom(RefCount
<BodyPipe
> &p
)
120 debugs(91,7, this << " will not consume from " << p
);
121 assert(p
!= NULL
); // be strict: the caller state may depend on this
128 BodyPipe::BodyPipe(Producer
*aProducer
): theBodySize(-1),
129 theProducer(aProducer
), theConsumer(0),
130 thePutSize(0), theGetSize(0),
131 mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
133 // TODO: teach MemBuf to start with zero minSize
134 // TODO: limit maxSize by theBodySize, when known?
135 theBuf
.init(2*1024, MaxCapacity
);
136 debugs(91,7, "created BodyPipe" << status());
139 BodyPipe::~BodyPipe()
141 debugs(91,7, "destroying BodyPipe" << status());
142 assert(!theProducer
);
143 assert(!theConsumer
);
147 void BodyPipe::setBodySize(uint64_t aBodySize
)
149 assert(!bodySizeKnown());
150 assert(thePutSize
<= aBodySize
);
152 // If this assert fails, we need to add code to check for eof and inform
153 // the consumer about the eof condition via scheduleBodyEndNotification,
154 // because just setting a body size limit may trigger the eof condition.
155 assert(!theConsumer
);
157 theBodySize
= aBodySize
;
158 debugs(91,7, "set body size" << status());
161 uint64_t BodyPipe::bodySize() const
163 assert(bodySizeKnown());
164 return static_cast<uint64_t>(theBodySize
);
167 bool BodyPipe::expectMoreAfter(uint64_t offset
) const
169 assert(theGetSize
<= offset
);
170 return offset
< thePutSize
|| // buffer has more now or
171 (!productionEnded() && mayNeedMoreData()); // buffer will have more
174 bool BodyPipe::exhausted() const
176 return !expectMoreAfter(theGetSize
);
179 uint64_t BodyPipe::unproducedSize() const
181 return bodySize() - thePutSize
; // bodySize() asserts that size is known
184 void BodyPipe::expectProductionEndAfter(uint64_t size
)
186 const uint64_t expectedSize
= thePutSize
+ size
;
188 Must(bodySize() == expectedSize
);
190 theBodySize
= expectedSize
;
194 BodyPipe::clearProducer(bool atEof
)
196 if (theProducer
.set()) {
197 debugs(91,7, "clearing BodyPipe producer" << status());
200 if (!bodySizeKnown())
201 theBodySize
= thePutSize
;
202 else if (bodySize() != thePutSize
)
203 debugs(91,3, "aborting on premature eof" << status());
205 // asserta that we can detect the abort if the consumer joins later
206 assert(!bodySizeKnown() || bodySize() != thePutSize
);
208 scheduleBodyEndNotification();
213 BodyPipe::putMoreData(const char *aBuffer
, size_t size
)
216 size
= min((uint64_t)size
, unproducedSize());
218 const size_t spaceSize
= static_cast<size_t>(theBuf
.potentialSpaceSize());
219 if ((size
= min(size
, spaceSize
))) {
220 theBuf
.append(aBuffer
, size
);
228 BodyPipe::setConsumerIfNotLate(const Consumer::Pointer
&aConsumer
)
230 assert(!theConsumer
);
231 assert(aConsumer
.set()); // but might be invalid
233 // TODO: convert this into an exception and remove IfNotLate suffix
234 // If there is something consumed already, we are in an auto-consuming mode
235 // and it is too late to attach a real consumer to the pipe.
236 if (theGetSize
> 0) {
237 assert(mustAutoConsume
);
241 Must(!abortedConsumption
); // did not promise to never consume
243 theConsumer
= aConsumer
;
244 debugs(91,7, "set consumer" << status());
245 if (theBuf
.hasContent())
246 scheduleBodyDataNotification();
248 scheduleBodyEndNotification();
254 BodyPipe::clearConsumer()
256 if (theConsumer
.set()) {
257 debugs(91,7, "clearing consumer" << status());
259 // do not abort if we have not consumed so that HTTP or ICAP can retry
260 // benign xaction failures due to persistent connection race conditions
262 expectNoConsumption();
267 BodyPipe::expectNoConsumption()
269 // We may be called multiple times because multiple jobs on the consumption
270 // chain may realize that there will be no more setConsumer() calls (e.g.,
271 // consuming code and retrying code). It is both difficult and not really
272 // necessary for them to coordinate their expectNoConsumption() calls.
274 // As a consequence, we may be called when we are auto-consuming already.
276 if (!abortedConsumption
&& !exhausted()) {
277 // Before we abort, any regular consumption should be over and auto
278 // consumption must not be started.
281 AsyncCall::Pointer call
= asyncCall(91, 7,
282 "BodyProducer::noteBodyConsumerAborted",
283 BodyProducerDialer(theProducer
,
284 &BodyProducer::noteBodyConsumerAborted
, this));
285 ScheduleCallHere(call
);
286 abortedConsumption
= true;
288 // in case somebody enabled auto-consumption before regular one aborted
289 startAutoConsumptionIfNeeded();
294 BodyPipe::getMoreData(MemBuf
&aMemBuffer
)
296 if (!theBuf
.hasContent())
297 return 0; // did not touch the possibly uninitialized buf
299 if (aMemBuffer
.isNull())
301 const size_t size
= min(theBuf
.contentSize(), aMemBuffer
.potentialSpaceSize());
302 aMemBuffer
.append(theBuf
.content(), size
);
303 theBuf
.consume(size
);
305 return size
; // cannot be zero if we called buf.init above
309 BodyPipe::consume(size_t size
)
311 theBuf
.consume(size
);
316 BodyPipe::enableAutoConsumption()
318 mustAutoConsume
= true;
319 debugs(91,5, "enabled auto consumption" << status());
320 startAutoConsumptionIfNeeded();
323 /// Check the current need and, if needed, start auto consumption. In auto
324 /// consumption mode, the consumer is gone, but the producer continues to
325 /// produce data. We use a BodySink BodyConsumer to discard that data.
327 BodyPipe::startAutoConsumptionIfNeeded()
329 const auto startNow
=
330 mustAutoConsume
&& // was enabled
331 !theConsumer
&& // has not started yet
332 theProducer
.valid() && // still useful (and will eventually stop)
333 theBuf
.hasContent(); // has something to consume right now
337 theConsumer
= new BodySink(this);
338 AsyncJob::Start(theConsumer
);
339 debugs(91,7, "starting auto consumption" << status());
340 scheduleBodyDataNotification();
346 assert(!isCheckedOut
);
352 BodyPipe::checkIn(Checkout
&checkout
)
354 assert(isCheckedOut
);
355 isCheckedOut
= false;
356 const size_t currentSize
= theBuf
.contentSize();
357 if (checkout
.checkedOutSize
> currentSize
)
358 postConsume(checkout
.checkedOutSize
- currentSize
);
359 else if (checkout
.checkedOutSize
< currentSize
)
360 postAppend(currentSize
- checkout
.checkedOutSize
);
364 BodyPipe::undoCheckOut(Checkout
&checkout
)
366 assert(isCheckedOut
);
367 const size_t currentSize
= theBuf
.contentSize();
368 // We can only undo if size did not change, and even that carries
369 // some risk. If this becomes a problem, the code checking out
370 // raw buffers should always check them in (possibly unchanged)
371 // instead of relying on the automated undo mechanism of Checkout.
372 // The code can always use a temporary buffer to accomplish that.
373 Must(checkout
.checkedOutSize
== currentSize
);
376 // TODO: Optimize: inform consumer/producer about more data/space only if
377 // they used the data/space since we notified them last time.
380 BodyPipe::postConsume(size_t size
)
382 assert(!isCheckedOut
);
384 debugs(91,7, "consumed " << size
<< " bytes" << status());
385 if (mayNeedMoreData()) {
386 AsyncCall::Pointer call
= asyncCall(91, 7,
387 "BodyProducer::noteMoreBodySpaceAvailable",
388 BodyProducerDialer(theProducer
,
389 &BodyProducer::noteMoreBodySpaceAvailable
, this));
390 ScheduleCallHere(call
);
395 BodyPipe::postAppend(size_t size
)
397 assert(!isCheckedOut
);
399 debugs(91,7, "added " << size
<< " bytes" << status());
401 // We should not consume here even if mustAutoConsume because the
402 // caller may not be ready for the data to be consumed during this call.
403 scheduleBodyDataNotification();
405 // Do this check after scheduleBodyDataNotification() to ensure the
406 // natural order of "more body data" and "production ended" events.
407 if (!mayNeedMoreData())
408 clearProducer(true); // reached end-of-body
410 startAutoConsumptionIfNeeded();
414 BodyPipe::scheduleBodyDataNotification()
416 if (theConsumer
.valid()) { // TODO: allow asyncCall() to check this instead
417 AsyncCall::Pointer call
= asyncCall(91, 7,
418 "BodyConsumer::noteMoreBodyDataAvailable",
419 BodyConsumerDialer(theConsumer
,
420 &BodyConsumer::noteMoreBodyDataAvailable
, this));
421 ScheduleCallHere(call
);
426 BodyPipe::scheduleBodyEndNotification()
428 if (theConsumer
.valid()) { // TODO: allow asyncCall() to check this instead
429 if (bodySizeKnown() && bodySize() == thePutSize
) {
430 AsyncCall::Pointer call
= asyncCall(91, 7,
431 "BodyConsumer::noteBodyProductionEnded",
432 BodyConsumerDialer(theConsumer
,
433 &BodyConsumer::noteBodyProductionEnded
, this));
434 ScheduleCallHere(call
);
436 AsyncCall::Pointer call
= asyncCall(91, 7,
437 "BodyConsumer::noteBodyProducerAborted",
438 BodyConsumerDialer(theConsumer
,
439 &BodyConsumer::noteBodyProducerAborted
, this));
440 ScheduleCallHere(call
);
445 // a short temporary string describing buffer status for debugging
446 const char *BodyPipe::status() const
448 static MemBuf outputBuffer
;
449 outputBuffer
.reset();
451 outputBuffer
.append(" [", 2);
453 outputBuffer
.appendf("%" PRIu64
"<=%" PRIu64
, theGetSize
, thePutSize
);
454 if (theBodySize
>= 0)
455 outputBuffer
.appendf("<=%" PRId64
, theBodySize
);
457 outputBuffer
.append("<=?", 3);
459 outputBuffer
.appendf(" %" PRId64
"+%" PRId64
, static_cast<int64_t>(theBuf
.contentSize()), static_cast<int64_t>(theBuf
.spaceSize()));
461 outputBuffer
.appendf(" pipe%p", this);
462 if (theProducer
.set())
463 outputBuffer
.appendf(" prod%p", theProducer
.get());
464 if (theConsumer
.set())
465 outputBuffer
.appendf(" cons%p", theConsumer
.get());
468 outputBuffer
.append(" A", 2);
469 if (abortedConsumption
)
470 outputBuffer
.append(" !C", 3);
472 outputBuffer
.append(" L", 2); // Locked
474 outputBuffer
.append("]", 1);
476 outputBuffer
.terminate();
478 return outputBuffer
.content();
481 /* BodyPipeCheckout */
483 BodyPipeCheckout::BodyPipeCheckout(BodyPipe
&aPipe
): thePipe(aPipe
),
484 buf(aPipe
.checkOut()), offset(aPipe
.consumedSize()),
485 checkedOutSize(buf
.contentSize()), checkedIn(false)
489 BodyPipeCheckout::~BodyPipeCheckout()
492 // Do not pipe.undoCheckOut(*this) because it asserts or throws
493 // TODO: consider implementing the long-term solution discussed at
494 // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
495 debugs(91,2, "Warning: cannot undo BodyPipeCheckout");
496 thePipe
.checkIn(*this);
501 BodyPipeCheckout::checkIn()
504 thePipe
.checkIn(*this);
508 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout
&c
): thePipe(c
.thePipe
),
509 buf(c
.buf
), offset(c
.offset
), checkedOutSize(c
.checkedOutSize
),
510 checkedIn(c
.checkedIn
)
512 assert(false); // prevent copying
516 BodyPipeCheckout::operator =(const BodyPipeCheckout
&)
518 assert(false); // prevent assignment