]> git.ipfire.org Git - thirdparty/squid.git/blame - src/BodyPipe.cc
Remove missed configure.in entry for no_check helper
[thirdparty/squid.git] / src / BodyPipe.cc
CommitLineData
1b39caaa 1
2#include "squid.h"
3d93a84d 3#include "base/TextException.h"
1b39caaa 4#include "BodyPipe.h"
5
6CBDATA_CLASS_INIT(BodyPipe);
7
e7352f30 8// BodySink is a BodyConsumer class which just consume and drops
26ac0430
AJ
9// data from a BodyPipe
10class BodySink: public BodyConsumer
11{
e7352f30 12 bool done;
13public:
26ac0430 14 BodySink():AsyncJob("BodySink"), done(false) {}
e7352f30 15 virtual ~BodySink() {}
26ac0430 16
e7352f30 17 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
26ac0430
AJ
18 size_t contentSize = bp->buf().contentSize();
19 bp->consume(contentSize);
e7352f30 20 }
21 virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
26ac0430
AJ
22 stopConsumingFrom(bp);
23 done = true;
e7352f30 24 }
25 virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
26ac0430
AJ
26 stopConsumingFrom(bp);
27 done = true;
e7352f30 28 }
29 bool doneAll() const {return done && AsyncJob::doneAll();}
30 CBDATA_CLASS2(BodySink);
31};
32
33CBDATA_CLASS_INIT(BodySink);
34
35// The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
26ac0430 36// In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
e7352f30 37// the BodyPipe passed as argument
38class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
39{
40public:
26ac0430 41 typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
e7352f30 42
26ac0430
AJ
43 BodyProducerDialer(BodyProducer *aProducer, Parent::Method aHandler,
44 BodyPipe::Pointer bp): Parent(aProducer, aHandler, bp) {}
e7352f30 45
26ac0430 46 virtual bool canDial(AsyncCall &call);
e7352f30 47};
48
49// The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
26ac0430 50// In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
e7352f30 51// of the BodyPipe passed as argument
52class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
53{
54public:
26ac0430 55 typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
e7352f30 56
26ac0430
AJ
57 BodyConsumerDialer(BodyConsumer *aConsumer, Parent::Method aHandler,
58 BodyPipe::Pointer bp): Parent(aConsumer, aHandler, bp) {}
e7352f30 59
26ac0430 60 virtual bool canDial(AsyncCall &call);
e7352f30 61};
62
63bool
26ac0430
AJ
64BodyProducerDialer::canDial(AsyncCall &call)
65{
66 if (!Parent::canDial(call))
67 return false;
68
69 BodyProducer *producer = object;
70 BodyPipe::Pointer pipe = arg1;
71 if (!pipe->stillProducing(producer)) {
72 debugs(call.debugSection, call.debugLevel, HERE << producer <<
73 " no longer producing for " << pipe->status());
74 return call.cancel("no longer producing");
75 }
e7352f30 76
26ac0430 77 return true;
e7352f30 78}
79
80bool
26ac0430
AJ
81BodyConsumerDialer::canDial(AsyncCall &call)
82{
83 if (!Parent::canDial(call))
84 return false;
85
86 BodyConsumer *consumer = object;
87 BodyPipe::Pointer pipe = arg1;
88 if (!pipe->stillConsuming(consumer)) {
89 debugs(call.debugSection, call.debugLevel, HERE << consumer <<
90 " no longer consuming from " << pipe->status());
91 return call.cancel("no longer consuming");
92 }
e7352f30 93
26ac0430 94 return true;
e7352f30 95}
96
97
98/* BodyProducer */
99
1b39caaa 100// inform the pipe that we are done and clear the Pointer
101void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
102{
26ac0430
AJ
103 debugs(91,7, HERE << this << " will not produce for " << pipe <<
104 "; atEof: " << atEof);
105 assert(pipe != NULL); // be strict: the caller state may depend on this
106 pipe->clearProducer(atEof);
107 pipe = NULL;
1b39caaa 108}
109
e7352f30 110
111
112/* BodyConsumer */
113
1b39caaa 114// inform the pipe that we are done and clear the Pointer
115void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
116{
26ac0430
AJ
117 debugs(91,7, HERE << this << " will not consume from " << pipe);
118 assert(pipe != NULL); // be strict: the caller state may depend on this
119 pipe->clearConsumer();
120 pipe = NULL;
1b39caaa 121}
122
e7352f30 123
1b39caaa 124/* BodyPipe */
125
126BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
26ac0430
AJ
127 theProducer(aProducer), theConsumer(0),
128 thePutSize(0), theGetSize(0),
129 mustAutoConsume(false), isCheckedOut(false)
1b39caaa 130{
26ac0430
AJ
131 // TODO: teach MemBuf to start with zero minSize
132 // TODO: limit maxSize by theBodySize, when known?
133 theBuf.init(2*1024, MaxCapacity);
134 debugs(91,7, HERE << "created BodyPipe" << status());
1b39caaa 135}
136
137BodyPipe::~BodyPipe()
138{
26ac0430
AJ
139 debugs(91,7, HERE << "destroying BodyPipe" << status());
140 assert(!theProducer);
141 assert(!theConsumer);
142 theBuf.clean();
1b39caaa 143}
144
47f6e231 145void BodyPipe::setBodySize(uint64_t aBodySize)
1b39caaa 146{
26ac0430
AJ
147 assert(!bodySizeKnown());
148 assert(aBodySize >= 0);
149 assert(thePutSize <= aBodySize);
1b39caaa 150
26ac0430
AJ
151 // If this assert fails, we need to add code to check for eof and inform
152 // the consumer about the eof condition via scheduleBodyEndNotification,
153 // because just setting a body size limit may trigger the eof condition.
154 assert(!theConsumer);
1b39caaa 155
26ac0430
AJ
156 theBodySize = aBodySize;
157 debugs(91,7, HERE << "set body size" << status());
1b39caaa 158}
159
47f6e231 160uint64_t BodyPipe::bodySize() const
1b39caaa 161{
26ac0430
AJ
162 assert(bodySizeKnown());
163 return static_cast<uint64_t>(theBodySize);
1b39caaa 164}
165
47f6e231 166bool BodyPipe::expectMoreAfter(uint64_t offset) const
1b39caaa 167{
26ac0430
AJ
168 assert(theGetSize <= offset);
169 return offset < thePutSize || // buffer has more now or
170 (!productionEnded() && mayNeedMoreData()); // buffer will have more
1b39caaa 171}
172
173bool BodyPipe::exhausted() const
174{
26ac0430 175 return !expectMoreAfter(theGetSize);
1b39caaa 176}
177
610d8f3b 178uint64_t BodyPipe::unproducedSize() const
1b39caaa 179{
26ac0430 180 return bodySize() - thePutSize; // bodySize() asserts that size is known
1b39caaa 181}
182
183void
184BodyPipe::clearProducer(bool atEof)
185{
26ac0430
AJ
186 if (theProducer) {
187 debugs(91,7, HERE << "clearing BodyPipe producer" << status());
188 theProducer = NULL;
189 if (atEof) {
190 if (!bodySizeKnown())
191 theBodySize = thePutSize;
e1381638
AJ
192 else if (bodySize() != thePutSize)
193 debugs(91,3, HERE << "aborting on premature eof" << status());
26ac0430
AJ
194 } else {
195 // asserta that we can detect the abort if the consumer joins later
196 assert(!bodySizeKnown() || bodySize() != thePutSize);
197 }
198 scheduleBodyEndNotification();
199 }
1b39caaa 200}
201
202size_t
350e2aec 203BodyPipe::putMoreData(const char *aBuffer, size_t size)
1b39caaa 204{
26ac0430 205 if (bodySizeKnown())
d85c3078 206 size = min((uint64_t)size, unproducedSize());
26ac0430
AJ
207
208 const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
d85c3078 209 if ((size = min(size, spaceSize))) {
350e2aec 210 theBuf.append(aBuffer, size);
26ac0430
AJ
211 postAppend(size);
212 return size;
213 }
214 return 0;
1b39caaa 215}
216
217bool
218BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
219{
26ac0430
AJ
220 assert(!theConsumer);
221 assert(aConsumer);
222
223 // TODO: convert this into an exception and remove IfNotLate suffix
224 // If there is something consumed already, we are in an auto-consuming mode
225 // and it is too late to attach a real consumer to the pipe.
226 if (theGetSize > 0) {
227 assert(mustAutoConsume);
228 return false;
229 }
1b39caaa 230
26ac0430
AJ
231 theConsumer = aConsumer;
232 debugs(91,7, HERE << "set consumer" << status());
233 if (theBuf.hasContent())
234 scheduleBodyDataNotification();
235 if (!theProducer)
236 scheduleBodyEndNotification();
1b39caaa 237
26ac0430 238 return true;
1b39caaa 239}
240
6c56baf6 241// When BodyPipe consumer is gone, all events for that consumer must not
242// reach the new consumer (if any). Otherwise, the calls may go out of order
26ac0430 243// (if _some_ calls are dropped due to the ultimate destination being
6c56baf6 244// temporary NULL). The code keeps track of the number of outstanding
245// events and skips that number if consumer leaves. TODO: when AscyncCall
246// support is improved, should we just schedule calls directly to consumer?
1b39caaa 247void
26ac0430
AJ
248BodyPipe::clearConsumer()
249{
250 if (theConsumer) {
251 debugs(91,7, HERE << "clearing consumer" << status());
252 theConsumer = NULL;
253 if (consumedSize() && !exhausted()) {
254 AsyncCall::Pointer call= asyncCall(91, 7,
255 "BodyProducer::noteBodyConsumerAborted",
256 BodyProducerDialer(theProducer,
257 &BodyProducer::noteBodyConsumerAborted, this));
258 ScheduleCallHere(call);
259 }
260 }
1b39caaa 261}
262
263size_t
350e2aec 264BodyPipe::getMoreData(MemBuf &aMemBuffer)
1b39caaa 265{
26ac0430
AJ
266 if (!theBuf.hasContent())
267 return 0; // did not touch the possibly uninitialized buf
1b39caaa 268
350e2aec
FC
269 if (aMemBuffer.isNull())
270 aMemBuffer.init();
271 const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize());
272 aMemBuffer.append(theBuf.content(), size);
26ac0430
AJ
273 theBuf.consume(size);
274 postConsume(size);
275 return size; // cannot be zero if we called buf.init above
1b39caaa 276}
277
278void
279BodyPipe::consume(size_t size)
280{
26ac0430
AJ
281 theBuf.consume(size);
282 postConsume(size);
1b39caaa 283}
284
26ac0430 285// In the AutoConsumption mode the consumer has gone but the producer continues
e7352f30 286// producing data. We are using a BodySink BodyConsumer which just discards the produced data.
1b39caaa 287void
26ac0430
AJ
288BodyPipe::enableAutoConsumption()
289{
290 mustAutoConsume = true;
291 debugs(91,5, HERE << "enabled auto consumption" << status());
292 if (!theConsumer && theBuf.hasContent())
293 startAutoConsumption();
5121d48e
AR
294}
295
296// start auto consumption by creating body sink
297void
298BodyPipe::startAutoConsumption()
299{
26ac0430
AJ
300 Must(mustAutoConsume);
301 Must(!theConsumer);
302 theConsumer = new BodySink;
303 debugs(91,7, HERE << "starting auto consumption" << status());
304 scheduleBodyDataNotification();
1b39caaa 305}
306
307MemBuf &
26ac0430
AJ
308BodyPipe::checkOut()
309{
310 assert(!isCheckedOut);
311 isCheckedOut = true;
312 return theBuf;
1b39caaa 313}
314
315void
316BodyPipe::checkIn(Checkout &checkout)
317{
26ac0430
AJ
318 assert(isCheckedOut);
319 isCheckedOut = false;
320 const size_t currentSize = theBuf.contentSize();
321 if (checkout.checkedOutSize > currentSize)
322 postConsume(checkout.checkedOutSize - currentSize);
e1381638
AJ
323 else if (checkout.checkedOutSize < currentSize)
324 postAppend(currentSize - checkout.checkedOutSize);
1b39caaa 325}
326
327void
328BodyPipe::undoCheckOut(Checkout &checkout)
329{
26ac0430
AJ
330 assert(isCheckedOut);
331 const size_t currentSize = theBuf.contentSize();
332 // We can only undo if size did not change, and even that carries
333 // some risk. If this becomes a problem, the code checking out
334 // raw buffers should always check them in (possibly unchanged)
335 // instead of relying on the automated undo mechanism of Checkout.
336 // The code can always use a temporary buffer to accomplish that.
337 assert(checkout.checkedOutSize == currentSize);
1b39caaa 338}
339
340// TODO: Optimize: inform consumer/producer about more data/space only if
341// they used the data/space since we notified them last time.
342
343void
26ac0430
AJ
344BodyPipe::postConsume(size_t size)
345{
346 assert(!isCheckedOut);
347 theGetSize += size;
348 debugs(91,7, HERE << "consumed " << size << " bytes" << status());
349 if (mayNeedMoreData()) {
350 AsyncCall::Pointer call= asyncCall(91, 7,
351 "BodyProducer::noteMoreBodySpaceAvailable",
352 BodyProducerDialer(theProducer,
353 &BodyProducer::noteMoreBodySpaceAvailable, this));
354 ScheduleCallHere(call);
355 }
1b39caaa 356}
357
358void
26ac0430
AJ
359BodyPipe::postAppend(size_t size)
360{
361 assert(!isCheckedOut);
362 thePutSize += size;
363 debugs(91,7, HERE << "added " << size << " bytes" << status());
1b39caaa 364
26ac0430
AJ
365 if (mustAutoConsume && !theConsumer && size > 0)
366 startAutoConsumption();
5121d48e 367
26ac0430
AJ
368 // We should not consume here even if mustAutoConsume because the
369 // caller may not be ready for the data to be consumed during this call.
370 scheduleBodyDataNotification();
1b39caaa 371
26ac0430
AJ
372 if (!mayNeedMoreData())
373 clearProducer(true); // reached end-of-body
1b39caaa 374}
375
376
6c56baf6 377void
378BodyPipe::scheduleBodyDataNotification()
379{
26ac0430
AJ
380 if (theConsumer) {
381 AsyncCall::Pointer call = asyncCall(91, 7,
382 "BodyConsumer::noteMoreBodyDataAvailable",
383 BodyConsumerDialer(theConsumer,
384 &BodyConsumer::noteMoreBodyDataAvailable, this));
385 ScheduleCallHere(call);
386 }
6c56baf6 387}
388
1b39caaa 389void
390BodyPipe::scheduleBodyEndNotification()
391{
26ac0430
AJ
392 if (theConsumer) {
393 if (bodySizeKnown() && bodySize() == thePutSize) {
394 AsyncCall::Pointer call = asyncCall(91, 7,
395 "BodyConsumer::noteBodyProductionEnded",
396 BodyConsumerDialer(theConsumer,
397 &BodyConsumer::noteBodyProductionEnded, this));
398 ScheduleCallHere(call);
399 } else {
400 AsyncCall::Pointer call = asyncCall(91, 7,
401 "BodyConsumer::noteBodyProducerAborted",
402 BodyConsumerDialer(theConsumer,
403 &BodyConsumer::noteBodyProducerAborted, this));
404 ScheduleCallHere(call);
405 }
406 }
1b39caaa 407}
408
1b39caaa 409// a short temporary string describing buffer status for debugging
410const char *BodyPipe::status() const
411{
350e2aec
FC
412 static MemBuf outputBuffer;
413 outputBuffer.reset();
1b39caaa 414
350e2aec 415 outputBuffer.append(" [", 2);
1b39caaa 416
350e2aec 417 outputBuffer.Printf("%"PRIu64"<=%"PRIu64, theGetSize, thePutSize);
1b39caaa 418 if (theBodySize >= 0)
350e2aec 419 outputBuffer.Printf("<=%"PRId64, theBodySize);
26ac0430 420 else
350e2aec 421 outputBuffer.append("<=?", 3);
1b39caaa 422
350e2aec 423 outputBuffer.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
1b39caaa 424
350e2aec 425 outputBuffer.Printf(" pipe%p", this);
1b39caaa 426 if (theProducer)
350e2aec 427 outputBuffer.Printf(" prod%p", theProducer);
1b39caaa 428 if (theConsumer)
350e2aec 429 outputBuffer.Printf(" cons%p", theConsumer);
1b39caaa 430
26ac0430 431 if (mustAutoConsume)
350e2aec 432 outputBuffer.append(" A", 2);
26ac0430 433 if (isCheckedOut)
350e2aec 434 outputBuffer.append(" L", 2); // Locked
1b39caaa 435
350e2aec 436 outputBuffer.append("]", 1);
1b39caaa 437
350e2aec 438 outputBuffer.terminate();
1b39caaa 439
350e2aec 440 return outputBuffer.content();
1b39caaa 441}
442
443
444/* BodyPipeCheckout */
445
446BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
26ac0430
AJ
447 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
448 checkedOutSize(buf.contentSize()), checkedIn(false)
1b39caaa 449{
450}
451
452BodyPipeCheckout::~BodyPipeCheckout()
453{
26ac0430
AJ
454 if (!checkedIn)
455 pipe.undoCheckOut(*this);
1b39caaa 456}
457
458void
459BodyPipeCheckout::checkIn()
460{
26ac0430
AJ
461 assert(!checkedIn);
462 pipe.checkIn(*this);
463 checkedIn = true;
1b39caaa 464}
465
466
467BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
26ac0430
AJ
468 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
469 checkedIn(c.checkedIn)
1b39caaa 470{
26ac0430 471 assert(false); // prevent copying
1b39caaa 472}
473
474BodyPipeCheckout &
475BodyPipeCheckout::operator =(const BodyPipeCheckout &)
476{
26ac0430
AJ
477 assert(false); // prevent assignment
478 return *this;
1b39caaa 479}