5 CBDATA_CLASS_INIT(BodyPipe
);
7 // inform the pipe that we are done and clear the Pointer
8 void BodyProducer::stopProducingFor(RefCount
<BodyPipe
> &pipe
, bool atEof
)
10 debugs(91,7, HERE
<< this << " will not produce for " << pipe
<<
11 "; atEof: " << atEof
);
12 assert(pipe
!= NULL
); // be strict: the caller state may depend on this
13 pipe
->clearProducer(atEof
);
17 // inform the pipe that we are done and clear the Pointer
18 void BodyConsumer::stopConsumingFrom(RefCount
<BodyPipe
> &pipe
)
20 debugs(91,7, HERE
<< this << " will not consume from " << pipe
);
21 assert(pipe
!= NULL
); // be strict: the caller state may depend on this
22 pipe
->clearConsumer();
28 BodyPipe::BodyPipe(Producer
*aProducer
): theBodySize(-1),
29 theProducer(aProducer
), theConsumer(0),
30 thePutSize(0), theGetSize(0), mustAutoConsume(false), isCheckedOut(false)
32 // TODO: teach MemBuf to start with zero minSize
33 // TODO: limit maxSize by theBodySize, when known?
34 theBuf
.init(2*1024, MaxCapacity
);
35 debugs(91,7, HERE
<< "created BodyPipe" << status());
40 debugs(91,7, HERE
<< "destroying BodyPipe" << status());
46 void BodyPipe::setBodySize(size_t aBodySize
)
48 assert(!bodySizeKnown());
49 assert(aBodySize
>= 0);
50 assert(thePutSize
<= aBodySize
);
52 // If this assert fails, we need to add code to check for eof and inform
53 // the consumer about the eof condition via scheduleBodyEndNotification,
54 // because just setting a body size limit may trigger the eof condition.
57 theBodySize
= aBodySize
;
58 debugs(91,7, HERE
<< "set body size" << status());
61 size_t BodyPipe::bodySize() const
63 assert(bodySizeKnown());
64 return static_cast<size_t>(theBodySize
);
67 bool BodyPipe::expectMoreAfter(size_t offset
) const
69 assert(theGetSize
<= offset
);
70 return offset
< thePutSize
|| // buffer has more now or
71 (!productionEnded() && mayNeedMoreData()); // buffer will have more
74 bool BodyPipe::exhausted() const
76 return !expectMoreAfter(theGetSize
);
79 size_t BodyPipe::unproducedSize() const
81 return bodySize() - thePutSize
; // bodySize() asserts that size is known
85 BodyPipe::clearProducer(bool atEof
)
88 debugs(91,7, HERE
<< "clearing BodyPipe producer" << status());
92 theBodySize
= thePutSize
;
94 if (bodySize() != thePutSize
)
95 debugs(91,1, HERE
<< "aborting on premature eof" << status());
97 // asserta that we can detect the abort if the consumer joins later
98 assert(!bodySizeKnown() || bodySize() != thePutSize
);
100 scheduleBodyEndNotification();
105 BodyPipe::putMoreData(const char *buf
, size_t size
)
108 size
= XMIN(size
, unproducedSize());
110 const size_t spaceSize
= static_cast<size_t>(theBuf
.potentialSpaceSize());
111 if ((size
= XMIN(size
, spaceSize
))) {
112 theBuf
.append(buf
, size
);
120 BodyPipe::setConsumerIfNotLate(Consumer
*aConsumer
)
122 assert(!theConsumer
);
125 // TODO: convert this into an exception and remove IfNotLate suffix
126 // If there is something consumed already, we are in an auto-consuming mode
127 // and it is too late to attach a real consumer to the pipe.
128 if (theGetSize
> 0) {
129 assert(mustAutoConsume
);
133 theConsumer
= aConsumer
;
134 debugs(91,7, HERE
<< "set consumer" << status());
135 if (theBuf
.hasContent())
136 AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable
);
138 scheduleBodyEndNotification();
144 BodyPipe::clearConsumer() {
146 debugs(91,7, HERE
<< "clearing consumer" << status());
149 AsyncCall(91,5, this, BodyPipe::tellBodyConsumerAborted
);
154 BodyPipe::getMoreData(MemBuf
&buf
)
156 if (!theBuf
.hasContent())
157 return 0; // did not touch the possibly uninitialized buf
161 const size_t size
= XMIN(theBuf
.contentSize(), buf
.potentialSpaceSize());
162 buf
.append(theBuf
.content(), size
);
163 theBuf
.consume(size
);
165 return size
; // cannot be zero if we called buf.init above
169 BodyPipe::consume(size_t size
)
171 theBuf
.consume(size
);
176 BodyPipe::enableAutoConsumption() {
177 mustAutoConsume
= true;
178 debugs(91,5, HERE
<< "enabled auto consumption" << status());
179 if (!theConsumer
&& theBuf
.hasContent())
180 AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable
);
184 BodyPipe::checkOut() {
185 assert(!isCheckedOut
);
191 BodyPipe::checkIn(Checkout
&checkout
)
193 assert(isCheckedOut
);
194 isCheckedOut
= false;
195 const size_t currentSize
= theBuf
.contentSize();
196 if (checkout
.checkedOutSize
> currentSize
)
197 postConsume(checkout
.checkedOutSize
- currentSize
);
199 if (checkout
.checkedOutSize
< currentSize
)
200 postAppend(currentSize
- checkout
.checkedOutSize
);
204 BodyPipe::undoCheckOut(Checkout
&checkout
)
206 assert(isCheckedOut
);
207 const size_t currentSize
= theBuf
.contentSize();
208 // We can only undo if size did not change, and even that carries
209 // some risk. If this becomes a problem, the code checking out
210 // raw buffers should always check them in (possibly unchanged)
211 // instead of relying on the automated undo mechanism of Checkout.
212 // The code can always use a temporary buffer to accomplish that.
213 assert(checkout
.checkedOutSize
== currentSize
);
216 // TODO: Optimize: inform consumer/producer about more data/space only if
217 // they used the data/space since we notified them last time.
220 BodyPipe::postConsume(size_t size
) {
221 assert(!isCheckedOut
);
223 debugs(91,7, HERE
<< "consumed " << size
<< " bytes" << status());
224 if (mayNeedMoreData())
225 AsyncCall(91,5, this, BodyPipe::tellMoreBodySpaceAvailable
);
229 BodyPipe::postAppend(size_t size
) {
230 assert(!isCheckedOut
);
232 debugs(91,7, HERE
<< "added " << size
<< " bytes" << status());
234 // We should not consume here even if mustAutoConsume because the
235 // caller may not be ready for the data to be consumed during this call.
236 AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable
);
238 if (!mayNeedMoreData())
239 clearProducer(true); // reached end-of-body
244 BodyPipe::scheduleBodyEndNotification()
246 if (bodySizeKnown() && bodySize() == thePutSize
)
247 AsyncCall(91,5, this, BodyPipe::tellBodyProductionEnded
);
249 AsyncCall(91,5, this, BodyPipe::tellBodyProducerAborted
);
252 void BodyPipe::tellMoreBodySpaceAvailable()
254 if (theProducer
!= NULL
)
255 theProducer
->noteMoreBodySpaceAvailable(*this);
258 void BodyPipe::tellBodyConsumerAborted()
260 if (theProducer
!= NULL
)
261 theProducer
->noteBodyConsumerAborted(*this);
264 void BodyPipe::tellMoreBodyDataAvailable()
266 if (theConsumer
!= NULL
)
267 theConsumer
->noteMoreBodyDataAvailable(*this);
269 if (mustAutoConsume
&& theBuf
.hasContent())
270 consume(theBuf
.contentSize());
273 void BodyPipe::tellBodyProductionEnded()
275 if (theConsumer
!= NULL
)
276 theConsumer
->noteBodyProductionEnded(*this);
279 void BodyPipe::tellBodyProducerAborted()
281 if (theConsumer
!= NULL
)
282 theConsumer
->noteBodyProducerAborted(*this);
285 // a short temporary string describing buffer status for debugging
286 const char *BodyPipe::status() const
293 buf
.Printf("%d<=%d", (int)theGetSize
, (int)thePutSize
);
294 if (theBodySize
>= 0)
295 buf
.Printf("<=%d", (int)theBodySize
);
297 buf
.append("<=?", 3);
299 buf
.Printf(" %d+%d", (int)theBuf
.contentSize(), (int)theBuf
.spaceSize());
301 buf
.Printf(" pipe%p", this);
303 buf
.Printf(" prod%p", theProducer
);
305 buf
.Printf(" cons%p", theConsumer
);
310 buf
.append(" L", 2); // Locked
316 return buf
.content();
320 /* BodyPipeCheckout */
322 BodyPipeCheckout::BodyPipeCheckout(BodyPipe
&aPipe
): pipe(aPipe
),
323 buf(aPipe
.checkOut()), offset(aPipe
.consumedSize()),
324 checkedOutSize(buf
.contentSize()), checkedIn(false)
328 BodyPipeCheckout::~BodyPipeCheckout()
331 pipe
.undoCheckOut(*this);
335 BodyPipeCheckout::checkIn()
343 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout
&c
): pipe(c
.pipe
),
344 buf(c
.buf
), offset(c
.offset
), checkedOutSize(c
.checkedOutSize
),
345 checkedIn(c
.checkedIn
)
347 assert(false); // prevent copying
351 BodyPipeCheckout::operator =(const BodyPipeCheckout
&)
353 assert(false); // prevent assignment