]> git.ipfire.org Git - thirdparty/squid.git/blob - src/BodyPipe.cc
Author: Tsantilas Christos <chtsanti@users.sourceforge.net>
[thirdparty/squid.git] / src / BodyPipe.cc
1
2 #include "squid.h"
3 #include "BodyPipe.h"
4
5 CBDATA_CLASS_INIT(BodyPipe);
6
7 // inform the pipe that we are done and clear the Pointer
8 void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
9 {
10 debugs(91,7, HERE << this << " will not produce for " << pipe <<
11 "; atEof: " << atEof);
12 assert(pipe != NULL); // be strict: the caller state may depend on this
13 pipe->clearProducer(atEof);
14 pipe = NULL;
15 }
16
17 // inform the pipe that we are done and clear the Pointer
18 void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
19 {
20 debugs(91,7, HERE << this << " will not consume from " << pipe);
21 assert(pipe != NULL); // be strict: the caller state may depend on this
22 pipe->clearConsumer();
23 pipe = NULL;
24 }
25
26 /* BodyPipe */
27
28 BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
29 theProducer(aProducer), theConsumer(0),
30 thePutSize(0), theGetSize(0), theCCallsPending(0), theCCallsToSkip(0),
31 mustAutoConsume(false), isCheckedOut(false)
32 {
33 // TODO: teach MemBuf to start with zero minSize
34 // TODO: limit maxSize by theBodySize, when known?
35 theBuf.init(2*1024, MaxCapacity);
36 debugs(91,7, HERE << "created BodyPipe" << status());
37 }
38
39 BodyPipe::~BodyPipe()
40 {
41 debugs(91,7, HERE << "destroying BodyPipe" << status());
42 assert(!theProducer);
43 assert(!theConsumer);
44 theBuf.clean();
45 }
46
47 void BodyPipe::setBodySize(uint64_t aBodySize)
48 {
49 assert(!bodySizeKnown());
50 assert(aBodySize >= 0);
51 assert(thePutSize <= aBodySize);
52
53 // If this assert fails, we need to add code to check for eof and inform
54 // the consumer about the eof condition via scheduleBodyEndNotification,
55 // because just setting a body size limit may trigger the eof condition.
56 assert(!theConsumer);
57
58 theBodySize = aBodySize;
59 debugs(91,7, HERE << "set body size" << status());
60 }
61
62 uint64_t BodyPipe::bodySize() const
63 {
64 assert(bodySizeKnown());
65 return static_cast<uint64_t>(theBodySize);
66 }
67
68 bool BodyPipe::expectMoreAfter(uint64_t offset) const
69 {
70 assert(theGetSize <= offset);
71 return offset < thePutSize || // buffer has more now or
72 (!productionEnded() && mayNeedMoreData()); // buffer will have more
73 }
74
75 bool BodyPipe::exhausted() const
76 {
77 return !expectMoreAfter(theGetSize);
78 }
79
80 uint64_t BodyPipe::unproducedSize() const
81 {
82 return bodySize() - thePutSize; // bodySize() asserts that size is known
83 }
84
85 void
86 BodyPipe::clearProducer(bool atEof)
87 {
88 if (theProducer) {
89 debugs(91,7, HERE << "clearing BodyPipe producer" << status());
90 theProducer = NULL;
91 if (atEof) {
92 if (!bodySizeKnown())
93 theBodySize = thePutSize;
94 else
95 if (bodySize() != thePutSize)
96 debugs(91,3, HERE << "aborting on premature eof" << status());
97 } else {
98 // asserta that we can detect the abort if the consumer joins later
99 assert(!bodySizeKnown() || bodySize() != thePutSize);
100 }
101 scheduleBodyEndNotification();
102 }
103 }
104
105 size_t
106 BodyPipe::putMoreData(const char *buf, size_t size)
107 {
108 if (bodySizeKnown())
109 size = XMIN((uint64_t)size, unproducedSize());
110
111 const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
112 if ((size = XMIN(size, spaceSize))) {
113 theBuf.append(buf, size);
114 postAppend(size);
115 return size;
116 }
117 return 0;
118 }
119
120 bool
121 BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
122 {
123 assert(!theConsumer);
124 assert(aConsumer);
125
126 // TODO: convert this into an exception and remove IfNotLate suffix
127 // If there is something consumed already, we are in an auto-consuming mode
128 // and it is too late to attach a real consumer to the pipe.
129 if (theGetSize > 0) {
130 assert(mustAutoConsume);
131 return false;
132 }
133
134 theConsumer = aConsumer;
135 debugs(91,7, HERE << "set consumer" << status());
136 if (theBuf.hasContent())
137 scheduleBodyDataNotification();
138 if (!theProducer)
139 scheduleBodyEndNotification();
140
141 return true;
142 }
143
144 // When BodyPipe consumer is gone, all events for that consumer must not
145 // reach the new consumer (if any). Otherwise, the calls may go out of order
146 // (if _some_ calls are dropped due to the ultimate destination being
147 // temporary NULL). The code keeps track of the number of outstanding
148 // events and skips that number if consumer leaves. TODO: when AscyncCall
149 // support is improved, should we just schedule calls directly to consumer?
150 void
151 BodyPipe::clearConsumer() {
152 if (theConsumer) {
153 debugs(91,7, HERE << "clearing consumer" << status());
154 theConsumer = NULL;
155 theCCallsToSkip = theCCallsPending; // skip all pending consumer calls
156 if (consumedSize() && !exhausted())
157 AsyncCall(91,5, this, BodyPipe::tellBodyConsumerAborted);
158 }
159 }
160
161 size_t
162 BodyPipe::getMoreData(MemBuf &buf)
163 {
164 if (!theBuf.hasContent())
165 return 0; // did not touch the possibly uninitialized buf
166
167 if (buf.isNull())
168 buf.init();
169 const size_t size = XMIN(theBuf.contentSize(), buf.potentialSpaceSize());
170 buf.append(theBuf.content(), size);
171 theBuf.consume(size);
172 postConsume(size);
173 return size; // cannot be zero if we called buf.init above
174 }
175
176 void
177 BodyPipe::consume(size_t size)
178 {
179 theBuf.consume(size);
180 postConsume(size);
181 }
182
183 void
184 BodyPipe::enableAutoConsumption() {
185 mustAutoConsume = true;
186 debugs(91,5, HERE << "enabled auto consumption" << status());
187 if (!theConsumer && theBuf.hasContent())
188 scheduleBodyDataNotification();
189 }
190
191 MemBuf &
192 BodyPipe::checkOut() {
193 assert(!isCheckedOut);
194 isCheckedOut = true;
195 return theBuf;
196 }
197
198 void
199 BodyPipe::checkIn(Checkout &checkout)
200 {
201 assert(isCheckedOut);
202 isCheckedOut = false;
203 const size_t currentSize = theBuf.contentSize();
204 if (checkout.checkedOutSize > currentSize)
205 postConsume(checkout.checkedOutSize - currentSize);
206 else
207 if (checkout.checkedOutSize < currentSize)
208 postAppend(currentSize - checkout.checkedOutSize);
209 }
210
211 void
212 BodyPipe::undoCheckOut(Checkout &checkout)
213 {
214 assert(isCheckedOut);
215 const size_t currentSize = theBuf.contentSize();
216 // We can only undo if size did not change, and even that carries
217 // some risk. If this becomes a problem, the code checking out
218 // raw buffers should always check them in (possibly unchanged)
219 // instead of relying on the automated undo mechanism of Checkout.
220 // The code can always use a temporary buffer to accomplish that.
221 assert(checkout.checkedOutSize == currentSize);
222 }
223
224 // TODO: Optimize: inform consumer/producer about more data/space only if
225 // they used the data/space since we notified them last time.
226
227 void
228 BodyPipe::postConsume(size_t size) {
229 assert(!isCheckedOut);
230 theGetSize += size;
231 debugs(91,7, HERE << "consumed " << size << " bytes" << status());
232 if (mayNeedMoreData())
233 AsyncCall(91,5, this, BodyPipe::tellMoreBodySpaceAvailable);
234 }
235
236 void
237 BodyPipe::postAppend(size_t size) {
238 assert(!isCheckedOut);
239 thePutSize += size;
240 debugs(91,7, HERE << "added " << size << " bytes" << status());
241
242 // We should not consume here even if mustAutoConsume because the
243 // caller may not be ready for the data to be consumed during this call.
244 scheduleBodyDataNotification();
245
246 if (!mayNeedMoreData())
247 clearProducer(true); // reached end-of-body
248 }
249
250
251 void
252 BodyPipe::scheduleBodyDataNotification()
253 {
254 if (theConsumer || mustAutoConsume) {
255 ++theCCallsPending;
256 AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable);
257 }
258 }
259
260 void
261 BodyPipe::scheduleBodyEndNotification()
262 {
263 if (theConsumer) {
264 ++theCCallsPending;
265 if (bodySizeKnown() && bodySize() == thePutSize)
266 AsyncCall(91,5, this, BodyPipe::tellBodyProductionEnded);
267 else
268 AsyncCall(91,5, this, BodyPipe::tellBodyProducerAborted);
269 }
270 }
271
272 void BodyPipe::tellMoreBodySpaceAvailable()
273 {
274 if (theProducer != NULL)
275 theProducer->noteMoreBodySpaceAvailable(*this);
276 }
277
278 void BodyPipe::tellBodyConsumerAborted()
279 {
280 if (theProducer != NULL)
281 theProducer->noteBodyConsumerAborted(*this);
282 }
283
284 void BodyPipe::tellMoreBodyDataAvailable()
285 {
286 if (skipCCall())
287 return;
288
289 if (theConsumer != NULL)
290 theConsumer->noteMoreBodyDataAvailable(*this);
291 else
292 if (mustAutoConsume && theBuf.hasContent())
293 consume(theBuf.contentSize());
294 }
295
296 void BodyPipe::tellBodyProductionEnded()
297 {
298 if (skipCCall())
299 return;
300
301 if (theConsumer != NULL)
302 theConsumer->noteBodyProductionEnded(*this);
303 }
304
305 void BodyPipe::tellBodyProducerAborted()
306 {
307 if (skipCCall())
308 return;
309
310 if (theConsumer != NULL)
311 theConsumer->noteBodyProducerAborted(*this);
312 }
313
314 // skips calls destined for the previous consumer; see BodyPipe::clearConsumer
315 bool BodyPipe::skipCCall()
316 {
317 assert(theCCallsPending > 0);
318 --theCCallsPending;
319
320 if (theCCallsToSkip <= 0)
321 return false;
322
323 --theCCallsToSkip;
324 debugs(91,5, HERE << "skipped call");
325 return true;
326 }
327
328 // a short temporary string describing buffer status for debugging
329 const char *BodyPipe::status() const
330 {
331 static MemBuf buf;
332 buf.reset();
333
334 buf.append(" [", 2);
335
336 buf.Printf("%"PRIu64"<=%"PRIu64, theGetSize, thePutSize);
337 if (theBodySize >= 0)
338 buf.Printf("<=%"PRId64, theBodySize);
339 else
340 buf.append("<=?", 3);
341
342 buf.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
343
344 buf.Printf(" pipe%p", this);
345 if (theProducer)
346 buf.Printf(" prod%p", theProducer);
347 if (theConsumer)
348 buf.Printf(" cons%p", theConsumer);
349
350 if (mustAutoConsume)
351 buf.append(" A", 2);
352 if (isCheckedOut)
353 buf.append(" L", 2); // Locked
354
355 buf.append("]", 1);
356
357 buf.terminate();
358
359 return buf.content();
360 }
361
362
363 /* BodyPipeCheckout */
364
365 BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
366 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
367 checkedOutSize(buf.contentSize()), checkedIn(false)
368 {
369 }
370
371 BodyPipeCheckout::~BodyPipeCheckout()
372 {
373 if (!checkedIn)
374 pipe.undoCheckOut(*this);
375 }
376
377 void
378 BodyPipeCheckout::checkIn()
379 {
380 assert(!checkedIn);
381 pipe.checkIn(*this);
382 checkedIn = true;
383 }
384
385
386 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
387 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
388 checkedIn(c.checkedIn)
389 {
390 assert(false); // prevent copying
391 }
392
393 BodyPipeCheckout &
394 BodyPipeCheckout::operator =(const BodyPipeCheckout &)
395 {
396 assert(false); // prevent assignment
397 return *this;
398 }