]> git.ipfire.org Git - thirdparty/squid.git/blame - src/BodyPipe.cc
Author: wessels & Christos Tsantilas
[thirdparty/squid.git] / src / BodyPipe.cc
CommitLineData
1b39caaa 1
2#include "squid.h"
3#include "BodyPipe.h"
4
5CBDATA_CLASS_INIT(BodyPipe);
6
7// inform the pipe that we are done and clear the Pointer
8void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
9{
10 debugs(91,7, HERE << this << " will not produce for " << pipe <<
11 "; atEof: " << atEof);
12 assert(pipe != NULL); // be strict: the caller state may depend on this
13 pipe->clearProducer(atEof);
14 pipe = NULL;
15}
16
17// inform the pipe that we are done and clear the Pointer
18void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
19{
20 debugs(91,7, HERE << this << " will not consume from " << pipe);
21 assert(pipe != NULL); // be strict: the caller state may depend on this
22 pipe->clearConsumer();
23 pipe = NULL;
24}
25
26/* BodyPipe */
27
28BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
29 theProducer(aProducer), theConsumer(0),
6c56baf6 30 thePutSize(0), theGetSize(0), theCCallsPending(0), theCCallsToSkip(0),
31 mustAutoConsume(false), isCheckedOut(false)
1b39caaa 32{
33 // TODO: teach MemBuf to start with zero minSize
34 // TODO: limit maxSize by theBodySize, when known?
35 theBuf.init(2*1024, MaxCapacity);
36 debugs(91,7, HERE << "created BodyPipe" << status());
37}
38
39BodyPipe::~BodyPipe()
40{
41 debugs(91,7, HERE << "destroying BodyPipe" << status());
42 assert(!theProducer);
43 assert(!theConsumer);
44 theBuf.clean();
45}
46
47f6e231 47void BodyPipe::setBodySize(uint64_t aBodySize)
1b39caaa 48{
49 assert(!bodySizeKnown());
50 assert(aBodySize >= 0);
51 assert(thePutSize <= aBodySize);
52
53 // If this assert fails, we need to add code to check for eof and inform
54 // the consumer about the eof condition via scheduleBodyEndNotification,
55 // because just setting a body size limit may trigger the eof condition.
56 assert(!theConsumer);
57
58 theBodySize = aBodySize;
59 debugs(91,7, HERE << "set body size" << status());
60}
61
47f6e231 62uint64_t BodyPipe::bodySize() const
1b39caaa 63{
64 assert(bodySizeKnown());
47f6e231 65 return static_cast<uint64_t>(theBodySize);
1b39caaa 66}
67
47f6e231 68bool BodyPipe::expectMoreAfter(uint64_t offset) const
1b39caaa 69{
70 assert(theGetSize <= offset);
71 return offset < thePutSize || // buffer has more now or
72 (!productionEnded() && mayNeedMoreData()); // buffer will have more
73}
74
75bool BodyPipe::exhausted() const
76{
77 return !expectMoreAfter(theGetSize);
78}
79
80size_t BodyPipe::unproducedSize() const
81{
82 return bodySize() - thePutSize; // bodySize() asserts that size is known
83}
84
85void
86BodyPipe::clearProducer(bool atEof)
87{
88 if (theProducer) {
89 debugs(91,7, HERE << "clearing BodyPipe producer" << status());
90 theProducer = NULL;
91 if (atEof) {
92 if (!bodySizeKnown())
93 theBodySize = thePutSize;
94 else
95 if (bodySize() != thePutSize)
96 debugs(91,1, HERE << "aborting on premature eof" << status());
97 } else {
98 // asserta that we can detect the abort if the consumer joins later
99 assert(!bodySizeKnown() || bodySize() != thePutSize);
100 }
101 scheduleBodyEndNotification();
102 }
103}
104
105size_t
106BodyPipe::putMoreData(const char *buf, size_t size)
107{
108 if (bodySizeKnown())
109 size = XMIN(size, unproducedSize());
110
111 const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
112 if ((size = XMIN(size, spaceSize))) {
113 theBuf.append(buf, size);
114 postAppend(size);
115 return size;
116 }
117 return 0;
118}
119
120bool
121BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
122{
123 assert(!theConsumer);
124 assert(aConsumer);
125
126 // TODO: convert this into an exception and remove IfNotLate suffix
127 // If there is something consumed already, we are in an auto-consuming mode
128 // and it is too late to attach a real consumer to the pipe.
129 if (theGetSize > 0) {
130 assert(mustAutoConsume);
131 return false;
132 }
133
134 theConsumer = aConsumer;
135 debugs(91,7, HERE << "set consumer" << status());
136 if (theBuf.hasContent())
6c56baf6 137 scheduleBodyDataNotification();
1b39caaa 138 if (!theProducer)
139 scheduleBodyEndNotification();
140
141 return true;
142}
143
// When a BodyPipe consumer is gone, events scheduled for that consumer must
// not reach the new consumer (if any). Otherwise, the calls may go out of
// order (if _some_ calls are dropped due to the ultimate destination being
// temporarily NULL). The code keeps track of the number of outstanding
// events and skips that number if the consumer leaves. TODO: when AsyncCall
// support is improved, should we just schedule calls directly to consumer?
1b39caaa 150void
151BodyPipe::clearConsumer() {
152 if (theConsumer) {
153 debugs(91,7, HERE << "clearing consumer" << status());
154 theConsumer = NULL;
6c56baf6 155 theCCallsToSkip = theCCallsPending; // skip all pending consumer calls
e71477f3 156 if (consumedSize() && !exhausted())
1b39caaa 157 AsyncCall(91,5, this, BodyPipe::tellBodyConsumerAborted);
158 }
159}
160
161size_t
162BodyPipe::getMoreData(MemBuf &buf)
163{
164 if (!theBuf.hasContent())
165 return 0; // did not touch the possibly uninitialized buf
166
167 if (buf.isNull())
168 buf.init();
169 const size_t size = XMIN(theBuf.contentSize(), buf.potentialSpaceSize());
170 buf.append(theBuf.content(), size);
171 theBuf.consume(size);
172 postConsume(size);
173 return size; // cannot be zero if we called buf.init above
174}
175
176void
177BodyPipe::consume(size_t size)
178{
179 theBuf.consume(size);
180 postConsume(size);
181}
182
183void
184BodyPipe::enableAutoConsumption() {
185 mustAutoConsume = true;
186 debugs(91,5, HERE << "enabled auto consumption" << status());
187 if (!theConsumer && theBuf.hasContent())
6c56baf6 188 scheduleBodyDataNotification();
1b39caaa 189}
190
191MemBuf &
192BodyPipe::checkOut() {
193 assert(!isCheckedOut);
194 isCheckedOut = true;
195 return theBuf;
196}
197
198void
199BodyPipe::checkIn(Checkout &checkout)
200{
201 assert(isCheckedOut);
202 isCheckedOut = false;
203 const size_t currentSize = theBuf.contentSize();
204 if (checkout.checkedOutSize > currentSize)
205 postConsume(checkout.checkedOutSize - currentSize);
206 else
207 if (checkout.checkedOutSize < currentSize)
208 postAppend(currentSize - checkout.checkedOutSize);
209}
210
211void
212BodyPipe::undoCheckOut(Checkout &checkout)
213{
214 assert(isCheckedOut);
215 const size_t currentSize = theBuf.contentSize();
216 // We can only undo if size did not change, and even that carries
217 // some risk. If this becomes a problem, the code checking out
218 // raw buffers should always check them in (possibly unchanged)
219 // instead of relying on the automated undo mechanism of Checkout.
220 // The code can always use a temporary buffer to accomplish that.
221 assert(checkout.checkedOutSize == currentSize);
222}
223
224// TODO: Optimize: inform consumer/producer about more data/space only if
225// they used the data/space since we notified them last time.
226
227void
228BodyPipe::postConsume(size_t size) {
229 assert(!isCheckedOut);
230 theGetSize += size;
231 debugs(91,7, HERE << "consumed " << size << " bytes" << status());
232 if (mayNeedMoreData())
233 AsyncCall(91,5, this, BodyPipe::tellMoreBodySpaceAvailable);
234}
235
236void
237BodyPipe::postAppend(size_t size) {
238 assert(!isCheckedOut);
239 thePutSize += size;
240 debugs(91,7, HERE << "added " << size << " bytes" << status());
241
242 // We should not consume here even if mustAutoConsume because the
243 // caller may not be ready for the data to be consumed during this call.
6c56baf6 244 scheduleBodyDataNotification();
1b39caaa 245
246 if (!mayNeedMoreData())
247 clearProducer(true); // reached end-of-body
248}
249
250
6c56baf6 251void
252BodyPipe::scheduleBodyDataNotification()
253{
254 if (theConsumer || mustAutoConsume) {
255 ++theCCallsPending;
256 AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable);
257 }
258}
259
1b39caaa 260void
261BodyPipe::scheduleBodyEndNotification()
262{
6c56baf6 263 if (theConsumer) {
264 ++theCCallsPending;
265 if (bodySizeKnown() && bodySize() == thePutSize)
266 AsyncCall(91,5, this, BodyPipe::tellBodyProductionEnded);
267 else
268 AsyncCall(91,5, this, BodyPipe::tellBodyProducerAborted);
269 }
1b39caaa 270}
271
272void BodyPipe::tellMoreBodySpaceAvailable()
273{
274 if (theProducer != NULL)
275 theProducer->noteMoreBodySpaceAvailable(*this);
276}
277
278void BodyPipe::tellBodyConsumerAborted()
279{
280 if (theProducer != NULL)
281 theProducer->noteBodyConsumerAborted(*this);
282}
283
284void BodyPipe::tellMoreBodyDataAvailable()
285{
6c56baf6 286 if (skipCCall())
287 return;
288
1b39caaa 289 if (theConsumer != NULL)
290 theConsumer->noteMoreBodyDataAvailable(*this);
291 else
292 if (mustAutoConsume && theBuf.hasContent())
293 consume(theBuf.contentSize());
294}
295
296void BodyPipe::tellBodyProductionEnded()
297{
6c56baf6 298 if (skipCCall())
299 return;
300
1b39caaa 301 if (theConsumer != NULL)
302 theConsumer->noteBodyProductionEnded(*this);
303}
304
305void BodyPipe::tellBodyProducerAborted()
306{
6c56baf6 307 if (skipCCall())
308 return;
309
1b39caaa 310 if (theConsumer != NULL)
311 theConsumer->noteBodyProducerAborted(*this);
312}
313
6c56baf6 314// skips calls destined for the previous consumer; see BodyPipe::clearConsumer
315bool BodyPipe::skipCCall()
316{
317 assert(theCCallsPending > 0);
318 --theCCallsPending;
319
320 if (theCCallsToSkip <= 0)
321 return false;
322
323 --theCCallsToSkip;
324 debugs(91,5, HERE << "skipped call");
325 return true;
326}
327
1b39caaa 328// a short temporary string describing buffer status for debugging
329const char *BodyPipe::status() const
330{
331 static MemBuf buf;
332 buf.reset();
333
334 buf.append(" [", 2);
335
47f6e231 336 buf.Printf("%"PRIu64"<=%"PRIu64, theGetSize, thePutSize);
1b39caaa 337 if (theBodySize >= 0)
47f6e231 338 buf.Printf("<=%"PRId64, theBodySize);
1b39caaa 339 else
340 buf.append("<=?", 3);
341
342 buf.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
343
344 buf.Printf(" pipe%p", this);
345 if (theProducer)
346 buf.Printf(" prod%p", theProducer);
347 if (theConsumer)
348 buf.Printf(" cons%p", theConsumer);
349
350 if (mustAutoConsume)
351 buf.append(" A", 2);
352 if (isCheckedOut)
353 buf.append(" L", 2); // Locked
354
355 buf.append("]", 1);
356
357 buf.terminate();
358
359 return buf.content();
360}
361
362
363/* BodyPipeCheckout */
364
365BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
366 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
367 checkedOutSize(buf.contentSize()), checkedIn(false)
368{
369}
370
371BodyPipeCheckout::~BodyPipeCheckout()
372{
373 if (!checkedIn)
374 pipe.undoCheckOut(*this);
375}
376
377void
378BodyPipeCheckout::checkIn()
379{
380 assert(!checkedIn);
381 pipe.checkIn(*this);
382 checkedIn = true;
383}
384
385
386BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
387 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
388 checkedIn(c.checkedIn)
389{
390 assert(false); // prevent copying
391}
392
393BodyPipeCheckout &
394BodyPipeCheckout::operator =(const BodyPipeCheckout &)
395{
396 assert(false); // prevent assignment
397 return *this;
398}