]> git.ipfire.org Git - thirdparty/squid.git/blob - src/BodyPipe.cc
merge from trunk
[thirdparty/squid.git] / src / BodyPipe.cc
1
2 #include "squid.h"
3 #include "base/AsyncJobCalls.h"
4 #include "base/TextException.h"
5 #include "BodyPipe.h"
6
7 CBDATA_CLASS_INIT(BodyPipe);
8
9 // BodySink is a BodyConsumer class which just consume and drops
10 // data from a BodyPipe
11 class BodySink: public BodyConsumer
12 {
13 bool done;
14 public:
15 BodySink():AsyncJob("BodySink"), done(false) {}
16 virtual ~BodySink() {}
17
18 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
19 size_t contentSize = bp->buf().contentSize();
20 bp->consume(contentSize);
21 }
22 virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
23 stopConsumingFrom(bp);
24 done = true;
25 }
26 virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
27 stopConsumingFrom(bp);
28 done = true;
29 }
30 bool doneAll() const {return done && AsyncJob::doneAll();}
31 CBDATA_CLASS2(BodySink);
32 };
33
34 CBDATA_CLASS_INIT(BodySink);
35
36 // The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
37 // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
38 // the BodyPipe passed as argument
39 class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
40 {
41 public:
42 typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
43
44 BodyProducerDialer(const BodyProducer::Pointer &aProducer,
45 Parent::Method aHandler, BodyPipe::Pointer bp):
46 Parent(aProducer, aHandler, bp) {}
47
48 virtual bool canDial(AsyncCall &call);
49 };
50
51 // The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
52 // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
53 // of the BodyPipe passed as argument
54 class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
55 {
56 public:
57 typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
58
59 BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer,
60 Parent::Method aHandler, BodyPipe::Pointer bp):
61 Parent(aConsumer, aHandler, bp) {}
62
63 virtual bool canDial(AsyncCall &call);
64 };
65
66 bool
67 BodyProducerDialer::canDial(AsyncCall &call)
68 {
69 if (!Parent::canDial(call))
70 return false;
71
72 const BodyProducer::Pointer &producer = job;
73 BodyPipe::Pointer pipe = arg1;
74 if (!pipe->stillProducing(producer)) {
75 debugs(call.debugSection, call.debugLevel, HERE << producer <<
76 " no longer producing for " << pipe->status());
77 return call.cancel("no longer producing");
78 }
79
80 return true;
81 }
82
83 bool
84 BodyConsumerDialer::canDial(AsyncCall &call)
85 {
86 if (!Parent::canDial(call))
87 return false;
88
89 const BodyConsumer::Pointer &consumer = job;
90 BodyPipe::Pointer pipe = arg1;
91 if (!pipe->stillConsuming(consumer)) {
92 debugs(call.debugSection, call.debugLevel, HERE << consumer <<
93 " no longer consuming from " << pipe->status());
94 return call.cancel("no longer consuming");
95 }
96
97 return true;
98 }
99
100
101 /* BodyProducer */
102
103 // inform the pipe that we are done and clear the Pointer
104 void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
105 {
106 debugs(91,7, HERE << this << " will not produce for " << pipe <<
107 "; atEof: " << atEof);
108 assert(pipe != NULL); // be strict: the caller state may depend on this
109 pipe->clearProducer(atEof);
110 pipe = NULL;
111 }
112
113
114
115 /* BodyConsumer */
116
117 // inform the pipe that we are done and clear the Pointer
118 void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
119 {
120 debugs(91,7, HERE << this << " will not consume from " << pipe);
121 assert(pipe != NULL); // be strict: the caller state may depend on this
122 pipe->clearConsumer();
123 pipe = NULL;
124 }
125
126
127 /* BodyPipe */
128
129 BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
130 theProducer(aProducer), theConsumer(0),
131 thePutSize(0), theGetSize(0),
132 mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
133 {
134 // TODO: teach MemBuf to start with zero minSize
135 // TODO: limit maxSize by theBodySize, when known?
136 theBuf.init(2*1024, MaxCapacity);
137 debugs(91,7, HERE << "created BodyPipe" << status());
138 }
139
140 BodyPipe::~BodyPipe()
141 {
142 debugs(91,7, HERE << "destroying BodyPipe" << status());
143 assert(!theProducer);
144 assert(!theConsumer);
145 theBuf.clean();
146 }
147
148 void BodyPipe::setBodySize(uint64_t aBodySize)
149 {
150 assert(!bodySizeKnown());
151 assert(thePutSize <= aBodySize);
152
153 // If this assert fails, we need to add code to check for eof and inform
154 // the consumer about the eof condition via scheduleBodyEndNotification,
155 // because just setting a body size limit may trigger the eof condition.
156 assert(!theConsumer);
157
158 theBodySize = aBodySize;
159 debugs(91,7, HERE << "set body size" << status());
160 }
161
162 uint64_t BodyPipe::bodySize() const
163 {
164 assert(bodySizeKnown());
165 return static_cast<uint64_t>(theBodySize);
166 }
167
168 bool BodyPipe::expectMoreAfter(uint64_t offset) const
169 {
170 assert(theGetSize <= offset);
171 return offset < thePutSize || // buffer has more now or
172 (!productionEnded() && mayNeedMoreData()); // buffer will have more
173 }
174
175 bool BodyPipe::exhausted() const
176 {
177 return !expectMoreAfter(theGetSize);
178 }
179
180 uint64_t BodyPipe::unproducedSize() const
181 {
182 return bodySize() - thePutSize; // bodySize() asserts that size is known
183 }
184
185 void BodyPipe::expectProductionEndAfter(uint64_t size)
186 {
187 const uint64_t expectedSize = thePutSize + size;
188 if (bodySizeKnown())
189 Must(bodySize() == expectedSize);
190 else
191 theBodySize = expectedSize;
192 }
193
194 void
195 BodyPipe::clearProducer(bool atEof)
196 {
197 if (theProducer.set()) {
198 debugs(91,7, HERE << "clearing BodyPipe producer" << status());
199 theProducer.clear();
200 if (atEof) {
201 if (!bodySizeKnown())
202 theBodySize = thePutSize;
203 else if (bodySize() != thePutSize)
204 debugs(91,3, HERE << "aborting on premature eof" << status());
205 } else {
206 // asserta that we can detect the abort if the consumer joins later
207 assert(!bodySizeKnown() || bodySize() != thePutSize);
208 }
209 scheduleBodyEndNotification();
210 }
211 }
212
213 size_t
214 BodyPipe::putMoreData(const char *aBuffer, size_t size)
215 {
216 if (bodySizeKnown())
217 size = min((uint64_t)size, unproducedSize());
218
219 const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
220 if ((size = min(size, spaceSize))) {
221 theBuf.append(aBuffer, size);
222 postAppend(size);
223 return size;
224 }
225 return 0;
226 }
227
228 bool
229 BodyPipe::setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
230 {
231 assert(!theConsumer);
232 assert(aConsumer.set()); // but might be invalid
233
234 // TODO: convert this into an exception and remove IfNotLate suffix
235 // If there is something consumed already, we are in an auto-consuming mode
236 // and it is too late to attach a real consumer to the pipe.
237 if (theGetSize > 0) {
238 assert(mustAutoConsume);
239 return false;
240 }
241
242 Must(!abortedConsumption); // did not promise to never consume
243
244 theConsumer = aConsumer;
245 debugs(91,7, HERE << "set consumer" << status());
246 if (theBuf.hasContent())
247 scheduleBodyDataNotification();
248 if (!theProducer)
249 scheduleBodyEndNotification();
250
251 return true;
252 }
253
254 void
255 BodyPipe::clearConsumer()
256 {
257 if (theConsumer.set()) {
258 debugs(91,7, HERE << "clearing consumer" << status());
259 theConsumer.clear();
260 // do not abort if we have not consumed so that HTTP or ICAP can retry
261 // benign xaction failures due to persistent connection race conditions
262 if (consumedSize())
263 expectNoConsumption();
264 }
265 }
266
267 void
268 BodyPipe::expectNoConsumption()
269 {
270 Must(!theConsumer);
271 if (!abortedConsumption && !exhausted()) {
272 AsyncCall::Pointer call= asyncCall(91, 7,
273 "BodyProducer::noteBodyConsumerAborted",
274 BodyProducerDialer(theProducer,
275 &BodyProducer::noteBodyConsumerAborted, this));
276 ScheduleCallHere(call);
277 abortedConsumption = true;
278 }
279 }
280
281 size_t
282 BodyPipe::getMoreData(MemBuf &aMemBuffer)
283 {
284 if (!theBuf.hasContent())
285 return 0; // did not touch the possibly uninitialized buf
286
287 if (aMemBuffer.isNull())
288 aMemBuffer.init();
289 const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize());
290 aMemBuffer.append(theBuf.content(), size);
291 theBuf.consume(size);
292 postConsume(size);
293 return size; // cannot be zero if we called buf.init above
294 }
295
296 void
297 BodyPipe::consume(size_t size)
298 {
299 theBuf.consume(size);
300 postConsume(size);
301 }
302
303 // In the AutoConsumption mode the consumer has gone but the producer continues
304 // producing data. We are using a BodySink BodyConsumer which just discards the produced data.
305 void
306 BodyPipe::enableAutoConsumption()
307 {
308 mustAutoConsume = true;
309 debugs(91,5, HERE << "enabled auto consumption" << status());
310 if (!theConsumer && theBuf.hasContent())
311 startAutoConsumption();
312 }
313
314 // start auto consumption by creating body sink
315 void
316 BodyPipe::startAutoConsumption()
317 {
318 Must(mustAutoConsume);
319 Must(!theConsumer);
320 theConsumer = new BodySink;
321 debugs(91,7, HERE << "starting auto consumption" << status());
322 scheduleBodyDataNotification();
323 }
324
325 MemBuf &
326 BodyPipe::checkOut()
327 {
328 assert(!isCheckedOut);
329 isCheckedOut = true;
330 return theBuf;
331 }
332
333 void
334 BodyPipe::checkIn(Checkout &checkout)
335 {
336 assert(isCheckedOut);
337 isCheckedOut = false;
338 const size_t currentSize = theBuf.contentSize();
339 if (checkout.checkedOutSize > currentSize)
340 postConsume(checkout.checkedOutSize - currentSize);
341 else if (checkout.checkedOutSize < currentSize)
342 postAppend(currentSize - checkout.checkedOutSize);
343 }
344
345 void
346 BodyPipe::undoCheckOut(Checkout &checkout)
347 {
348 assert(isCheckedOut);
349 const size_t currentSize = theBuf.contentSize();
350 // We can only undo if size did not change, and even that carries
351 // some risk. If this becomes a problem, the code checking out
352 // raw buffers should always check them in (possibly unchanged)
353 // instead of relying on the automated undo mechanism of Checkout.
354 // The code can always use a temporary buffer to accomplish that.
355 Must(checkout.checkedOutSize == currentSize);
356 }
357
358 // TODO: Optimize: inform consumer/producer about more data/space only if
359 // they used the data/space since we notified them last time.
360
361 void
362 BodyPipe::postConsume(size_t size)
363 {
364 assert(!isCheckedOut);
365 theGetSize += size;
366 debugs(91,7, HERE << "consumed " << size << " bytes" << status());
367 if (mayNeedMoreData()) {
368 AsyncCall::Pointer call= asyncCall(91, 7,
369 "BodyProducer::noteMoreBodySpaceAvailable",
370 BodyProducerDialer(theProducer,
371 &BodyProducer::noteMoreBodySpaceAvailable, this));
372 ScheduleCallHere(call);
373 }
374 }
375
376 void
377 BodyPipe::postAppend(size_t size)
378 {
379 assert(!isCheckedOut);
380 thePutSize += size;
381 debugs(91,7, HERE << "added " << size << " bytes" << status());
382
383 if (mustAutoConsume && !theConsumer && size > 0)
384 startAutoConsumption();
385
386 // We should not consume here even if mustAutoConsume because the
387 // caller may not be ready for the data to be consumed during this call.
388 scheduleBodyDataNotification();
389
390 if (!mayNeedMoreData())
391 clearProducer(true); // reached end-of-body
392 }
393
394
395 void
396 BodyPipe::scheduleBodyDataNotification()
397 {
398 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
399 AsyncCall::Pointer call = asyncCall(91, 7,
400 "BodyConsumer::noteMoreBodyDataAvailable",
401 BodyConsumerDialer(theConsumer,
402 &BodyConsumer::noteMoreBodyDataAvailable, this));
403 ScheduleCallHere(call);
404 }
405 }
406
407 void
408 BodyPipe::scheduleBodyEndNotification()
409 {
410 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
411 if (bodySizeKnown() && bodySize() == thePutSize) {
412 AsyncCall::Pointer call = asyncCall(91, 7,
413 "BodyConsumer::noteBodyProductionEnded",
414 BodyConsumerDialer(theConsumer,
415 &BodyConsumer::noteBodyProductionEnded, this));
416 ScheduleCallHere(call);
417 } else {
418 AsyncCall::Pointer call = asyncCall(91, 7,
419 "BodyConsumer::noteBodyProducerAborted",
420 BodyConsumerDialer(theConsumer,
421 &BodyConsumer::noteBodyProducerAborted, this));
422 ScheduleCallHere(call);
423 }
424 }
425 }
426
427 // a short temporary string describing buffer status for debugging
428 const char *BodyPipe::status() const
429 {
430 static MemBuf outputBuffer;
431 outputBuffer.reset();
432
433 outputBuffer.append(" [", 2);
434
435 outputBuffer.Printf("%"PRIu64"<=%"PRIu64, theGetSize, thePutSize);
436 if (theBodySize >= 0)
437 outputBuffer.Printf("<=%"PRId64, theBodySize);
438 else
439 outputBuffer.append("<=?", 3);
440
441 outputBuffer.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
442
443 outputBuffer.Printf(" pipe%p", this);
444 if (theProducer.set())
445 outputBuffer.Printf(" prod%p", theProducer.get());
446 if (theConsumer.set())
447 outputBuffer.Printf(" cons%p", theConsumer.get());
448
449 if (mustAutoConsume)
450 outputBuffer.append(" A", 2);
451 if (abortedConsumption)
452 outputBuffer.append(" !C", 3);
453 if (isCheckedOut)
454 outputBuffer.append(" L", 2); // Locked
455
456 outputBuffer.append("]", 1);
457
458 outputBuffer.terminate();
459
460 return outputBuffer.content();
461 }
462
463
464 /* BodyPipeCheckout */
465
466 BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
467 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
468 checkedOutSize(buf.contentSize()), checkedIn(false)
469 {
470 }
471
472 BodyPipeCheckout::~BodyPipeCheckout()
473 {
474 if (!checkedIn) {
475 // Do not pipe.undoCheckOut(*this) because it asserts or throws
476 // TODO: consider implementing the long-term solution discussed at
477 // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
478 debugs(91,2, HERE << "Warning: cannot undo BodyPipeCheckout");
479 pipe.checkIn(*this);
480 }
481 }
482
483 void
484 BodyPipeCheckout::checkIn()
485 {
486 assert(!checkedIn);
487 pipe.checkIn(*this);
488 checkedIn = true;
489 }
490
491
492 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
493 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
494 checkedIn(c.checkedIn)
495 {
496 assert(false); // prevent copying
497 }
498
499 BodyPipeCheckout &
500 BodyPipeCheckout::operator =(const BodyPipeCheckout &)
501 {
502 assert(false); // prevent assignment
503 return *this;
504 }