]> git.ipfire.org Git - thirdparty/squid.git/blob - src/BodyPipe.cc
Author: Tsantilas Christos <chtsanti@users.sourceforge.net> , Alex Rousskov <rousskov...
[thirdparty/squid.git] / src / BodyPipe.cc
1
2 #include "squid.h"
3 #include "base/TextException.h"
4 #include "BodyPipe.h"
5
6 CBDATA_CLASS_INIT(BodyPipe);
7
8 // BodySink is a BodyConsumer class which just consume and drops
9 // data from a BodyPipe
10 class BodySink: public BodyConsumer
11 {
12 bool done;
13 public:
14 BodySink():AsyncJob("BodySink"), done(false) {}
15 virtual ~BodySink() {}
16
17 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
18 size_t contentSize = bp->buf().contentSize();
19 bp->consume(contentSize);
20 }
21 virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
22 stopConsumingFrom(bp);
23 done = true;
24 }
25 virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
26 stopConsumingFrom(bp);
27 done = true;
28 }
29 bool doneAll() const {return done && AsyncJob::doneAll();}
30 CBDATA_CLASS2(BodySink);
31 };
32
33 CBDATA_CLASS_INIT(BodySink);
34
35 // The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
36 // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
37 // the BodyPipe passed as argument
38 class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
39 {
40 public:
41 typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
42
43 BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler,
44 BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {}
45
46 virtual bool canDial(AsyncCall &call);
47 };
48
49 // The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
50 // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
51 // of the BodyPipe passed as argument
52 class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
53 {
54 public:
55 typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
56
57 BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler,
58 BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {}
59
60 virtual bool canDial(AsyncCall &call);
61 };
62
63 bool
64 BodyProducerDialer::canDial(AsyncCall &call)
65 {
66 if (!Parent::canDial(call))
67 return false;
68
69 BodyProducer *producer = object;
70 BodyPipe::Pointer pipe = arg1;
71 if (!pipe->stillProducing(producer)) {
72 debugs(call.debugSection, call.debugLevel, HERE << producer <<
73 " no longer producing for " << pipe->status());
74 return call.cancel("no longer producing");
75 }
76
77 return true;
78 }
79
80 bool
81 BodyConsumerDialer::canDial(AsyncCall &call)
82 {
83 if (!Parent::canDial(call))
84 return false;
85
86 BodyConsumer *consumer = object;
87 BodyPipe::Pointer pipe = arg1;
88 if (!pipe->stillConsuming(consumer)) {
89 debugs(call.debugSection, call.debugLevel, HERE << consumer <<
90 " no longer consuming from " << pipe->status());
91 return call.cancel("no longer consuming");
92 }
93
94 return true;
95 }
96
97
98 /* BodyProducer */
99
100 // inform the pipe that we are done and clear the Pointer
101 void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
102 {
103 debugs(91,7, HERE << this << " will not produce for " << pipe <<
104 "; atEof: " << atEof);
105 assert(pipe != NULL); // be strict: the caller state may depend on this
106 pipe->clearProducer(atEof);
107 pipe = NULL;
108 }
109
110
111
112 /* BodyConsumer */
113
114 // inform the pipe that we are done and clear the Pointer
115 void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
116 {
117 debugs(91,7, HERE << this << " will not consume from " << pipe);
118 assert(pipe != NULL); // be strict: the caller state may depend on this
119 pipe->clearConsumer();
120 pipe = NULL;
121 }
122
123
124 /* BodyPipe */
125
126 BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
127 theProducer(aProducer), theConsumer(0),
128 thePutSize(0), theGetSize(0),
129 mustAutoConsume(false), isCheckedOut(false)
130 {
131 // TODO: teach MemBuf to start with zero minSize
132 // TODO: limit maxSize by theBodySize, when known?
133 theBuf.init(2*1024, MaxCapacity);
134 debugs(91,7, HERE << "created BodyPipe" << status());
135 }
136
137 BodyPipe::~BodyPipe()
138 {
139 debugs(91,7, HERE << "destroying BodyPipe" << status());
140 assert(!theProducer);
141 assert(!theConsumer);
142 theBuf.clean();
143 }
144
145 void BodyPipe::setBodySize(uint64_t aBodySize)
146 {
147 assert(!bodySizeKnown());
148 assert(aBodySize >= 0);
149 assert(thePutSize <= aBodySize);
150
151 // If this assert fails, we need to add code to check for eof and inform
152 // the consumer about the eof condition via scheduleBodyEndNotification,
153 // because just setting a body size limit may trigger the eof condition.
154 assert(!theConsumer);
155
156 theBodySize = aBodySize;
157 debugs(91,7, HERE << "set body size" << status());
158 }
159
160 uint64_t BodyPipe::bodySize() const
161 {
162 assert(bodySizeKnown());
163 return static_cast<uint64_t>(theBodySize);
164 }
165
166 bool BodyPipe::expectMoreAfter(uint64_t offset) const
167 {
168 assert(theGetSize <= offset);
169 return offset < thePutSize || // buffer has more now or
170 (!productionEnded() && mayNeedMoreData()); // buffer will have more
171 }
172
173 bool BodyPipe::exhausted() const
174 {
175 return !expectMoreAfter(theGetSize);
176 }
177
178 uint64_t BodyPipe::unproducedSize() const
179 {
180 return bodySize() - thePutSize; // bodySize() asserts that size is known
181 }
182
183 void BodyPipe::expectProductionEndAfter(uint64_t size)
184 {
185 const uint64_t expectedSize = thePutSize + size;
186 if (bodySizeKnown())
187 Must(bodySize() == expectedSize);
188 else
189 theBodySize = expectedSize;
190 }
191
192 void
193 BodyPipe::clearProducer(bool atEof)
194 {
195 if (theProducer) {
196 debugs(91,7, HERE << "clearing BodyPipe producer" << status());
197 theProducer = NULL;
198 if (atEof) {
199 if (!bodySizeKnown())
200 theBodySize = thePutSize;
201 else if (bodySize() != thePutSize)
202 debugs(91,3, HERE << "aborting on premature eof" << status());
203 } else {
204 // asserta that we can detect the abort if the consumer joins later
205 assert(!bodySizeKnown() || bodySize() != thePutSize);
206 }
207 scheduleBodyEndNotification();
208 }
209 }
210
211 size_t
212 BodyPipe::putMoreData(const char *aBuffer, size_t size)
213 {
214 if (bodySizeKnown())
215 size = min((uint64_t)size, unproducedSize());
216
217 const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
218 if ((size = min(size, spaceSize))) {
219 theBuf.append(aBuffer, size);
220 postAppend(size);
221 return size;
222 }
223 return 0;
224 }
225
226 bool
227 BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
228 {
229 assert(!theConsumer);
230 assert(aConsumer);
231
232 // TODO: convert this into an exception and remove IfNotLate suffix
233 // If there is something consumed already, we are in an auto-consuming mode
234 // and it is too late to attach a real consumer to the pipe.
235 if (theGetSize > 0) {
236 assert(mustAutoConsume);
237 return false;
238 }
239
240 theConsumer = aConsumer;
241 debugs(91,7, HERE << "set consumer" << status());
242 if (theBuf.hasContent())
243 scheduleBodyDataNotification();
244 if (!theProducer)
245 scheduleBodyEndNotification();
246
247 return true;
248 }
249
250 // When BodyPipe consumer is gone, all events for that consumer must not
251 // reach the new consumer (if any). Otherwise, the calls may go out of order
252 // (if _some_ calls are dropped due to the ultimate destination being
253 // temporary NULL). The code keeps track of the number of outstanding
254 // events and skips that number if consumer leaves. TODO: when AscyncCall
255 // support is improved, should we just schedule calls directly to consumer?
256 void
257 BodyPipe::clearConsumer()
258 {
259 if (theConsumer) {
260 debugs(91,7, HERE << "clearing consumer" << status());
261 theConsumer = NULL;
262 if (consumedSize() && !exhausted()) {
263 AsyncCall::Pointer call= asyncCall(91, 7,
264 "BodyProducer::noteBodyConsumerAborted",
265 BodyProducerDialer(theProducer,
266 &BodyProducer::noteBodyConsumerAborted, this));
267 ScheduleCallHere(call);
268 }
269 }
270 }
271
272 size_t
273 BodyPipe::getMoreData(MemBuf &aMemBuffer)
274 {
275 if (!theBuf.hasContent())
276 return 0; // did not touch the possibly uninitialized buf
277
278 if (aMemBuffer.isNull())
279 aMemBuffer.init();
280 const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize());
281 aMemBuffer.append(theBuf.content(), size);
282 theBuf.consume(size);
283 postConsume(size);
284 return size; // cannot be zero if we called buf.init above
285 }
286
287 void
288 BodyPipe::consume(size_t size)
289 {
290 theBuf.consume(size);
291 postConsume(size);
292 }
293
294 // In the AutoConsumption mode the consumer has gone but the producer continues
295 // producing data. We are using a BodySink BodyConsumer which just discards the produced data.
296 void
297 BodyPipe::enableAutoConsumption()
298 {
299 mustAutoConsume = true;
300 debugs(91,5, HERE << "enabled auto consumption" << status());
301 if (!theConsumer && theBuf.hasContent())
302 startAutoConsumption();
303 }
304
305 // start auto consumption by creating body sink
306 void
307 BodyPipe::startAutoConsumption()
308 {
309 Must(mustAutoConsume);
310 Must(!theConsumer);
311 theConsumer = new BodySink;
312 debugs(91,7, HERE << "starting auto consumption" << status());
313 scheduleBodyDataNotification();
314 }
315
316 MemBuf &
317 BodyPipe::checkOut()
318 {
319 assert(!isCheckedOut);
320 isCheckedOut = true;
321 return theBuf;
322 }
323
324 void
325 BodyPipe::checkIn(Checkout &checkout)
326 {
327 assert(isCheckedOut);
328 isCheckedOut = false;
329 const size_t currentSize = theBuf.contentSize();
330 if (checkout.checkedOutSize > currentSize)
331 postConsume(checkout.checkedOutSize - currentSize);
332 else if (checkout.checkedOutSize < currentSize)
333 postAppend(currentSize - checkout.checkedOutSize);
334 }
335
336 void
337 BodyPipe::undoCheckOut(Checkout &checkout)
338 {
339 assert(isCheckedOut);
340 const size_t currentSize = theBuf.contentSize();
341 // We can only undo if size did not change, and even that carries
342 // some risk. If this becomes a problem, the code checking out
343 // raw buffers should always check them in (possibly unchanged)
344 // instead of relying on the automated undo mechanism of Checkout.
345 // The code can always use a temporary buffer to accomplish that.
346 assert(checkout.checkedOutSize == currentSize);
347 }
348
349 // TODO: Optimize: inform consumer/producer about more data/space only if
350 // they used the data/space since we notified them last time.
351
352 void
353 BodyPipe::postConsume(size_t size)
354 {
355 assert(!isCheckedOut);
356 theGetSize += size;
357 debugs(91,7, HERE << "consumed " << size << " bytes" << status());
358 if (mayNeedMoreData()) {
359 AsyncCall::Pointer call= asyncCall(91, 7,
360 "BodyProducer::noteMoreBodySpaceAvailable",
361 BodyProducerDialer(theProducer,
362 &BodyProducer::noteMoreBodySpaceAvailable, this));
363 ScheduleCallHere(call);
364 }
365 }
366
367 void
368 BodyPipe::postAppend(size_t size)
369 {
370 assert(!isCheckedOut);
371 thePutSize += size;
372 debugs(91,7, HERE << "added " << size << " bytes" << status());
373
374 if (mustAutoConsume && !theConsumer && size > 0)
375 startAutoConsumption();
376
377 // We should not consume here even if mustAutoConsume because the
378 // caller may not be ready for the data to be consumed during this call.
379 scheduleBodyDataNotification();
380
381 if (!mayNeedMoreData())
382 clearProducer(true); // reached end-of-body
383 }
384
385
386 void
387 BodyPipe::scheduleBodyDataNotification()
388 {
389 if (theConsumer) {
390 AsyncCall::Pointer call = asyncCall(91, 7,
391 "BodyConsumer::noteMoreBodyDataAvailable",
392 BodyConsumerDialer(theConsumer,
393 &BodyConsumer::noteMoreBodyDataAvailable, this));
394 ScheduleCallHere(call);
395 }
396 }
397
398 void
399 BodyPipe::scheduleBodyEndNotification()
400 {
401 if (theConsumer) {
402 if (bodySizeKnown() && bodySize() == thePutSize) {
403 AsyncCall::Pointer call = asyncCall(91, 7,
404 "BodyConsumer::noteBodyProductionEnded",
405 BodyConsumerDialer(theConsumer,
406 &BodyConsumer::noteBodyProductionEnded, this));
407 ScheduleCallHere(call);
408 } else {
409 AsyncCall::Pointer call = asyncCall(91, 7,
410 "BodyConsumer::noteBodyProducerAborted",
411 BodyConsumerDialer(theConsumer,
412 &BodyConsumer::noteBodyProducerAborted, this));
413 ScheduleCallHere(call);
414 }
415 }
416 }
417
418 // a short temporary string describing buffer status for debugging
419 const char *BodyPipe::status() const
420 {
421 static MemBuf outputBuffer;
422 outputBuffer.reset();
423
424 outputBuffer.append(" [", 2);
425
426 outputBuffer.Printf("%"PRIu64"<=%"PRIu64, theGetSize, thePutSize);
427 if (theBodySize >= 0)
428 outputBuffer.Printf("<=%"PRId64, theBodySize);
429 else
430 outputBuffer.append("<=?", 3);
431
432 outputBuffer.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
433
434 outputBuffer.Printf(" pipe%p", this);
435 if (theProducer)
436 outputBuffer.Printf(" prod%p", theProducer);
437 if (theConsumer)
438 outputBuffer.Printf(" cons%p", theConsumer);
439
440 if (mustAutoConsume)
441 outputBuffer.append(" A", 2);
442 if (isCheckedOut)
443 outputBuffer.append(" L", 2); // Locked
444
445 outputBuffer.append("]", 1);
446
447 outputBuffer.terminate();
448
449 return outputBuffer.content();
450 }
451
452
453 /* BodyPipeCheckout */
454
455 BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
456 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
457 checkedOutSize(buf.contentSize()), checkedIn(false)
458 {
459 }
460
461 BodyPipeCheckout::~BodyPipeCheckout()
462 {
463 if (!checkedIn)
464 pipe.undoCheckOut(*this);
465 }
466
467 void
468 BodyPipeCheckout::checkIn()
469 {
470 assert(!checkedIn);
471 pipe.checkIn(*this);
472 checkedIn = true;
473 }
474
475
476 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
477 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
478 checkedIn(c.checkedIn)
479 {
480 assert(false); // prevent copying
481 }
482
483 BodyPipeCheckout &
484 BodyPipeCheckout::operator =(const BodyPipeCheckout &)
485 {
486 assert(false); // prevent assignment
487 return *this;
488 }