]> git.ipfire.org Git - thirdparty/squid.git/blob - src/BodyPipe.cc
4abd5359435f0bb2c2970c21dc863c771e0b0cea
[thirdparty/squid.git] / src / BodyPipe.cc
1
2 #include "squid.h"
3 #include "base/AsyncJobCalls.h"
4 #include "base/TextException.h"
5 #include "BodyPipe.h"
6
7 CBDATA_CLASS_INIT(BodyPipe);
8
9 // BodySink is a BodyConsumer class which just consume and drops
10 // data from a BodyPipe
11 class BodySink: public BodyConsumer
12 {
13 public:
14 BodySink(const BodyPipe::Pointer &bp): AsyncJob("BodySink"), body_pipe(bp) {}
15 virtual ~BodySink() { assert(!body_pipe); }
16
17 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
18 size_t contentSize = bp->buf().contentSize();
19 bp->consume(contentSize);
20 }
21 virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
22 stopConsumingFrom(body_pipe);
23 }
24 virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
25 stopConsumingFrom(body_pipe);
26 }
27 bool doneAll() const {return !body_pipe && AsyncJob::doneAll();}
28
29 private:
30 BodyPipe::Pointer body_pipe; ///< the pipe we are consuming from
31
32 CBDATA_CLASS2(BodySink);
33 };
34
35 CBDATA_CLASS_INIT(BodySink);
36
37 // The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
38 // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
39 // the BodyPipe passed as argument
40 class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
41 {
42 public:
43 typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
44
45 BodyProducerDialer(const BodyProducer::Pointer &aProducer,
46 Parent::Method aHandler, BodyPipe::Pointer bp):
47 Parent(aProducer, aHandler, bp) {}
48
49 virtual bool canDial(AsyncCall &call);
50 };
51
52 // The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
53 // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
54 // of the BodyPipe passed as argument
55 class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
56 {
57 public:
58 typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
59
60 BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer,
61 Parent::Method aHandler, BodyPipe::Pointer bp):
62 Parent(aConsumer, aHandler, bp) {}
63
64 virtual bool canDial(AsyncCall &call);
65 };
66
67 bool
68 BodyProducerDialer::canDial(AsyncCall &call)
69 {
70 if (!Parent::canDial(call))
71 return false;
72
73 const BodyProducer::Pointer &producer = job;
74 BodyPipe::Pointer pipe = arg1;
75 if (!pipe->stillProducing(producer)) {
76 debugs(call.debugSection, call.debugLevel, HERE << producer <<
77 " no longer producing for " << pipe->status());
78 return call.cancel("no longer producing");
79 }
80
81 return true;
82 }
83
84 bool
85 BodyConsumerDialer::canDial(AsyncCall &call)
86 {
87 if (!Parent::canDial(call))
88 return false;
89
90 const BodyConsumer::Pointer &consumer = job;
91 BodyPipe::Pointer pipe = arg1;
92 if (!pipe->stillConsuming(consumer)) {
93 debugs(call.debugSection, call.debugLevel, HERE << consumer <<
94 " no longer consuming from " << pipe->status());
95 return call.cancel("no longer consuming");
96 }
97
98 return true;
99 }
100
101
102 /* BodyProducer */
103
104 // inform the pipe that we are done and clear the Pointer
105 void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
106 {
107 debugs(91,7, HERE << this << " will not produce for " << pipe <<
108 "; atEof: " << atEof);
109 assert(pipe != NULL); // be strict: the caller state may depend on this
110 pipe->clearProducer(atEof);
111 pipe = NULL;
112 }
113
114
115
116 /* BodyConsumer */
117
118 // inform the pipe that we are done and clear the Pointer
119 void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
120 {
121 debugs(91,7, HERE << this << " will not consume from " << pipe);
122 assert(pipe != NULL); // be strict: the caller state may depend on this
123 pipe->clearConsumer();
124 pipe = NULL;
125 }
126
127
128 /* BodyPipe */
129
130 BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
131 theProducer(aProducer), theConsumer(0),
132 thePutSize(0), theGetSize(0),
133 mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
134 {
135 // TODO: teach MemBuf to start with zero minSize
136 // TODO: limit maxSize by theBodySize, when known?
137 theBuf.init(2*1024, MaxCapacity);
138 debugs(91,7, HERE << "created BodyPipe" << status());
139 }
140
141 BodyPipe::~BodyPipe()
142 {
143 debugs(91,7, HERE << "destroying BodyPipe" << status());
144 assert(!theProducer);
145 assert(!theConsumer);
146 theBuf.clean();
147 }
148
149 void BodyPipe::setBodySize(uint64_t aBodySize)
150 {
151 assert(!bodySizeKnown());
152 assert(thePutSize <= aBodySize);
153
154 // If this assert fails, we need to add code to check for eof and inform
155 // the consumer about the eof condition via scheduleBodyEndNotification,
156 // because just setting a body size limit may trigger the eof condition.
157 assert(!theConsumer);
158
159 theBodySize = aBodySize;
160 debugs(91,7, HERE << "set body size" << status());
161 }
162
163 uint64_t BodyPipe::bodySize() const
164 {
165 assert(bodySizeKnown());
166 return static_cast<uint64_t>(theBodySize);
167 }
168
169 bool BodyPipe::expectMoreAfter(uint64_t offset) const
170 {
171 assert(theGetSize <= offset);
172 return offset < thePutSize || // buffer has more now or
173 (!productionEnded() && mayNeedMoreData()); // buffer will have more
174 }
175
176 bool BodyPipe::exhausted() const
177 {
178 return !expectMoreAfter(theGetSize);
179 }
180
181 uint64_t BodyPipe::unproducedSize() const
182 {
183 return bodySize() - thePutSize; // bodySize() asserts that size is known
184 }
185
186 void BodyPipe::expectProductionEndAfter(uint64_t size)
187 {
188 const uint64_t expectedSize = thePutSize + size;
189 if (bodySizeKnown())
190 Must(bodySize() == expectedSize);
191 else
192 theBodySize = expectedSize;
193 }
194
195 void
196 BodyPipe::clearProducer(bool atEof)
197 {
198 if (theProducer.set()) {
199 debugs(91,7, HERE << "clearing BodyPipe producer" << status());
200 theProducer.clear();
201 if (atEof) {
202 if (!bodySizeKnown())
203 theBodySize = thePutSize;
204 else if (bodySize() != thePutSize)
205 debugs(91,3, HERE << "aborting on premature eof" << status());
206 } else {
207 // asserta that we can detect the abort if the consumer joins later
208 assert(!bodySizeKnown() || bodySize() != thePutSize);
209 }
210 scheduleBodyEndNotification();
211 }
212 }
213
214 size_t
215 BodyPipe::putMoreData(const char *aBuffer, size_t size)
216 {
217 if (bodySizeKnown())
218 size = min((uint64_t)size, unproducedSize());
219
220 const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
221 if ((size = min(size, spaceSize))) {
222 theBuf.append(aBuffer, size);
223 postAppend(size);
224 return size;
225 }
226 return 0;
227 }
228
229 bool
230 BodyPipe::setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
231 {
232 assert(!theConsumer);
233 assert(aConsumer.set()); // but might be invalid
234
235 // TODO: convert this into an exception and remove IfNotLate suffix
236 // If there is something consumed already, we are in an auto-consuming mode
237 // and it is too late to attach a real consumer to the pipe.
238 if (theGetSize > 0) {
239 assert(mustAutoConsume);
240 return false;
241 }
242
243 Must(!abortedConsumption); // did not promise to never consume
244
245 theConsumer = aConsumer;
246 debugs(91,7, HERE << "set consumer" << status());
247 if (theBuf.hasContent())
248 scheduleBodyDataNotification();
249 if (!theProducer)
250 scheduleBodyEndNotification();
251
252 return true;
253 }
254
255 void
256 BodyPipe::clearConsumer()
257 {
258 if (theConsumer.set()) {
259 debugs(91,7, HERE << "clearing consumer" << status());
260 theConsumer.clear();
261 // do not abort if we have not consumed so that HTTP or ICAP can retry
262 // benign xaction failures due to persistent connection race conditions
263 if (consumedSize())
264 expectNoConsumption();
265 }
266 }
267
268 void
269 BodyPipe::expectNoConsumption()
270 {
271 // We may be called multiple times because multiple jobs on the consumption
272 // chain may realize that there will be no more setConsumer() calls (e.g.,
273 // consuming code and retrying code). It is both difficult and not really
274 // necessary for them to coordinate their expectNoConsumption() calls.
275
276 // As a consequence, we may be called when we are auto-consuming already.
277
278 if (!abortedConsumption && !exhausted()) {
279 // Before we abort, any regular consumption should be over and auto
280 // consumption must not be started.
281 Must(!theConsumer);
282
283 AsyncCall::Pointer call= asyncCall(91, 7,
284 "BodyProducer::noteBodyConsumerAborted",
285 BodyProducerDialer(theProducer,
286 &BodyProducer::noteBodyConsumerAborted, this));
287 ScheduleCallHere(call);
288 abortedConsumption = true;
289
290 // in case somebody enabled auto-consumption before regular one aborted
291 if (mustAutoConsume)
292 startAutoConsumption();
293 }
294 }
295
296 size_t
297 BodyPipe::getMoreData(MemBuf &aMemBuffer)
298 {
299 if (!theBuf.hasContent())
300 return 0; // did not touch the possibly uninitialized buf
301
302 if (aMemBuffer.isNull())
303 aMemBuffer.init();
304 const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize());
305 aMemBuffer.append(theBuf.content(), size);
306 theBuf.consume(size);
307 postConsume(size);
308 return size; // cannot be zero if we called buf.init above
309 }
310
311 void
312 BodyPipe::consume(size_t size)
313 {
314 theBuf.consume(size);
315 postConsume(size);
316 }
317
318 // In the AutoConsumption mode the consumer has gone but the producer continues
319 // producing data. We are using a BodySink BodyConsumer which just discards the produced data.
320 void
321 BodyPipe::enableAutoConsumption()
322 {
323 mustAutoConsume = true;
324 debugs(91,5, HERE << "enabled auto consumption" << status());
325 if (!theConsumer && theBuf.hasContent())
326 startAutoConsumption();
327 }
328
329 // start auto consumption by creating body sink
330 void
331 BodyPipe::startAutoConsumption()
332 {
333 Must(mustAutoConsume);
334 Must(!theConsumer);
335 theConsumer = new BodySink(this);
336 debugs(91,7, HERE << "starting auto consumption" << status());
337 scheduleBodyDataNotification();
338 }
339
340 MemBuf &
341 BodyPipe::checkOut()
342 {
343 assert(!isCheckedOut);
344 isCheckedOut = true;
345 return theBuf;
346 }
347
348 void
349 BodyPipe::checkIn(Checkout &checkout)
350 {
351 assert(isCheckedOut);
352 isCheckedOut = false;
353 const size_t currentSize = theBuf.contentSize();
354 if (checkout.checkedOutSize > currentSize)
355 postConsume(checkout.checkedOutSize - currentSize);
356 else if (checkout.checkedOutSize < currentSize)
357 postAppend(currentSize - checkout.checkedOutSize);
358 }
359
360 void
361 BodyPipe::undoCheckOut(Checkout &checkout)
362 {
363 assert(isCheckedOut);
364 const size_t currentSize = theBuf.contentSize();
365 // We can only undo if size did not change, and even that carries
366 // some risk. If this becomes a problem, the code checking out
367 // raw buffers should always check them in (possibly unchanged)
368 // instead of relying on the automated undo mechanism of Checkout.
369 // The code can always use a temporary buffer to accomplish that.
370 Must(checkout.checkedOutSize == currentSize);
371 }
372
373 // TODO: Optimize: inform consumer/producer about more data/space only if
374 // they used the data/space since we notified them last time.
375
376 void
377 BodyPipe::postConsume(size_t size)
378 {
379 assert(!isCheckedOut);
380 theGetSize += size;
381 debugs(91,7, HERE << "consumed " << size << " bytes" << status());
382 if (mayNeedMoreData()) {
383 AsyncCall::Pointer call= asyncCall(91, 7,
384 "BodyProducer::noteMoreBodySpaceAvailable",
385 BodyProducerDialer(theProducer,
386 &BodyProducer::noteMoreBodySpaceAvailable, this));
387 ScheduleCallHere(call);
388 }
389 }
390
391 void
392 BodyPipe::postAppend(size_t size)
393 {
394 assert(!isCheckedOut);
395 thePutSize += size;
396 debugs(91,7, HERE << "added " << size << " bytes" << status());
397
398 if (mustAutoConsume && !theConsumer && size > 0)
399 startAutoConsumption();
400
401 // We should not consume here even if mustAutoConsume because the
402 // caller may not be ready for the data to be consumed during this call.
403 scheduleBodyDataNotification();
404
405 if (!mayNeedMoreData())
406 clearProducer(true); // reached end-of-body
407 }
408
409
410 void
411 BodyPipe::scheduleBodyDataNotification()
412 {
413 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
414 AsyncCall::Pointer call = asyncCall(91, 7,
415 "BodyConsumer::noteMoreBodyDataAvailable",
416 BodyConsumerDialer(theConsumer,
417 &BodyConsumer::noteMoreBodyDataAvailable, this));
418 ScheduleCallHere(call);
419 }
420 }
421
422 void
423 BodyPipe::scheduleBodyEndNotification()
424 {
425 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
426 if (bodySizeKnown() && bodySize() == thePutSize) {
427 AsyncCall::Pointer call = asyncCall(91, 7,
428 "BodyConsumer::noteBodyProductionEnded",
429 BodyConsumerDialer(theConsumer,
430 &BodyConsumer::noteBodyProductionEnded, this));
431 ScheduleCallHere(call);
432 } else {
433 AsyncCall::Pointer call = asyncCall(91, 7,
434 "BodyConsumer::noteBodyProducerAborted",
435 BodyConsumerDialer(theConsumer,
436 &BodyConsumer::noteBodyProducerAborted, this));
437 ScheduleCallHere(call);
438 }
439 }
440 }
441
442 // a short temporary string describing buffer status for debugging
443 const char *BodyPipe::status() const
444 {
445 static MemBuf outputBuffer;
446 outputBuffer.reset();
447
448 outputBuffer.append(" [", 2);
449
450 outputBuffer.Printf("%" PRIu64 "<=%" PRIu64, theGetSize, thePutSize);
451 if (theBodySize >= 0)
452 outputBuffer.Printf("<=%" PRId64, theBodySize);
453 else
454 outputBuffer.append("<=?", 3);
455
456 outputBuffer.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
457
458 outputBuffer.Printf(" pipe%p", this);
459 if (theProducer.set())
460 outputBuffer.Printf(" prod%p", theProducer.get());
461 if (theConsumer.set())
462 outputBuffer.Printf(" cons%p", theConsumer.get());
463
464 if (mustAutoConsume)
465 outputBuffer.append(" A", 2);
466 if (abortedConsumption)
467 outputBuffer.append(" !C", 3);
468 if (isCheckedOut)
469 outputBuffer.append(" L", 2); // Locked
470
471 outputBuffer.append("]", 1);
472
473 outputBuffer.terminate();
474
475 return outputBuffer.content();
476 }
477
478
479 /* BodyPipeCheckout */
480
481 BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
482 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
483 checkedOutSize(buf.contentSize()), checkedIn(false)
484 {
485 }
486
487 BodyPipeCheckout::~BodyPipeCheckout()
488 {
489 if (!checkedIn) {
490 // Do not pipe.undoCheckOut(*this) because it asserts or throws
491 // TODO: consider implementing the long-term solution discussed at
492 // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
493 debugs(91,2, HERE << "Warning: cannot undo BodyPipeCheckout");
494 pipe.checkIn(*this);
495 }
496 }
497
498 void
499 BodyPipeCheckout::checkIn()
500 {
501 assert(!checkedIn);
502 pipe.checkIn(*this);
503 checkedIn = true;
504 }
505
506
507 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
508 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
509 checkedIn(c.checkedIn)
510 {
511 assert(false); // prevent copying
512 }
513
514 BodyPipeCheckout &
515 BodyPipeCheckout::operator =(const BodyPipeCheckout &)
516 {
517 assert(false); // prevent assignment
518 return *this;
519 }