// Invokes the CBDATA_CLASS_INIT macro for BodyPipe. The macro is defined
// elsewhere; presumably it sets up cbdata support for the class -- confirm.
5 CBDATA_CLASS_INIT(BodyPipe
);
7 // inform the pipe that we are done and clear the Pointer
// Detaches this producer from the given pipe: logs the request (section 91,
// level 7), asserts that pipe is set, then calls pipe->clearProducer(atEof).
// atEof is forwarded unchanged to BodyPipe::clearProducer().
// NOTE(review): no statement resetting the caller's pointer is visible in
// this listing, although the comment above mentions clearing it -- confirm
// against the complete file.
8 void BodyProducer::stopProducingFor(RefCount
<BodyPipe
> &pipe
, bool atEof
)
10 debugs(91,7, HERE
<< this << " will not produce for " << pipe
<<
11 "; atEof: " << atEof
);
12 assert(pipe
!= NULL
); // be strict: the caller state may depend on this
13 pipe
->clearProducer(atEof
);
17 // inform the pipe that we are done and clear the Pointer
// Detaches this consumer from the given pipe: logs the request (section 91,
// level 7), asserts that pipe is set, then calls pipe->clearConsumer().
// NOTE(review): no statement resetting the caller's pointer is visible in
// this listing, although the comment above mentions clearing it -- confirm
// against the complete file.
18 void BodyConsumer::stopConsumingFrom(RefCount
<BodyPipe
> &pipe
)
20 debugs(91,7, HERE
<< this << " will not consume from " << pipe
);
21 assert(pipe
!= NULL
); // be strict: the caller state may depend on this
22 pipe
->clearConsumer();
// Creates a pipe fed by aProducer. The body size starts at -1 (status() only
// prints the size when it is >= 0), no consumer is attached, the put/get
// sizes and both consumer-call counters start at zero, and the
// mustAutoConsume and isCheckedOut flags start false.
// The internal buffer is initialized with 2*1024 and MaxCapacity, and the
// creation is logged together with status().
28 BodyPipe::BodyPipe(Producer
*aProducer
): theBodySize(-1),
29 theProducer(aProducer
), theConsumer(0),
30 thePutSize(0), theGetSize(0), theCCallsPending(0), theCCallsToSkip(0),
31 mustAutoConsume(false), isCheckedOut(false)
33 // TODO: teach MemBuf to start with zero minSize
34 // TODO: limit maxSize by theBodySize, when known?
35 theBuf
.init(2*1024, MaxCapacity
);
36 debugs(91,7, HERE
<< "created BodyPipe" << status());
// NOTE(review): the header of the enclosing function is not visible in this
// listing; judging by the log message this is presumably the BodyPipe
// destructor -- confirm. The visible statement only logs status().
41 debugs(91,7, HERE
<< "destroying BodyPipe" << status());
// Records the total body size. Asserts that the size is not known yet and
// that the amount already put (thePutSize) does not exceed aBodySize, then
// stores aBodySize in theBodySize and logs status().
// NOTE(review): aBodySize is uint64_t, so the `aBodySize >= 0` assertion is
// always true.
// NOTE(review): the "If this assert fails" comment below describes an
// assertion that is not visible in this listing -- confirm against the
// complete file.
47 void BodyPipe::setBodySize(uint64_t aBodySize
)
49 assert(!bodySizeKnown());
50 assert(aBodySize
>= 0);
51 assert(thePutSize
<= aBodySize
);
53 // If this assert fails, we need to add code to check for eof and inform
54 // the consumer about the eof condition via scheduleBodyEndNotification,
55 // because just setting a body size limit may trigger the eof condition.
58 theBodySize
= aBodySize
;
59 debugs(91,7, HERE
<< "set body size" << status());
// Returns theBodySize converted to uint64_t. Asserts bodySizeKnown(), so
// callers must check that the size is known before calling.
62 uint64_t BodyPipe::bodySize() const
64 assert(bodySizeKnown());
65 return static_cast<uint64_t>(theBodySize
);
// Tells whether body data beyond the given offset exists or may still come.
// Asserts that offset is not below theGetSize. Returns true when offset is
// below thePutSize, or when production has not ended and mayNeedMoreData()
// is true.
68 bool BodyPipe::expectMoreAfter(uint64_t offset
) const
70 assert(theGetSize
<= offset
);
71 return offset
< thePutSize
|| // buffer has more now or
72 (!productionEnded() && mayNeedMoreData()); // buffer will have more
// Returns true when nothing more is expected after theGetSize, i.e. the
// negation of expectMoreAfter(theGetSize).
75 bool BodyPipe::exhausted() const
77 return !expectMoreAfter(theGetSize
);
// Returns bodySize() minus thePutSize; the body size must be known.
// NOTE(review): bodySize() returns uint64_t while this function returns
// size_t, so the difference may be narrowed where size_t is smaller than
// 64 bits -- confirm this is acceptable.
80 size_t BodyPipe::unproducedSize() const
82 return bodySize() - thePutSize
; // bodySize() asserts that size is known
// Handles the producer leaving the pipe; atEof is supplied by the caller
// (stopProducingFor() forwards its own atEof, postAppend() passes true).
// Visible steps: log status(), set theBodySize to thePutSize, log
// "aborting on premature eof" at level 1 when bodySize() differs from
// thePutSize, assert that an abort stays detectable, and call
// scheduleBodyEndNotification().
// NOTE(review): the return type, braces and the conditions selecting between
// these statements are not visible in this listing; the statements
// presumably belong to different atEof branches -- confirm against the
// complete file.
86 BodyPipe::clearProducer(bool atEof
)
89 debugs(91,7, HERE
<< "clearing BodyPipe producer" << status());
93 theBodySize
= thePutSize
;
95 if (bodySize() != thePutSize
)
96 debugs(91,1, HERE
<< "aborting on premature eof" << status());
98 // assert that we can detect the abort if the consumer joins later
99 assert(!bodySizeKnown() || bodySize() != thePutSize
);
101 scheduleBodyEndNotification();
// Appends up to `size` bytes from `buf` to the pipe buffer. The amount is
// capped by unproducedSize() and by theBuf.potentialSpaceSize(); bytes are
// appended only when the capped amount is non-zero.
// NOTE(review): the return type, the condition guarding the unproducedSize()
// cap (unproducedSize() requires a known body size), and whatever follows
// the append are not visible in this listing -- confirm against the
// complete file.
106 BodyPipe::putMoreData(const char *buf
, size_t size
)
109 size
= XMIN(size
, unproducedSize());
111 const size_t spaceSize
= static_cast<size_t>(theBuf
.potentialSpaceSize());
112 if ((size
= XMIN(size
, spaceSize
))) {
113 theBuf
.append(buf
, size
);
// Attaches aConsumer to the pipe unless data has already been consumed.
// Asserts that no consumer is attached. When theGetSize is above zero the
// pipe must be auto-consuming (asserted). Otherwise the consumer is stored,
// status() is logged, and a data notification is scheduled when the buffer
// has content.
// NOTE(review): the return type, the return statements and the condition
// guarding scheduleBodyEndNotification() are not visible in this listing --
// confirm against the complete file.
121 BodyPipe::setConsumerIfNotLate(Consumer
*aConsumer
)
123 assert(!theConsumer
);
126 // TODO: convert this into an exception and remove IfNotLate suffix
127 // If there is something consumed already, we are in an auto-consuming mode
128 // and it is too late to attach a real consumer to the pipe.
129 if (theGetSize
> 0) {
130 assert(mustAutoConsume
);
134 theConsumer
= aConsumer
;
135 debugs(91,7, HERE
<< "set consumer" << status());
136 if (theBuf
.hasContent())
137 scheduleBodyDataNotification();
139 scheduleBodyEndNotification();
144 // When BodyPipe consumer is gone, all events for that consumer must not
145 // reach the new consumer (if any). Otherwise, the calls may go out of order
146 // (if _some_ calls are dropped due to the ultimate destination being
147 // temporarily NULL). The code keeps track of the number of outstanding
148 // events and skips that number if consumer leaves. TODO: when AsyncCall
149 // support is improved, should we just schedule calls directly to consumer?
// Handles the consumer leaving the pipe. Logs status(), marks every pending
// consumer call as one to skip (theCCallsToSkip = theCCallsPending), and,
// when some data was consumed but the pipe is not exhausted, schedules
// tellBodyConsumerAborted via AsyncCall.
// NOTE(review): the return type and any statement resetting theConsumer are
// not visible in this listing -- confirm against the complete file.
151 BodyPipe::clearConsumer() {
153 debugs(91,7, HERE
<< "clearing consumer" << status());
155 theCCallsToSkip
= theCCallsPending
; // skip all pending consumer calls
156 if (consumedSize() && !exhausted())
157 AsyncCall(91,5, this, BodyPipe::tellBodyConsumerAborted
);
// Moves buffered body data into `buf` and returns the number of bytes moved.
// Returns 0 without touching `buf` when the pipe buffer has no content.
// Otherwise copies the smaller of theBuf.contentSize() and
// buf.potentialSpaceSize() into `buf` and consumes that many bytes from
// theBuf.
// NOTE(review): the return type and the buf.init call mentioned by the
// trailing comment are not visible in this listing -- confirm against the
// complete file.
162 BodyPipe::getMoreData(MemBuf
&buf
)
164 if (!theBuf
.hasContent())
165 return 0; // did not touch the possibly uninitialized buf
169 const size_t size
= XMIN(theBuf
.contentSize(), buf
.potentialSpaceSize());
170 buf
.append(theBuf
.content(), size
);
171 theBuf
.consume(size
);
173 return size
; // cannot be zero if we called buf.init above
// Removes `size` bytes from the front of the pipe buffer via theBuf.consume().
// NOTE(review): the return type and any follow-up bookkeeping are not
// visible in this listing -- confirm against the complete file.
177 BodyPipe::consume(size_t size
)
179 theBuf
.consume(size
);
// Turns on auto-consumption: sets mustAutoConsume, logs status(), and, when
// no consumer is attached and the buffer already has content, schedules a
// data notification (tellMoreBodyDataAvailable() consumes buffered content
// when mustAutoConsume is set).
184 BodyPipe::enableAutoConsumption() {
185 mustAutoConsume
= true;
186 debugs(91,5, HERE
<< "enabled auto consumption" << status());
187 if (!theConsumer
&& theBuf
.hasContent())
188 scheduleBodyDataNotification();
// Begins a raw-buffer checkout; asserts that the buffer is not already
// checked out.
// NOTE(review): the return type and the remaining statements are not
// visible in this listing. BodyPipeCheckout initializes its `buf` member
// from this call's result -- confirm what is returned.
192 BodyPipe::checkOut() {
193 assert(!isCheckedOut
);
// Ends a raw-buffer checkout. Asserts the buffer is checked out, clears
// isCheckedOut, then compares the size recorded at checkout
// (checkout.checkedOutSize) with the current theBuf.contentSize():
// a smaller current size is reported through postConsume(difference) and a
// larger one through postAppend(difference).
199 BodyPipe::checkIn(Checkout
&checkout
)
201 assert(isCheckedOut
);
202 isCheckedOut
= false;
203 const size_t currentSize
= theBuf
.contentSize();
204 if (checkout
.checkedOutSize
> currentSize
)
205 postConsume(checkout
.checkedOutSize
- currentSize
);
207 if (checkout
.checkedOutSize
< currentSize
)
208 postAppend(currentSize
- checkout
.checkedOutSize
);
// Cancels a raw-buffer checkout. Asserts the buffer is checked out and that
// theBuf.contentSize() still equals checkout.checkedOutSize.
// NOTE(review): the statement that clears isCheckedOut is not visible in
// this listing -- confirm against the complete file.
212 BodyPipe::undoCheckOut(Checkout
&checkout
)
214 assert(isCheckedOut
);
215 const size_t currentSize
= theBuf
.contentSize();
216 // We can only undo if size did not change, and even that carries
217 // some risk. If this becomes a problem, the code checking out
218 // raw buffers should always check them in (possibly unchanged)
219 // instead of relying on the automated undo mechanism of Checkout.
220 // The code can always use a temporary buffer to accomplish that.
221 assert(checkout
.checkedOutSize
== currentSize
);
224 // TODO: Optimize: inform consumer/producer about more data/space only if
225 // they used the data/space since we notified them last time.
// Bookkeeping after `size` bytes left the buffer. Asserts that the buffer is
// not checked out, logs the amount, and schedules
// tellMoreBodySpaceAvailable via AsyncCall when mayNeedMoreData() is true.
// NOTE(review): the statement that advances theGetSize is not visible in
// this listing -- confirm against the complete file.
228 BodyPipe::postConsume(size_t size
) {
229 assert(!isCheckedOut
);
231 debugs(91,7, HERE
<< "consumed " << size
<< " bytes" << status());
232 if (mayNeedMoreData())
233 AsyncCall(91,5, this, BodyPipe::tellMoreBodySpaceAvailable
);
// Bookkeeping after `size` bytes were added to the buffer. Asserts that the
// buffer is not checked out, logs the amount, schedules a data notification,
// and calls clearProducer(true) once mayNeedMoreData() is false.
// NOTE(review): the statement that advances thePutSize is not visible in
// this listing -- confirm against the complete file.
237 BodyPipe::postAppend(size_t size
) {
238 assert(!isCheckedOut
);
240 debugs(91,7, HERE
<< "added " << size
<< " bytes" << status());
242 // We should not consume here even if mustAutoConsume because the
243 // caller may not be ready for the data to be consumed during this call.
244 scheduleBodyDataNotification();
246 if (!mayNeedMoreData())
247 clearProducer(true); // reached end-of-body
// Schedules tellMoreBodyDataAvailable via AsyncCall, but only when a
// consumer is attached or auto-consumption is enabled.
// NOTE(review): any pending-call counter update inside the branch is not
// visible in this listing -- confirm against the complete file.
252 BodyPipe::scheduleBodyDataNotification()
254 if (theConsumer
|| mustAutoConsume
) {
256 AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable
);
// Schedules the end-of-production notification via AsyncCall:
// tellBodyProductionEnded when the body size is known and equals thePutSize.
// NOTE(review): the second AsyncCall (tellBodyProducerAborted) is presumably
// the `else` branch of that test; the `else` and any enclosing condition are
// not visible in this listing -- confirm against the complete file.
261 BodyPipe::scheduleBodyEndNotification()
265 if (bodySizeKnown() && bodySize() == thePutSize
)
266 AsyncCall(91,5, this, BodyPipe::tellBodyProductionEnded
);
268 AsyncCall(91,5, this, BodyPipe::tellBodyProducerAborted
);
// Forwards the "more buffer space" event to the producer, if one is set, by
// calling noteMoreBodySpaceAvailable(*this).
272 void BodyPipe::tellMoreBodySpaceAvailable()
274 if (theProducer
!= NULL
)
275 theProducer
->noteMoreBodySpaceAvailable(*this);
// Forwards the "consumer aborted" event to the producer, if one is set, by
// calling noteBodyConsumerAborted(*this).
278 void BodyPipe::tellBodyConsumerAborted()
280 if (theProducer
!= NULL
)
281 theProducer
->noteBodyConsumerAborted(*this);
// Delivers the "more body data" event. When a consumer is set it is told via
// noteMoreBodyDataAvailable(*this). When auto-consumption is enabled and the
// buffer has content, the whole buffered content is consumed.
// NOTE(review): the embedded numbering skips lines before the first test
// (presumably a skipCCall() check) and the link between the two `if`
// statements (possibly an `else`) is not visible -- confirm against the
// complete file.
284 void BodyPipe::tellMoreBodyDataAvailable()
289 if (theConsumer
!= NULL
)
290 theConsumer
->noteMoreBodyDataAvailable(*this);
292 if (mustAutoConsume
&& theBuf
.hasContent())
293 consume(theBuf
.contentSize());
// Forwards the "production ended" event to the consumer, if one is set, by
// calling noteBodyProductionEnded(*this).
// NOTE(review): the embedded numbering skips lines before the test
// (presumably a skipCCall() check) -- confirm against the complete file.
296 void BodyPipe::tellBodyProductionEnded()
301 if (theConsumer
!= NULL
)
302 theConsumer
->noteBodyProductionEnded(*this);
// Forwards the "producer aborted" event to the consumer, if one is set, by
// calling noteBodyProducerAborted(*this).
// NOTE(review): the embedded numbering skips lines before the test
// (presumably a skipCCall() check) -- confirm against the complete file.
305 void BodyPipe::tellBodyProducerAborted()
310 if (theConsumer
!= NULL
)
311 theConsumer
->noteBodyProducerAborted(*this);
314 // skips calls destined for the previous consumer; see BodyPipe::clearConsumer
// Decides whether a scheduled consumer call should be dropped. Asserts that
// at least one consumer call is pending, tests whether theCCallsToSkip is
// <= 0, and logs "skipped call" at level 5.
// NOTE(review): the counter updates and both return statements are not
// visible in this listing, so which path logs and what each path returns
// cannot be confirmed here -- check the complete file.
315 bool BodyPipe::skipCCall()
317 assert(theCCallsPending
> 0);
320 if (theCCallsToSkip
<= 0)
324 debugs(91,5, HERE
<< "skipped call");
328 // a short temporary string describing buffer status for debugging
// Builds the debugging status string and returns buf.content().
// Visible pieces, in order: "<get><=<put>", then "<=<bodySize>" when
// theBodySize is >= 0 or "<=?" (presumably the else branch), the buffer's
// content and space sizes as " %d+%d", the pipe/producer/consumer pointers,
// and " L" for the locked state.
// NOTE(review): the declaration and reset of `buf` and the conditions
// guarding the later appends are not visible in this listing. The returned
// pointer refers to storage owned by `buf`, so its lifetime depends on how
// `buf` is declared -- confirm against the complete file.
// NOTE(review): "%"PRIu64 and "<=%"PRId64 have no space between the string
// literal and the macro; C++11 and later parse that as a user-defined
// literal suffix, so a space is needed there when building as C++11+.
329 const char *BodyPipe::status() const
336 buf
.Printf("%"PRIu64
"<=%"PRIu64
, theGetSize
, thePutSize
);
337 if (theBodySize
>= 0)
338 buf
.Printf("<=%"PRId64
, theBodySize
);
340 buf
.append("<=?", 3);
342 buf
.Printf(" %d+%d", (int)theBuf
.contentSize(), (int)theBuf
.spaceSize());
344 buf
.Printf(" pipe%p", this);
346 buf
.Printf(" prod%p", theProducer
);
348 buf
.Printf(" cons%p", theConsumer
);
353 buf
.append(" L", 2); // Locked
359 return buf
.content();
363 /* BodyPipeCheckout */
// Checks out the raw buffer of aPipe. Stores the pipe, initializes `buf`
// from aPipe.checkOut(), records the pipe's consumedSize() as `offset` and
// the buffer's content size at checkout as `checkedOutSize`, and starts with
// checkedIn == false.
365 BodyPipeCheckout::BodyPipeCheckout(BodyPipe
&aPipe
): pipe(aPipe
),
366 buf(aPipe
.checkOut()), offset(aPipe
.consumedSize()),
367 checkedOutSize(buf
.contentSize()), checkedIn(false)
// Calls pipe.undoCheckOut(*this) on destruction.
// NOTE(review): the condition guarding this call (presumably a checkedIn
// test) is not visible in this listing -- confirm against the complete file.
371 BodyPipeCheckout::~BodyPipeCheckout()
374 pipe
.undoCheckOut(*this);
378 BodyPipeCheckout::checkIn()
// Copy constructor that exists only to forbid copying: it copies every
// member from `c` and then fails an unconditional assertion.
386 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout
&c
): pipe(c
.pipe
),
387 buf(c
.buf
), offset(c
.offset
), checkedOutSize(c
.checkedOutSize
),
388 checkedIn(c
.checkedIn
)
390 assert(false); // prevent copying
// Assignment operator that exists only to forbid assignment: it fails an
// unconditional assertion.
// NOTE(review): the return type and return statement are not visible in
// this listing.
394 BodyPipeCheckout::operator =(const BodyPipeCheckout
&)
396 assert(false); // prevent assignment