]> git.ipfire.org Git - thirdparty/squid.git/blame - src/BodyPipe.cc
Simplify appending SBuf to String (#2108)
[thirdparty/squid.git] / src / BodyPipe.cc
CommitLineData
bbc27441 1/*
1f7b830e 2 * Copyright (C) 1996-2025 The Squid Software Foundation and contributors
bbc27441
AJ
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
1b39caaa 8
582c2af2 9#include "squid.h"
4299f876 10#include "base/AsyncJobCalls.h"
3d93a84d 11#include "base/TextException.h"
1b39caaa 12#include "BodyPipe.h"
13
e7352f30 14// BodySink is a BodyConsumer class which just consume and drops
26ac0430
AJ
15// data from a BodyPipe
16class BodySink: public BodyConsumer
17{
337b9aa4 18 CBDATA_CHILD(BodySink);
5c2f68b7 19
e7352f30 20public:
5a856d1e 21 BodySink(const BodyPipe::Pointer &bp): AsyncJob("BodySink"), body_pipe(bp) {}
337b9aa4 22 ~BodySink() override { assert(!body_pipe); }
26ac0430 23
337b9aa4 24 void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) override {
26ac0430
AJ
25 size_t contentSize = bp->buf().contentSize();
26 bp->consume(contentSize);
e7352f30 27 }
337b9aa4 28 void noteBodyProductionEnded(BodyPipe::Pointer) override {
5a856d1e 29 stopConsumingFrom(body_pipe);
e7352f30 30 }
337b9aa4 31 void noteBodyProducerAborted(BodyPipe::Pointer) override {
5a856d1e 32 stopConsumingFrom(body_pipe);
e7352f30 33 }
337b9aa4 34 bool doneAll() const override {return !body_pipe && AsyncJob::doneAll();}
5a856d1e
AR
35
36private:
37 BodyPipe::Pointer body_pipe; ///< the pipe we are consuming from
e7352f30 38};
39
40CBDATA_CLASS_INIT(BodySink);
41
42// The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
26ac0430 43// In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
e7352f30 44// the BodyPipe passed as argument
45class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
46{
47public:
26ac0430 48 typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
e7352f30 49
4299f876 50 BodyProducerDialer(const BodyProducer::Pointer &aProducer,
4cb2536f 51 Parent::Method aHandler, BodyPipe::Pointer bp):
f53969cc 52 Parent(aProducer, aHandler, bp) {}
e7352f30 53
337b9aa4 54 bool canDial(AsyncCall &call) override;
e7352f30 55};
56
57// The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
26ac0430 58// In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
e7352f30 59// of the BodyPipe passed as argument
60class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
61{
62public:
26ac0430 63 typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
e7352f30 64
4299f876 65 BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer,
4cb2536f 66 Parent::Method aHandler, BodyPipe::Pointer bp):
f53969cc 67 Parent(aConsumer, aHandler, bp) {}
e7352f30 68
337b9aa4 69 bool canDial(AsyncCall &call) override;
e7352f30 70};
71
72bool
26ac0430
AJ
73BodyProducerDialer::canDial(AsyncCall &call)
74{
75 if (!Parent::canDial(call))
76 return false;
77
4299f876 78 const BodyProducer::Pointer &producer = job;
3be43416
AJ
79 BodyPipe::Pointer aPipe = arg1;
80 if (!aPipe->stillProducing(producer)) {
81 debugs(call.debugSection, call.debugLevel, producer << " no longer producing for " << aPipe->status());
26ac0430
AJ
82 return call.cancel("no longer producing");
83 }
e7352f30 84
26ac0430 85 return true;
e7352f30 86}
87
88bool
26ac0430
AJ
89BodyConsumerDialer::canDial(AsyncCall &call)
90{
91 if (!Parent::canDial(call))
92 return false;
93
4299f876 94 const BodyConsumer::Pointer &consumer = job;
3be43416
AJ
95 BodyPipe::Pointer aPipe = arg1;
96 if (!aPipe->stillConsuming(consumer)) {
97 debugs(call.debugSection, call.debugLevel, consumer << " no longer consuming from " << aPipe->status());
26ac0430
AJ
98 return call.cancel("no longer consuming");
99 }
e7352f30 100
26ac0430 101 return true;
e7352f30 102}
103
e7352f30 104/* BodyProducer */
105
1b39caaa 106// inform the pipe that we are done and clear the Pointer
3be43416 107void BodyProducer::stopProducingFor(RefCount<BodyPipe> &p, bool atEof)
1b39caaa 108{
3be43416 109 debugs(91,7, this << " will not produce for " << p << "; atEof: " << atEof);
aee3523a 110 assert(p != nullptr); // be strict: the caller state may depend on this
3be43416 111 p->clearProducer(atEof);
aee3523a 112 p = nullptr;
1b39caaa 113}
114
e7352f30 115/* BodyConsumer */
116
1b39caaa 117// inform the pipe that we are done and clear the Pointer
3be43416 118void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &p)
1b39caaa 119{
3be43416 120 debugs(91,7, this << " will not consume from " << p);
aee3523a 121 assert(p != nullptr); // be strict: the caller state may depend on this
3be43416 122 p->clearConsumer();
aee3523a 123 p = nullptr;
1b39caaa 124}
125
126/* BodyPipe */
127
128BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
aee3523a 129 theProducer(aProducer), theConsumer(nullptr),
f53969cc
SM
130 thePutSize(0), theGetSize(0),
131 mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
1b39caaa 132{
26ac0430
AJ
133 // TODO: teach MemBuf to start with zero minSize
134 // TODO: limit maxSize by theBodySize, when known?
135 theBuf.init(2*1024, MaxCapacity);
bf95c10a 136 debugs(91,7, "created BodyPipe" << status());
1b39caaa 137}
138
139BodyPipe::~BodyPipe()
140{
bf95c10a 141 debugs(91,7, "destroying BodyPipe" << status());
26ac0430
AJ
142 assert(!theProducer);
143 assert(!theConsumer);
144 theBuf.clean();
1b39caaa 145}
146
47f6e231 147void BodyPipe::setBodySize(uint64_t aBodySize)
1b39caaa 148{
26ac0430 149 assert(!bodySizeKnown());
26ac0430 150 assert(thePutSize <= aBodySize);
1b39caaa 151
26ac0430
AJ
152 // If this assert fails, we need to add code to check for eof and inform
153 // the consumer about the eof condition via scheduleBodyEndNotification,
154 // because just setting a body size limit may trigger the eof condition.
155 assert(!theConsumer);
1b39caaa 156
26ac0430 157 theBodySize = aBodySize;
bf95c10a 158 debugs(91,7, "set body size" << status());
1b39caaa 159}
160
47f6e231 161uint64_t BodyPipe::bodySize() const
1b39caaa 162{
26ac0430
AJ
163 assert(bodySizeKnown());
164 return static_cast<uint64_t>(theBodySize);
1b39caaa 165}
166
47f6e231 167bool BodyPipe::expectMoreAfter(uint64_t offset) const
1b39caaa 168{
26ac0430
AJ
169 assert(theGetSize <= offset);
170 return offset < thePutSize || // buffer has more now or
171 (!productionEnded() && mayNeedMoreData()); // buffer will have more
1b39caaa 172}
173
174bool BodyPipe::exhausted() const
175{
26ac0430 176 return !expectMoreAfter(theGetSize);
1b39caaa 177}
178
610d8f3b 179uint64_t BodyPipe::unproducedSize() const
1b39caaa 180{
26ac0430 181 return bodySize() - thePutSize; // bodySize() asserts that size is known
1b39caaa 182}
183
83c51da9
CT
184void BodyPipe::expectProductionEndAfter(uint64_t size)
185{
186 const uint64_t expectedSize = thePutSize + size;
187 if (bodySizeKnown())
188 Must(bodySize() == expectedSize);
189 else
190 theBodySize = expectedSize;
191}
192
1b39caaa 193void
194BodyPipe::clearProducer(bool atEof)
195{
4299f876 196 if (theProducer.set()) {
bf95c10a 197 debugs(91,7, "clearing BodyPipe producer" << status());
4299f876 198 theProducer.clear();
26ac0430
AJ
199 if (atEof) {
200 if (!bodySizeKnown())
201 theBodySize = thePutSize;
e1381638 202 else if (bodySize() != thePutSize)
bf95c10a 203 debugs(91,3, "aborting on premature eof" << status());
26ac0430
AJ
204 } else {
205 // asserta that we can detect the abort if the consumer joins later
206 assert(!bodySizeKnown() || bodySize() != thePutSize);
207 }
208 scheduleBodyEndNotification();
209 }
1b39caaa 210}
211
212size_t
350e2aec 213BodyPipe::putMoreData(const char *aBuffer, size_t size)
1b39caaa 214{
26ac0430 215 if (bodySizeKnown())
d85c3078 216 size = min((uint64_t)size, unproducedSize());
26ac0430
AJ
217
218 const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
d85c3078 219 if ((size = min(size, spaceSize))) {
350e2aec 220 theBuf.append(aBuffer, size);
26ac0430
AJ
221 postAppend(size);
222 return size;
223 }
224 return 0;
1b39caaa 225}
226
227bool
4299f876 228BodyPipe::setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
1b39caaa 229{
26ac0430 230 assert(!theConsumer);
4299f876 231 assert(aConsumer.set()); // but might be invalid
26ac0430
AJ
232
233 // TODO: convert this into an exception and remove IfNotLate suffix
234 // If there is something consumed already, we are in an auto-consuming mode
235 // and it is too late to attach a real consumer to the pipe.
236 if (theGetSize > 0) {
237 assert(mustAutoConsume);
238 return false;
239 }
1b39caaa 240
abe286b8
AR
241 Must(!abortedConsumption); // did not promise to never consume
242
26ac0430 243 theConsumer = aConsumer;
bf95c10a 244 debugs(91,7, "set consumer" << status());
26ac0430
AJ
245 if (theBuf.hasContent())
246 scheduleBodyDataNotification();
247 if (!theProducer)
248 scheduleBodyEndNotification();
1b39caaa 249
26ac0430 250 return true;
1b39caaa 251}
252
253void
26ac0430
AJ
254BodyPipe::clearConsumer()
255{
4299f876 256 if (theConsumer.set()) {
bf95c10a 257 debugs(91,7, "clearing consumer" << status());
4299f876 258 theConsumer.clear();
abe286b8
AR
259 // do not abort if we have not consumed so that HTTP or ICAP can retry
260 // benign xaction failures due to persistent connection race conditions
261 if (consumedSize())
262 expectNoConsumption();
263 }
264}
265
266void
267BodyPipe::expectNoConsumption()
268{
94c40964
AR
269 // We and enableAutoConsumption() may be called multiple times because
270 // multiple jobs on the consumption chain may realize that there will be no
271 // more setConsumer() calls (e.g., consuming code and retrying code). It is
272 // both difficult and unnecessary for them to coordinate their calls.
273
274 // As a consequence, we may be called when already auto-consuming, including
275 // cases where abortedConsumption is still false. We could try to harden
276 // this by also aborting consumption from enableAutoConsumption() when there
277 // is no consumer, but see errorAppendEntry() TODO for a better plan.
278
279 debugs(91, 7, status());
280 if (!abortedConsumption && !exhausted() && !theConsumer) {
e6f9e263
A
281 AsyncCall::Pointer call= asyncCall(91, 7,
282 "BodyProducer::noteBodyConsumerAborted",
283 BodyProducerDialer(theProducer,
f53969cc 284 &BodyProducer::noteBodyConsumerAborted, this));
e6f9e263
A
285 ScheduleCallHere(call);
286 abortedConsumption = true;
b84d327b
AR
287
288 // in case somebody enabled auto-consumption before regular one aborted
b599471b 289 startAutoConsumptionIfNeeded();
26ac0430 290 }
1b39caaa 291}
292
293size_t
350e2aec 294BodyPipe::getMoreData(MemBuf &aMemBuffer)
1b39caaa 295{
26ac0430
AJ
296 if (!theBuf.hasContent())
297 return 0; // did not touch the possibly uninitialized buf
1b39caaa 298
350e2aec
FC
299 if (aMemBuffer.isNull())
300 aMemBuffer.init();
301 const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize());
302 aMemBuffer.append(theBuf.content(), size);
26ac0430
AJ
303 theBuf.consume(size);
304 postConsume(size);
305 return size; // cannot be zero if we called buf.init above
1b39caaa 306}
307
308void
309BodyPipe::consume(size_t size)
310{
26ac0430
AJ
311 theBuf.consume(size);
312 postConsume(size);
1b39caaa 313}
314
315void
26ac0430
AJ
316BodyPipe::enableAutoConsumption()
317{
318 mustAutoConsume = true;
bf95c10a 319 debugs(91,5, "enabled auto consumption" << status());
b599471b 320 startAutoConsumptionIfNeeded();
5121d48e
AR
321}
322
b599471b
AR
323/// Check the current need and, if needed, start auto consumption. In auto
324/// consumption mode, the consumer is gone, but the producer continues to
325/// produce data. We use a BodySink BodyConsumer to discard that data.
5121d48e 326void
b599471b 327BodyPipe::startAutoConsumptionIfNeeded()
5121d48e 328{
b599471b
AR
329 const auto startNow =
330 mustAutoConsume && // was enabled
331 !theConsumer && // has not started yet
332 theProducer.valid() && // still useful (and will eventually stop)
333 theBuf.hasContent(); // has something to consume right now
334 if (!startNow)
335 return;
336
5a856d1e 337 theConsumer = new BodySink(this);
2b6b1bcb 338 AsyncJob::Start(theConsumer);
bf95c10a 339 debugs(91,7, "starting auto consumption" << status());
26ac0430 340 scheduleBodyDataNotification();
1b39caaa 341}
342
343MemBuf &
26ac0430
AJ
344BodyPipe::checkOut()
345{
346 assert(!isCheckedOut);
347 isCheckedOut = true;
348 return theBuf;
1b39caaa 349}
350
351void
352BodyPipe::checkIn(Checkout &checkout)
353{
26ac0430
AJ
354 assert(isCheckedOut);
355 isCheckedOut = false;
356 const size_t currentSize = theBuf.contentSize();
357 if (checkout.checkedOutSize > currentSize)
358 postConsume(checkout.checkedOutSize - currentSize);
e1381638
AJ
359 else if (checkout.checkedOutSize < currentSize)
360 postAppend(currentSize - checkout.checkedOutSize);
1b39caaa 361}
362
363void
364BodyPipe::undoCheckOut(Checkout &checkout)
365{
26ac0430
AJ
366 assert(isCheckedOut);
367 const size_t currentSize = theBuf.contentSize();
368 // We can only undo if size did not change, and even that carries
369 // some risk. If this becomes a problem, the code checking out
370 // raw buffers should always check them in (possibly unchanged)
371 // instead of relying on the automated undo mechanism of Checkout.
372 // The code can always use a temporary buffer to accomplish that.
87728a5b 373 Must(checkout.checkedOutSize == currentSize);
1b39caaa 374}
375
376// TODO: Optimize: inform consumer/producer about more data/space only if
377// they used the data/space since we notified them last time.
378
379void
26ac0430
AJ
380BodyPipe::postConsume(size_t size)
381{
382 assert(!isCheckedOut);
383 theGetSize += size;
bf95c10a 384 debugs(91,7, "consumed " << size << " bytes" << status());
26ac0430
AJ
385 if (mayNeedMoreData()) {
386 AsyncCall::Pointer call= asyncCall(91, 7,
387 "BodyProducer::noteMoreBodySpaceAvailable",
388 BodyProducerDialer(theProducer,
f53969cc 389 &BodyProducer::noteMoreBodySpaceAvailable, this));
26ac0430
AJ
390 ScheduleCallHere(call);
391 }
1b39caaa 392}
393
394void
26ac0430
AJ
395BodyPipe::postAppend(size_t size)
396{
397 assert(!isCheckedOut);
398 thePutSize += size;
bf95c10a 399 debugs(91,7, "added " << size << " bytes" << status());
1b39caaa 400
26ac0430
AJ
401 // We should not consume here even if mustAutoConsume because the
402 // caller may not be ready for the data to be consumed during this call.
403 scheduleBodyDataNotification();
1b39caaa 404
664dc267
AR
405 // Do this check after scheduleBodyDataNotification() to ensure the
406 // natural order of "more body data" and "production ended" events.
407 if (!mayNeedMoreData())
408 clearProducer(true); // reached end-of-body
409
b599471b 410 startAutoConsumptionIfNeeded();
1b39caaa 411}
412
6c56baf6 413void
414BodyPipe::scheduleBodyDataNotification()
415{
4299f876 416 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
26ac0430
AJ
417 AsyncCall::Pointer call = asyncCall(91, 7,
418 "BodyConsumer::noteMoreBodyDataAvailable",
419 BodyConsumerDialer(theConsumer,
f53969cc 420 &BodyConsumer::noteMoreBodyDataAvailable, this));
26ac0430
AJ
421 ScheduleCallHere(call);
422 }
6c56baf6 423}
424
1b39caaa 425void
426BodyPipe::scheduleBodyEndNotification()
427{
4299f876 428 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
26ac0430
AJ
429 if (bodySizeKnown() && bodySize() == thePutSize) {
430 AsyncCall::Pointer call = asyncCall(91, 7,
431 "BodyConsumer::noteBodyProductionEnded",
432 BodyConsumerDialer(theConsumer,
f53969cc 433 &BodyConsumer::noteBodyProductionEnded, this));
26ac0430
AJ
434 ScheduleCallHere(call);
435 } else {
436 AsyncCall::Pointer call = asyncCall(91, 7,
437 "BodyConsumer::noteBodyProducerAborted",
438 BodyConsumerDialer(theConsumer,
f53969cc 439 &BodyConsumer::noteBodyProducerAborted, this));
26ac0430
AJ
440 ScheduleCallHere(call);
441 }
442 }
1b39caaa 443}
444
1b39caaa 445// a short temporary string describing buffer status for debugging
446const char *BodyPipe::status() const
447{
350e2aec
FC
448 static MemBuf outputBuffer;
449 outputBuffer.reset();
1b39caaa 450
350e2aec 451 outputBuffer.append(" [", 2);
1b39caaa 452
4391cd15 453 outputBuffer.appendf("%" PRIu64 "<=%" PRIu64, theGetSize, thePutSize);
1b39caaa 454 if (theBodySize >= 0)
4391cd15 455 outputBuffer.appendf("<=%" PRId64, theBodySize);
26ac0430 456 else
350e2aec 457 outputBuffer.append("<=?", 3);
1b39caaa 458
63d8cc40 459 outputBuffer.appendf(" %" PRId64 "+%" PRId64, static_cast<int64_t>(theBuf.contentSize()), static_cast<int64_t>(theBuf.spaceSize()));
1b39caaa 460
4391cd15 461 outputBuffer.appendf(" pipe%p", this);
4299f876 462 if (theProducer.set())
4391cd15 463 outputBuffer.appendf(" prod%p", theProducer.get());
4299f876 464 if (theConsumer.set())
4391cd15 465 outputBuffer.appendf(" cons%p", theConsumer.get());
1b39caaa 466
26ac0430 467 if (mustAutoConsume)
350e2aec 468 outputBuffer.append(" A", 2);
abe286b8
AR
469 if (abortedConsumption)
470 outputBuffer.append(" !C", 3);
26ac0430 471 if (isCheckedOut)
350e2aec 472 outputBuffer.append(" L", 2); // Locked
1b39caaa 473
350e2aec 474 outputBuffer.append("]", 1);
1b39caaa 475
350e2aec 476 outputBuffer.terminate();
1b39caaa 477
350e2aec 478 return outputBuffer.content();
1b39caaa 479}
480
1b39caaa 481/* BodyPipeCheckout */
482
3be43416 483BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): thePipe(aPipe),
f53969cc
SM
484 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
485 checkedOutSize(buf.contentSize()), checkedIn(false)
1b39caaa 486{
487}
488
489BodyPipeCheckout::~BodyPipeCheckout()
490{
87728a5b
AR
491 if (!checkedIn) {
492 // Do not pipe.undoCheckOut(*this) because it asserts or throws
493 // TODO: consider implementing the long-term solution discussed at
494 // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
bf95c10a 495 debugs(91,2, "Warning: cannot undo BodyPipeCheckout");
3be43416 496 thePipe.checkIn(*this);
87728a5b 497 }
1b39caaa 498}
499
500void
501BodyPipeCheckout::checkIn()
502{
26ac0430 503 assert(!checkedIn);
3be43416 504 thePipe.checkIn(*this);
26ac0430 505 checkedIn = true;
1b39caaa 506}
507
3be43416 508BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): thePipe(c.thePipe),
f53969cc
SM
509 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
510 checkedIn(c.checkedIn)
1b39caaa 511{
26ac0430 512 assert(false); // prevent copying
1b39caaa 513}
514
515BodyPipeCheckout &
516BodyPipeCheckout::operator =(const BodyPipeCheckout &)
517{
26ac0430
AJ
518 assert(false); // prevent assignment
519 return *this;
1b39caaa 520}
f53969cc 521