]> git.ipfire.org Git - thirdparty/squid.git/blob - src/BodyPipe.cc
Fixed and polished autoconsumption mode.
[thirdparty/squid.git] / src / BodyPipe.cc
1
2 #include "squid.h"
3 #include "BodyPipe.h"
4 #include "TextException.h"
5
6 CBDATA_CLASS_INIT(BodyPipe);
7
8 // BodySink is a BodyConsumer class which just consume and drops
9 // data from a BodyPipe
10 class BodySink: public BodyConsumer {
11 bool done;
12 public:
13 BodySink():AsyncJob("BodySink"), done(false){}
14 virtual ~BodySink() {}
15
16 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
17 size_t contentSize = bp->buf().contentSize();
18 bp->consume(contentSize);
19 }
20 virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
21 stopConsumingFrom(bp);
22 done = true;
23 }
24 virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
25 stopConsumingFrom(bp);
26 done = true;
27 }
28 bool doneAll() const {return done && AsyncJob::doneAll();}
29 CBDATA_CLASS2(BodySink);
30 };
31
32 CBDATA_CLASS_INIT(BodySink);
33
34 // The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
35 // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
36 // the BodyPipe passed as argument
37 class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
38 {
39 public:
40 typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
41
42 BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler,
43 BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {}
44
45 virtual bool canDial(AsyncCall &call);
46 };
47
48 // The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
49 // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
50 // of the BodyPipe passed as argument
51 class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
52 {
53 public:
54 typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
55
56 BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler,
57 BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {}
58
59 virtual bool canDial(AsyncCall &call);
60 };
61
62 bool
63 BodyProducerDialer::canDial(AsyncCall &call) {
64 if (!Parent::canDial(call))
65 return false;
66
67 BodyProducer *producer = object;
68 BodyPipe::Pointer pipe = arg1;
69 if (!pipe->stillProducing(producer)) {
70 debugs(call.debugSection, call.debugLevel, HERE << producer <<
71 " no longer producing for " << pipe->status());
72 return call.cancel("no longer producing");
73 }
74
75 return true;
76 }
77
78 bool
79 BodyConsumerDialer::canDial(AsyncCall &call) {
80 if (!Parent::canDial(call))
81 return false;
82
83 BodyConsumer *consumer = object;
84 BodyPipe::Pointer pipe = arg1;
85 if (!pipe->stillConsuming(consumer)) {
86 debugs(call.debugSection, call.debugLevel, HERE << consumer <<
87 " no longer consuming from " << pipe->status());
88 return call.cancel("no longer consuming");
89 }
90
91 return true;
92 }
93
94
95 /* BodyProducer */
96
97 // inform the pipe that we are done and clear the Pointer
98 void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
99 {
100 debugs(91,7, HERE << this << " will not produce for " << pipe <<
101 "; atEof: " << atEof);
102 assert(pipe != NULL); // be strict: the caller state may depend on this
103 pipe->clearProducer(atEof);
104 pipe = NULL;
105 }
106
107
108
109 /* BodyConsumer */
110
111 // inform the pipe that we are done and clear the Pointer
112 void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
113 {
114 debugs(91,7, HERE << this << " will not consume from " << pipe);
115 assert(pipe != NULL); // be strict: the caller state may depend on this
116 pipe->clearConsumer();
117 pipe = NULL;
118 }
119
120
121 /* BodyPipe */
122
123 BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
124 theProducer(aProducer), theConsumer(0),
125 thePutSize(0), theGetSize(0),
126 mustAutoConsume(false), isCheckedOut(false)
127 {
128 // TODO: teach MemBuf to start with zero minSize
129 // TODO: limit maxSize by theBodySize, when known?
130 theBuf.init(2*1024, MaxCapacity);
131 debugs(91,7, HERE << "created BodyPipe" << status());
132 }
133
134 BodyPipe::~BodyPipe()
135 {
136 debugs(91,7, HERE << "destroying BodyPipe" << status());
137 assert(!theProducer);
138 assert(!theConsumer);
139 theBuf.clean();
140 }
141
142 void BodyPipe::setBodySize(uint64_t aBodySize)
143 {
144 assert(!bodySizeKnown());
145 assert(aBodySize >= 0);
146 assert(thePutSize <= aBodySize);
147
148 // If this assert fails, we need to add code to check for eof and inform
149 // the consumer about the eof condition via scheduleBodyEndNotification,
150 // because just setting a body size limit may trigger the eof condition.
151 assert(!theConsumer);
152
153 theBodySize = aBodySize;
154 debugs(91,7, HERE << "set body size" << status());
155 }
156
157 uint64_t BodyPipe::bodySize() const
158 {
159 assert(bodySizeKnown());
160 return static_cast<uint64_t>(theBodySize);
161 }
162
163 bool BodyPipe::expectMoreAfter(uint64_t offset) const
164 {
165 assert(theGetSize <= offset);
166 return offset < thePutSize || // buffer has more now or
167 (!productionEnded() && mayNeedMoreData()); // buffer will have more
168 }
169
170 bool BodyPipe::exhausted() const
171 {
172 return !expectMoreAfter(theGetSize);
173 }
174
175 uint64_t BodyPipe::unproducedSize() const
176 {
177 return bodySize() - thePutSize; // bodySize() asserts that size is known
178 }
179
180 void
181 BodyPipe::clearProducer(bool atEof)
182 {
183 if (theProducer) {
184 debugs(91,7, HERE << "clearing BodyPipe producer" << status());
185 theProducer = NULL;
186 if (atEof) {
187 if (!bodySizeKnown())
188 theBodySize = thePutSize;
189 else
190 if (bodySize() != thePutSize)
191 debugs(91,3, HERE << "aborting on premature eof" << status());
192 } else {
193 // asserta that we can detect the abort if the consumer joins later
194 assert(!bodySizeKnown() || bodySize() != thePutSize);
195 }
196 scheduleBodyEndNotification();
197 }
198 }
199
200 size_t
201 BodyPipe::putMoreData(const char *buf, size_t size)
202 {
203 if (bodySizeKnown())
204 size = XMIN((uint64_t)size, unproducedSize());
205
206 const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
207 if ((size = XMIN(size, spaceSize))) {
208 theBuf.append(buf, size);
209 postAppend(size);
210 return size;
211 }
212 return 0;
213 }
214
215 bool
216 BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
217 {
218 assert(!theConsumer);
219 assert(aConsumer);
220
221 // TODO: convert this into an exception and remove IfNotLate suffix
222 // If there is something consumed already, we are in an auto-consuming mode
223 // and it is too late to attach a real consumer to the pipe.
224 if (theGetSize > 0) {
225 assert(mustAutoConsume);
226 return false;
227 }
228
229 theConsumer = aConsumer;
230 debugs(91,7, HERE << "set consumer" << status());
231 if (theBuf.hasContent())
232 scheduleBodyDataNotification();
233 if (!theProducer)
234 scheduleBodyEndNotification();
235
236 return true;
237 }
238
239 // When BodyPipe consumer is gone, all events for that consumer must not
240 // reach the new consumer (if any). Otherwise, the calls may go out of order
241 // (if _some_ calls are dropped due to the ultimate destination being
242 // temporary NULL). The code keeps track of the number of outstanding
243 // events and skips that number if consumer leaves. TODO: when AscyncCall
244 // support is improved, should we just schedule calls directly to consumer?
245 void
246 BodyPipe::clearConsumer() {
247 if (theConsumer) {
248 debugs(91,7, HERE << "clearing consumer" << status());
249 theConsumer = NULL;
250 if (consumedSize() && !exhausted()) {
251 AsyncCall::Pointer call= asyncCall(91, 7,
252 "BodyProducer::noteBodyConsumerAborted",
253 BodyProducerDialer(theProducer,
254 &BodyProducer::noteBodyConsumerAborted, this));
255 ScheduleCallHere(call);
256 }
257 }
258 }
259
260 size_t
261 BodyPipe::getMoreData(MemBuf &buf)
262 {
263 if (!theBuf.hasContent())
264 return 0; // did not touch the possibly uninitialized buf
265
266 if (buf.isNull())
267 buf.init();
268 const size_t size = XMIN(theBuf.contentSize(), buf.potentialSpaceSize());
269 buf.append(theBuf.content(), size);
270 theBuf.consume(size);
271 postConsume(size);
272 return size; // cannot be zero if we called buf.init above
273 }
274
275 void
276 BodyPipe::consume(size_t size)
277 {
278 theBuf.consume(size);
279 postConsume(size);
280 }
281
282 // In the AutoConsumption mode the consumer has gone but the producer continues
283 // producing data. We are using a BodySink BodyConsumer which just discards the produced data.
284 void
285 BodyPipe::enableAutoConsumption() {
286 mustAutoConsume = true;
287 debugs(91,5, HERE << "enabled auto consumption" << status());
288 if (!theConsumer && theBuf.hasContent())
289 startAutoConsumption();
290 }
291
292 // start auto consumption by creating body sink
293 void
294 BodyPipe::startAutoConsumption()
295 {
296 Must(mustAutoConsume);
297 Must(!theConsumer);
298 theConsumer = new BodySink;
299 debugs(91,7, HERE << "starting auto consumption" << status());
300 scheduleBodyDataNotification();
301 }
302
303 MemBuf &
304 BodyPipe::checkOut() {
305 assert(!isCheckedOut);
306 isCheckedOut = true;
307 return theBuf;
308 }
309
310 void
311 BodyPipe::checkIn(Checkout &checkout)
312 {
313 assert(isCheckedOut);
314 isCheckedOut = false;
315 const size_t currentSize = theBuf.contentSize();
316 if (checkout.checkedOutSize > currentSize)
317 postConsume(checkout.checkedOutSize - currentSize);
318 else
319 if (checkout.checkedOutSize < currentSize)
320 postAppend(currentSize - checkout.checkedOutSize);
321 }
322
323 void
324 BodyPipe::undoCheckOut(Checkout &checkout)
325 {
326 assert(isCheckedOut);
327 const size_t currentSize = theBuf.contentSize();
328 // We can only undo if size did not change, and even that carries
329 // some risk. If this becomes a problem, the code checking out
330 // raw buffers should always check them in (possibly unchanged)
331 // instead of relying on the automated undo mechanism of Checkout.
332 // The code can always use a temporary buffer to accomplish that.
333 assert(checkout.checkedOutSize == currentSize);
334 }
335
336 // TODO: Optimize: inform consumer/producer about more data/space only if
337 // they used the data/space since we notified them last time.
338
339 void
340 BodyPipe::postConsume(size_t size) {
341 assert(!isCheckedOut);
342 theGetSize += size;
343 debugs(91,7, HERE << "consumed " << size << " bytes" << status());
344 if (mayNeedMoreData()){
345 AsyncCall::Pointer call= asyncCall(91, 7,
346 "BodyProducer::noteMoreBodySpaceAvailable",
347 BodyProducerDialer(theProducer,
348 &BodyProducer::noteMoreBodySpaceAvailable, this));
349 ScheduleCallHere(call);
350 }
351 }
352
353 void
354 BodyPipe::postAppend(size_t size) {
355 assert(!isCheckedOut);
356 thePutSize += size;
357 debugs(91,7, HERE << "added " << size << " bytes" << status());
358
359 if (mustAutoConsume && !theConsumer && size > 0)
360 startAutoConsumption();
361
362 // We should not consume here even if mustAutoConsume because the
363 // caller may not be ready for the data to be consumed during this call.
364 scheduleBodyDataNotification();
365
366 if (!mayNeedMoreData())
367 clearProducer(true); // reached end-of-body
368 }
369
370
371 void
372 BodyPipe::scheduleBodyDataNotification()
373 {
374 if (theConsumer) {
375 AsyncCall::Pointer call = asyncCall(91, 7,
376 "BodyConsumer::noteMoreBodyDataAvailable",
377 BodyConsumerDialer(theConsumer,
378 &BodyConsumer::noteMoreBodyDataAvailable, this));
379 ScheduleCallHere(call);
380 }
381 }
382
383 void
384 BodyPipe::scheduleBodyEndNotification()
385 {
386 if (theConsumer) {
387 if (bodySizeKnown() && bodySize() == thePutSize) {
388 AsyncCall::Pointer call = asyncCall(91, 7,
389 "BodyConsumer::noteBodyProductionEnded",
390 BodyConsumerDialer(theConsumer,
391 &BodyConsumer::noteBodyProductionEnded, this));
392 ScheduleCallHere(call);
393 }
394 else {
395 AsyncCall::Pointer call = asyncCall(91, 7,
396 "BodyConsumer::noteBodyProducerAborted",
397 BodyConsumerDialer(theConsumer,
398 &BodyConsumer::noteBodyProducerAborted, this));
399 ScheduleCallHere(call);
400 }
401 }
402 }
403
404 // a short temporary string describing buffer status for debugging
405 const char *BodyPipe::status() const
406 {
407 static MemBuf buf;
408 buf.reset();
409
410 buf.append(" [", 2);
411
412 buf.Printf("%"PRIu64"<=%"PRIu64, theGetSize, thePutSize);
413 if (theBodySize >= 0)
414 buf.Printf("<=%"PRId64, theBodySize);
415 else
416 buf.append("<=?", 3);
417
418 buf.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
419
420 buf.Printf(" pipe%p", this);
421 if (theProducer)
422 buf.Printf(" prod%p", theProducer);
423 if (theConsumer)
424 buf.Printf(" cons%p", theConsumer);
425
426 if (mustAutoConsume)
427 buf.append(" A", 2);
428 if (isCheckedOut)
429 buf.append(" L", 2); // Locked
430
431 buf.append("]", 1);
432
433 buf.terminate();
434
435 return buf.content();
436 }
437
438
439 /* BodyPipeCheckout */
440
441 BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
442 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
443 checkedOutSize(buf.contentSize()), checkedIn(false)
444 {
445 }
446
447 BodyPipeCheckout::~BodyPipeCheckout()
448 {
449 if (!checkedIn)
450 pipe.undoCheckOut(*this);
451 }
452
453 void
454 BodyPipeCheckout::checkIn()
455 {
456 assert(!checkedIn);
457 pipe.checkIn(*this);
458 checkedIn = true;
459 }
460
461
462 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
463 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
464 checkedIn(c.checkedIn)
465 {
466 assert(false); // prevent copying
467 }
468
469 BodyPipeCheckout &
470 BodyPipeCheckout::operator =(const BodyPipeCheckout &)
471 {
472 assert(false); // prevent assignment
473 return *this;
474 }