]> git.ipfire.org Git - thirdparty/squid.git/blame - src/BodyPipe.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / BodyPipe.cc
CommitLineData
bbc27441 1/*
bde978a6 2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
bbc27441
AJ
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
1b39caaa 8
582c2af2 9#include "squid.h"
4299f876 10#include "base/AsyncJobCalls.h"
3d93a84d 11#include "base/TextException.h"
1b39caaa 12#include "BodyPipe.h"
13
14CBDATA_CLASS_INIT(BodyPipe);
15
e7352f30 16// BodySink is a BodyConsumer class which just consume and drops
26ac0430
AJ
17// data from a BodyPipe
18class BodySink: public BodyConsumer
19{
5c2f68b7
AJ
20 CBDATA_CLASS(BodySink);
21
e7352f30 22public:
5a856d1e
AR
23 BodySink(const BodyPipe::Pointer &bp): AsyncJob("BodySink"), body_pipe(bp) {}
24 virtual ~BodySink() { assert(!body_pipe); }
26ac0430 25
e7352f30 26 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
26ac0430
AJ
27 size_t contentSize = bp->buf().contentSize();
28 bp->consume(contentSize);
e7352f30 29 }
ced8def3 30 virtual void noteBodyProductionEnded(BodyPipe::Pointer) {
5a856d1e 31 stopConsumingFrom(body_pipe);
e7352f30 32 }
ced8def3 33 virtual void noteBodyProducerAborted(BodyPipe::Pointer) {
5a856d1e 34 stopConsumingFrom(body_pipe);
e7352f30 35 }
5a856d1e
AR
36 bool doneAll() const {return !body_pipe && AsyncJob::doneAll();}
37
38private:
39 BodyPipe::Pointer body_pipe; ///< the pipe we are consuming from
e7352f30 40};
41
42CBDATA_CLASS_INIT(BodySink);
43
44// The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
26ac0430 45// In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
e7352f30 46// the BodyPipe passed as argument
47class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
48{
49public:
26ac0430 50 typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
e7352f30 51
4299f876 52 BodyProducerDialer(const BodyProducer::Pointer &aProducer,
4cb2536f 53 Parent::Method aHandler, BodyPipe::Pointer bp):
f53969cc 54 Parent(aProducer, aHandler, bp) {}
e7352f30 55
26ac0430 56 virtual bool canDial(AsyncCall &call);
e7352f30 57};
58
59// The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
26ac0430 60// In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
e7352f30 61// of the BodyPipe passed as argument
62class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
63{
64public:
26ac0430 65 typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
e7352f30 66
4299f876 67 BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer,
4cb2536f 68 Parent::Method aHandler, BodyPipe::Pointer bp):
f53969cc 69 Parent(aConsumer, aHandler, bp) {}
e7352f30 70
26ac0430 71 virtual bool canDial(AsyncCall &call);
e7352f30 72};
73
74bool
26ac0430
AJ
75BodyProducerDialer::canDial(AsyncCall &call)
76{
77 if (!Parent::canDial(call))
78 return false;
79
4299f876 80 const BodyProducer::Pointer &producer = job;
3be43416
AJ
81 BodyPipe::Pointer aPipe = arg1;
82 if (!aPipe->stillProducing(producer)) {
83 debugs(call.debugSection, call.debugLevel, producer << " no longer producing for " << aPipe->status());
26ac0430
AJ
84 return call.cancel("no longer producing");
85 }
e7352f30 86
26ac0430 87 return true;
e7352f30 88}
89
90bool
26ac0430
AJ
91BodyConsumerDialer::canDial(AsyncCall &call)
92{
93 if (!Parent::canDial(call))
94 return false;
95
4299f876 96 const BodyConsumer::Pointer &consumer = job;
3be43416
AJ
97 BodyPipe::Pointer aPipe = arg1;
98 if (!aPipe->stillConsuming(consumer)) {
99 debugs(call.debugSection, call.debugLevel, consumer << " no longer consuming from " << aPipe->status());
26ac0430
AJ
100 return call.cancel("no longer consuming");
101 }
e7352f30 102
26ac0430 103 return true;
e7352f30 104}
105
e7352f30 106/* BodyProducer */
107
1b39caaa 108// inform the pipe that we are done and clear the Pointer
3be43416 109void BodyProducer::stopProducingFor(RefCount<BodyPipe> &p, bool atEof)
1b39caaa 110{
3be43416
AJ
111 debugs(91,7, this << " will not produce for " << p << "; atEof: " << atEof);
112 assert(p != NULL); // be strict: the caller state may depend on this
113 p->clearProducer(atEof);
114 p = NULL;
1b39caaa 115}
116
e7352f30 117/* BodyConsumer */
118
1b39caaa 119// inform the pipe that we are done and clear the Pointer
3be43416 120void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &p)
1b39caaa 121{
3be43416
AJ
122 debugs(91,7, this << " will not consume from " << p);
123 assert(p != NULL); // be strict: the caller state may depend on this
124 p->clearConsumer();
125 p = NULL;
1b39caaa 126}
127
128/* BodyPipe */
129
130BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
f53969cc
SM
131 theProducer(aProducer), theConsumer(0),
132 thePutSize(0), theGetSize(0),
133 mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
1b39caaa 134{
26ac0430
AJ
135 // TODO: teach MemBuf to start with zero minSize
136 // TODO: limit maxSize by theBodySize, when known?
137 theBuf.init(2*1024, MaxCapacity);
138 debugs(91,7, HERE << "created BodyPipe" << status());
1b39caaa 139}
140
141BodyPipe::~BodyPipe()
142{
26ac0430
AJ
143 debugs(91,7, HERE << "destroying BodyPipe" << status());
144 assert(!theProducer);
145 assert(!theConsumer);
146 theBuf.clean();
1b39caaa 147}
148
47f6e231 149void BodyPipe::setBodySize(uint64_t aBodySize)
1b39caaa 150{
26ac0430 151 assert(!bodySizeKnown());
26ac0430 152 assert(thePutSize <= aBodySize);
1b39caaa 153
26ac0430
AJ
154 // If this assert fails, we need to add code to check for eof and inform
155 // the consumer about the eof condition via scheduleBodyEndNotification,
156 // because just setting a body size limit may trigger the eof condition.
157 assert(!theConsumer);
1b39caaa 158
26ac0430
AJ
159 theBodySize = aBodySize;
160 debugs(91,7, HERE << "set body size" << status());
1b39caaa 161}
162
47f6e231 163uint64_t BodyPipe::bodySize() const
1b39caaa 164{
26ac0430
AJ
165 assert(bodySizeKnown());
166 return static_cast<uint64_t>(theBodySize);
1b39caaa 167}
168
47f6e231 169bool BodyPipe::expectMoreAfter(uint64_t offset) const
1b39caaa 170{
26ac0430
AJ
171 assert(theGetSize <= offset);
172 return offset < thePutSize || // buffer has more now or
173 (!productionEnded() && mayNeedMoreData()); // buffer will have more
1b39caaa 174}
175
176bool BodyPipe::exhausted() const
177{
26ac0430 178 return !expectMoreAfter(theGetSize);
1b39caaa 179}
180
610d8f3b 181uint64_t BodyPipe::unproducedSize() const
1b39caaa 182{
26ac0430 183 return bodySize() - thePutSize; // bodySize() asserts that size is known
1b39caaa 184}
185
83c51da9
CT
186void BodyPipe::expectProductionEndAfter(uint64_t size)
187{
188 const uint64_t expectedSize = thePutSize + size;
189 if (bodySizeKnown())
190 Must(bodySize() == expectedSize);
191 else
192 theBodySize = expectedSize;
193}
194
1b39caaa 195void
196BodyPipe::clearProducer(bool atEof)
197{
4299f876 198 if (theProducer.set()) {
26ac0430 199 debugs(91,7, HERE << "clearing BodyPipe producer" << status());
4299f876 200 theProducer.clear();
26ac0430
AJ
201 if (atEof) {
202 if (!bodySizeKnown())
203 theBodySize = thePutSize;
e1381638
AJ
204 else if (bodySize() != thePutSize)
205 debugs(91,3, HERE << "aborting on premature eof" << status());
26ac0430
AJ
206 } else {
207 // asserta that we can detect the abort if the consumer joins later
208 assert(!bodySizeKnown() || bodySize() != thePutSize);
209 }
210 scheduleBodyEndNotification();
211 }
1b39caaa 212}
213
214size_t
350e2aec 215BodyPipe::putMoreData(const char *aBuffer, size_t size)
1b39caaa 216{
26ac0430 217 if (bodySizeKnown())
d85c3078 218 size = min((uint64_t)size, unproducedSize());
26ac0430
AJ
219
220 const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
d85c3078 221 if ((size = min(size, spaceSize))) {
350e2aec 222 theBuf.append(aBuffer, size);
26ac0430
AJ
223 postAppend(size);
224 return size;
225 }
226 return 0;
1b39caaa 227}
228
229bool
4299f876 230BodyPipe::setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
1b39caaa 231{
26ac0430 232 assert(!theConsumer);
4299f876 233 assert(aConsumer.set()); // but might be invalid
26ac0430
AJ
234
235 // TODO: convert this into an exception and remove IfNotLate suffix
236 // If there is something consumed already, we are in an auto-consuming mode
237 // and it is too late to attach a real consumer to the pipe.
238 if (theGetSize > 0) {
239 assert(mustAutoConsume);
240 return false;
241 }
1b39caaa 242
abe286b8
AR
243 Must(!abortedConsumption); // did not promise to never consume
244
26ac0430
AJ
245 theConsumer = aConsumer;
246 debugs(91,7, HERE << "set consumer" << status());
247 if (theBuf.hasContent())
248 scheduleBodyDataNotification();
249 if (!theProducer)
250 scheduleBodyEndNotification();
1b39caaa 251
26ac0430 252 return true;
1b39caaa 253}
254
255void
26ac0430
AJ
256BodyPipe::clearConsumer()
257{
4299f876 258 if (theConsumer.set()) {
26ac0430 259 debugs(91,7, HERE << "clearing consumer" << status());
4299f876 260 theConsumer.clear();
abe286b8
AR
261 // do not abort if we have not consumed so that HTTP or ICAP can retry
262 // benign xaction failures due to persistent connection race conditions
263 if (consumedSize())
264 expectNoConsumption();
265 }
266}
267
268void
269BodyPipe::expectNoConsumption()
270{
b84d327b
AR
271 // We may be called multiple times because multiple jobs on the consumption
272 // chain may realize that there will be no more setConsumer() calls (e.g.,
273 // consuming code and retrying code). It is both difficult and not really
274 // necessary for them to coordinate their expectNoConsumption() calls.
275
276 // As a consequence, we may be called when we are auto-consuming already.
277
abe286b8 278 if (!abortedConsumption && !exhausted()) {
b84d327b 279 // Before we abort, any regular consumption should be over and auto
d8eddc48 280 // consumption must not be started.
b84d327b
AR
281 Must(!theConsumer);
282
e6f9e263
A
283 AsyncCall::Pointer call= asyncCall(91, 7,
284 "BodyProducer::noteBodyConsumerAborted",
285 BodyProducerDialer(theProducer,
f53969cc 286 &BodyProducer::noteBodyConsumerAborted, this));
e6f9e263
A
287 ScheduleCallHere(call);
288 abortedConsumption = true;
b84d327b
AR
289
290 // in case somebody enabled auto-consumption before regular one aborted
291 if (mustAutoConsume)
292 startAutoConsumption();
26ac0430 293 }
1b39caaa 294}
295
296size_t
350e2aec 297BodyPipe::getMoreData(MemBuf &aMemBuffer)
1b39caaa 298{
26ac0430
AJ
299 if (!theBuf.hasContent())
300 return 0; // did not touch the possibly uninitialized buf
1b39caaa 301
350e2aec
FC
302 if (aMemBuffer.isNull())
303 aMemBuffer.init();
304 const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize());
305 aMemBuffer.append(theBuf.content(), size);
26ac0430
AJ
306 theBuf.consume(size);
307 postConsume(size);
308 return size; // cannot be zero if we called buf.init above
1b39caaa 309}
310
311void
312BodyPipe::consume(size_t size)
313{
26ac0430
AJ
314 theBuf.consume(size);
315 postConsume(size);
1b39caaa 316}
317
26ac0430 318// In the AutoConsumption mode the consumer has gone but the producer continues
e7352f30 319// producing data. We are using a BodySink BodyConsumer which just discards the produced data.
1b39caaa 320void
26ac0430
AJ
321BodyPipe::enableAutoConsumption()
322{
323 mustAutoConsume = true;
324 debugs(91,5, HERE << "enabled auto consumption" << status());
325 if (!theConsumer && theBuf.hasContent())
326 startAutoConsumption();
5121d48e
AR
327}
328
329// start auto consumption by creating body sink
330void
331BodyPipe::startAutoConsumption()
332{
26ac0430
AJ
333 Must(mustAutoConsume);
334 Must(!theConsumer);
5a856d1e 335 theConsumer = new BodySink(this);
26ac0430
AJ
336 debugs(91,7, HERE << "starting auto consumption" << status());
337 scheduleBodyDataNotification();
1b39caaa 338}
339
340MemBuf &
26ac0430
AJ
341BodyPipe::checkOut()
342{
343 assert(!isCheckedOut);
344 isCheckedOut = true;
345 return theBuf;
1b39caaa 346}
347
348void
349BodyPipe::checkIn(Checkout &checkout)
350{
26ac0430
AJ
351 assert(isCheckedOut);
352 isCheckedOut = false;
353 const size_t currentSize = theBuf.contentSize();
354 if (checkout.checkedOutSize > currentSize)
355 postConsume(checkout.checkedOutSize - currentSize);
e1381638
AJ
356 else if (checkout.checkedOutSize < currentSize)
357 postAppend(currentSize - checkout.checkedOutSize);
1b39caaa 358}
359
360void
361BodyPipe::undoCheckOut(Checkout &checkout)
362{
26ac0430
AJ
363 assert(isCheckedOut);
364 const size_t currentSize = theBuf.contentSize();
365 // We can only undo if size did not change, and even that carries
366 // some risk. If this becomes a problem, the code checking out
367 // raw buffers should always check them in (possibly unchanged)
368 // instead of relying on the automated undo mechanism of Checkout.
369 // The code can always use a temporary buffer to accomplish that.
87728a5b 370 Must(checkout.checkedOutSize == currentSize);
1b39caaa 371}
372
373// TODO: Optimize: inform consumer/producer about more data/space only if
374// they used the data/space since we notified them last time.
375
376void
26ac0430
AJ
377BodyPipe::postConsume(size_t size)
378{
379 assert(!isCheckedOut);
380 theGetSize += size;
381 debugs(91,7, HERE << "consumed " << size << " bytes" << status());
382 if (mayNeedMoreData()) {
383 AsyncCall::Pointer call= asyncCall(91, 7,
384 "BodyProducer::noteMoreBodySpaceAvailable",
385 BodyProducerDialer(theProducer,
f53969cc 386 &BodyProducer::noteMoreBodySpaceAvailable, this));
26ac0430
AJ
387 ScheduleCallHere(call);
388 }
1b39caaa 389}
390
391void
26ac0430
AJ
392BodyPipe::postAppend(size_t size)
393{
394 assert(!isCheckedOut);
395 thePutSize += size;
396 debugs(91,7, HERE << "added " << size << " bytes" << status());
1b39caaa 397
26ac0430
AJ
398 if (mustAutoConsume && !theConsumer && size > 0)
399 startAutoConsumption();
5121d48e 400
26ac0430
AJ
401 // We should not consume here even if mustAutoConsume because the
402 // caller may not be ready for the data to be consumed during this call.
403 scheduleBodyDataNotification();
1b39caaa 404
26ac0430
AJ
405 if (!mayNeedMoreData())
406 clearProducer(true); // reached end-of-body
1b39caaa 407}
408
6c56baf6 409void
410BodyPipe::scheduleBodyDataNotification()
411{
4299f876 412 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
26ac0430
AJ
413 AsyncCall::Pointer call = asyncCall(91, 7,
414 "BodyConsumer::noteMoreBodyDataAvailable",
415 BodyConsumerDialer(theConsumer,
f53969cc 416 &BodyConsumer::noteMoreBodyDataAvailable, this));
26ac0430
AJ
417 ScheduleCallHere(call);
418 }
6c56baf6 419}
420
1b39caaa 421void
422BodyPipe::scheduleBodyEndNotification()
423{
4299f876 424 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
26ac0430
AJ
425 if (bodySizeKnown() && bodySize() == thePutSize) {
426 AsyncCall::Pointer call = asyncCall(91, 7,
427 "BodyConsumer::noteBodyProductionEnded",
428 BodyConsumerDialer(theConsumer,
f53969cc 429 &BodyConsumer::noteBodyProductionEnded, this));
26ac0430
AJ
430 ScheduleCallHere(call);
431 } else {
432 AsyncCall::Pointer call = asyncCall(91, 7,
433 "BodyConsumer::noteBodyProducerAborted",
434 BodyConsumerDialer(theConsumer,
f53969cc 435 &BodyConsumer::noteBodyProducerAborted, this));
26ac0430
AJ
436 ScheduleCallHere(call);
437 }
438 }
1b39caaa 439}
440
1b39caaa 441// a short temporary string describing buffer status for debugging
442const char *BodyPipe::status() const
443{
350e2aec
FC
444 static MemBuf outputBuffer;
445 outputBuffer.reset();
1b39caaa 446
350e2aec 447 outputBuffer.append(" [", 2);
1b39caaa 448
c91ca3ce 449 outputBuffer.Printf("%" PRIu64 "<=%" PRIu64, theGetSize, thePutSize);
1b39caaa 450 if (theBodySize >= 0)
c91ca3ce 451 outputBuffer.Printf("<=%" PRId64, theBodySize);
26ac0430 452 else
350e2aec 453 outputBuffer.append("<=?", 3);
1b39caaa 454
350e2aec 455 outputBuffer.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
1b39caaa 456
350e2aec 457 outputBuffer.Printf(" pipe%p", this);
4299f876
AR
458 if (theProducer.set())
459 outputBuffer.Printf(" prod%p", theProducer.get());
460 if (theConsumer.set())
461 outputBuffer.Printf(" cons%p", theConsumer.get());
1b39caaa 462
26ac0430 463 if (mustAutoConsume)
350e2aec 464 outputBuffer.append(" A", 2);
abe286b8
AR
465 if (abortedConsumption)
466 outputBuffer.append(" !C", 3);
26ac0430 467 if (isCheckedOut)
350e2aec 468 outputBuffer.append(" L", 2); // Locked
1b39caaa 469
350e2aec 470 outputBuffer.append("]", 1);
1b39caaa 471
350e2aec 472 outputBuffer.terminate();
1b39caaa 473
350e2aec 474 return outputBuffer.content();
1b39caaa 475}
476
1b39caaa 477/* BodyPipeCheckout */
478
3be43416 479BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): thePipe(aPipe),
f53969cc
SM
480 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
481 checkedOutSize(buf.contentSize()), checkedIn(false)
1b39caaa 482{
483}
484
485BodyPipeCheckout::~BodyPipeCheckout()
486{
87728a5b
AR
487 if (!checkedIn) {
488 // Do not pipe.undoCheckOut(*this) because it asserts or throws
489 // TODO: consider implementing the long-term solution discussed at
490 // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
491 debugs(91,2, HERE << "Warning: cannot undo BodyPipeCheckout");
3be43416 492 thePipe.checkIn(*this);
87728a5b 493 }
1b39caaa 494}
495
496void
497BodyPipeCheckout::checkIn()
498{
26ac0430 499 assert(!checkedIn);
3be43416 500 thePipe.checkIn(*this);
26ac0430 501 checkedIn = true;
1b39caaa 502}
503
3be43416 504BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): thePipe(c.thePipe),
f53969cc
SM
505 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
506 checkedIn(c.checkedIn)
1b39caaa 507{
26ac0430 508 assert(false); // prevent copying
1b39caaa 509}
510
511BodyPipeCheckout &
512BodyPipeCheckout::operator =(const BodyPipeCheckout &)
513{
26ac0430
AJ
514 assert(false); // prevent assignment
515 return *this;
1b39caaa 516}
f53969cc 517