]> git.ipfire.org Git - thirdparty/squid.git/blob - src/BodyPipe.cc
Merged from trunk
[thirdparty/squid.git] / src / BodyPipe.cc
1
2 #include "squid.h"
3 #include "base/AsyncJobCalls.h"
4 #include "base/TextException.h"
5 #include "BodyPipe.h"
6
7 CBDATA_CLASS_INIT(BodyPipe);
8
9 // BodySink is a BodyConsumer class which just consume and drops
10 // data from a BodyPipe
11 class BodySink: public BodyConsumer
12 {
13 bool done;
14 public:
15 BodySink():AsyncJob("BodySink"), done(false) {}
16 virtual ~BodySink() {}
17
18 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
19 size_t contentSize = bp->buf().contentSize();
20 bp->consume(contentSize);
21 }
22 virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
23 stopConsumingFrom(bp);
24 done = true;
25 }
26 virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
27 stopConsumingFrom(bp);
28 done = true;
29 }
30 bool doneAll() const {return done && AsyncJob::doneAll();}
31 CBDATA_CLASS2(BodySink);
32 };
33
34 CBDATA_CLASS_INIT(BodySink);
35
36 // The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
37 // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
38 // the BodyPipe passed as argument
39 class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
40 {
41 public:
42 typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
43
44 BodyProducerDialer(const BodyProducer::Pointer &aProducer,
45 Parent::Method aHandler, BodyPipe::Pointer bp):
46 Parent(aProducer, aHandler, bp) {}
47
48 virtual bool canDial(AsyncCall &call);
49 };
50
51 // The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
52 // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
53 // of the BodyPipe passed as argument
54 class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
55 {
56 public:
57 typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
58
59 BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer,
60 Parent::Method aHandler, BodyPipe::Pointer bp):
61 Parent(aConsumer, aHandler, bp) {}
62
63 virtual bool canDial(AsyncCall &call);
64 };
65
66 bool
67 BodyProducerDialer::canDial(AsyncCall &call)
68 {
69 if (!Parent::canDial(call))
70 return false;
71
72 const BodyProducer::Pointer &producer = job;
73 BodyPipe::Pointer pipe = arg1;
74 if (!pipe->stillProducing(producer)) {
75 debugs(call.debugSection, call.debugLevel, HERE << producer <<
76 " no longer producing for " << pipe->status());
77 return call.cancel("no longer producing");
78 }
79
80 return true;
81 }
82
83 bool
84 BodyConsumerDialer::canDial(AsyncCall &call)
85 {
86 if (!Parent::canDial(call))
87 return false;
88
89 const BodyConsumer::Pointer &consumer = job;
90 BodyPipe::Pointer pipe = arg1;
91 if (!pipe->stillConsuming(consumer)) {
92 debugs(call.debugSection, call.debugLevel, HERE << consumer <<
93 " no longer consuming from " << pipe->status());
94 return call.cancel("no longer consuming");
95 }
96
97 return true;
98 }
99
100
101 /* BodyProducer */
102
103 // inform the pipe that we are done and clear the Pointer
104 void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
105 {
106 debugs(91,7, HERE << this << " will not produce for " << pipe <<
107 "; atEof: " << atEof);
108 assert(pipe != NULL); // be strict: the caller state may depend on this
109 pipe->clearProducer(atEof);
110 pipe = NULL;
111 }
112
113
114
115 /* BodyConsumer */
116
117 // inform the pipe that we are done and clear the Pointer
118 void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
119 {
120 debugs(91,7, HERE << this << " will not consume from " << pipe);
121 assert(pipe != NULL); // be strict: the caller state may depend on this
122 pipe->clearConsumer();
123 pipe = NULL;
124 }
125
126
127 /* BodyPipe */
128
129 BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
130 theProducer(aProducer), theConsumer(0),
131 thePutSize(0), theGetSize(0),
132 mustAutoConsume(false), isCheckedOut(false)
133 {
134 // TODO: teach MemBuf to start with zero minSize
135 // TODO: limit maxSize by theBodySize, when known?
136 theBuf.init(2*1024, MaxCapacity);
137 debugs(91,7, HERE << "created BodyPipe" << status());
138 }
139
140 BodyPipe::~BodyPipe()
141 {
142 debugs(91,7, HERE << "destroying BodyPipe" << status());
143 assert(!theProducer);
144 assert(!theConsumer);
145 theBuf.clean();
146 }
147
148 void BodyPipe::setBodySize(uint64_t aBodySize)
149 {
150 assert(!bodySizeKnown());
151 assert(aBodySize >= 0);
152 assert(thePutSize <= aBodySize);
153
154 // If this assert fails, we need to add code to check for eof and inform
155 // the consumer about the eof condition via scheduleBodyEndNotification,
156 // because just setting a body size limit may trigger the eof condition.
157 assert(!theConsumer);
158
159 theBodySize = aBodySize;
160 debugs(91,7, HERE << "set body size" << status());
161 }
162
163 uint64_t BodyPipe::bodySize() const
164 {
165 assert(bodySizeKnown());
166 return static_cast<uint64_t>(theBodySize);
167 }
168
169 bool BodyPipe::expectMoreAfter(uint64_t offset) const
170 {
171 assert(theGetSize <= offset);
172 return offset < thePutSize || // buffer has more now or
173 (!productionEnded() && mayNeedMoreData()); // buffer will have more
174 }
175
176 bool BodyPipe::exhausted() const
177 {
178 return !expectMoreAfter(theGetSize);
179 }
180
181 uint64_t BodyPipe::unproducedSize() const
182 {
183 return bodySize() - thePutSize; // bodySize() asserts that size is known
184 }
185
186 void BodyPipe::expectProductionEndAfter(uint64_t size)
187 {
188 const uint64_t expectedSize = thePutSize + size;
189 if (bodySizeKnown())
190 Must(bodySize() == expectedSize);
191 else
192 theBodySize = expectedSize;
193 }
194
195 void
196 BodyPipe::clearProducer(bool atEof)
197 {
198 if (theProducer.set()) {
199 debugs(91,7, HERE << "clearing BodyPipe producer" << status());
200 theProducer.clear();
201 if (atEof) {
202 if (!bodySizeKnown())
203 theBodySize = thePutSize;
204 else if (bodySize() != thePutSize)
205 debugs(91,3, HERE << "aborting on premature eof" << status());
206 } else {
207 // asserta that we can detect the abort if the consumer joins later
208 assert(!bodySizeKnown() || bodySize() != thePutSize);
209 }
210 scheduleBodyEndNotification();
211 }
212 }
213
214 size_t
215 BodyPipe::putMoreData(const char *aBuffer, size_t size)
216 {
217 if (bodySizeKnown())
218 size = min((uint64_t)size, unproducedSize());
219
220 const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
221 if ((size = min(size, spaceSize))) {
222 theBuf.append(aBuffer, size);
223 postAppend(size);
224 return size;
225 }
226 return 0;
227 }
228
229 bool
230 BodyPipe::setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
231 {
232 assert(!theConsumer);
233 assert(aConsumer.set()); // but might be invalid
234
235 // TODO: convert this into an exception and remove IfNotLate suffix
236 // If there is something consumed already, we are in an auto-consuming mode
237 // and it is too late to attach a real consumer to the pipe.
238 if (theGetSize > 0) {
239 assert(mustAutoConsume);
240 return false;
241 }
242
243 theConsumer = aConsumer;
244 debugs(91,7, HERE << "set consumer" << status());
245 if (theBuf.hasContent())
246 scheduleBodyDataNotification();
247 if (!theProducer)
248 scheduleBodyEndNotification();
249
250 return true;
251 }
252
253 // When BodyPipe consumer is gone, all events for that consumer must not
254 // reach the new consumer (if any). Otherwise, the calls may go out of order
255 // (if _some_ calls are dropped due to the ultimate destination being
256 // temporary NULL). The code keeps track of the number of outstanding
257 // events and skips that number if consumer leaves. TODO: when AscyncCall
258 // support is improved, should we just schedule calls directly to consumer?
259 void
260 BodyPipe::clearConsumer()
261 {
262 if (theConsumer.set()) {
263 debugs(91,7, HERE << "clearing consumer" << status());
264 theConsumer.clear();
265 if (consumedSize() && !exhausted()) {
266 AsyncCall::Pointer call= asyncCall(91, 7,
267 "BodyProducer::noteBodyConsumerAborted",
268 BodyProducerDialer(theProducer,
269 &BodyProducer::noteBodyConsumerAborted, this));
270 ScheduleCallHere(call);
271 }
272 }
273 }
274
275 size_t
276 BodyPipe::getMoreData(MemBuf &aMemBuffer)
277 {
278 if (!theBuf.hasContent())
279 return 0; // did not touch the possibly uninitialized buf
280
281 if (aMemBuffer.isNull())
282 aMemBuffer.init();
283 const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize());
284 aMemBuffer.append(theBuf.content(), size);
285 theBuf.consume(size);
286 postConsume(size);
287 return size; // cannot be zero if we called buf.init above
288 }
289
290 void
291 BodyPipe::consume(size_t size)
292 {
293 theBuf.consume(size);
294 postConsume(size);
295 }
296
297 // In the AutoConsumption mode the consumer has gone but the producer continues
298 // producing data. We are using a BodySink BodyConsumer which just discards the produced data.
299 void
300 BodyPipe::enableAutoConsumption()
301 {
302 mustAutoConsume = true;
303 debugs(91,5, HERE << "enabled auto consumption" << status());
304 if (!theConsumer && theBuf.hasContent())
305 startAutoConsumption();
306 }
307
308 // start auto consumption by creating body sink
309 void
310 BodyPipe::startAutoConsumption()
311 {
312 Must(mustAutoConsume);
313 Must(!theConsumer);
314 theConsumer = new BodySink;
315 debugs(91,7, HERE << "starting auto consumption" << status());
316 scheduleBodyDataNotification();
317 }
318
319 MemBuf &
320 BodyPipe::checkOut()
321 {
322 assert(!isCheckedOut);
323 isCheckedOut = true;
324 return theBuf;
325 }
326
327 void
328 BodyPipe::checkIn(Checkout &checkout)
329 {
330 assert(isCheckedOut);
331 isCheckedOut = false;
332 const size_t currentSize = theBuf.contentSize();
333 if (checkout.checkedOutSize > currentSize)
334 postConsume(checkout.checkedOutSize - currentSize);
335 else if (checkout.checkedOutSize < currentSize)
336 postAppend(currentSize - checkout.checkedOutSize);
337 }
338
339 void
340 BodyPipe::undoCheckOut(Checkout &checkout)
341 {
342 assert(isCheckedOut);
343 const size_t currentSize = theBuf.contentSize();
344 // We can only undo if size did not change, and even that carries
345 // some risk. If this becomes a problem, the code checking out
346 // raw buffers should always check them in (possibly unchanged)
347 // instead of relying on the automated undo mechanism of Checkout.
348 // The code can always use a temporary buffer to accomplish that.
349 assert(checkout.checkedOutSize == currentSize);
350 }
351
352 // TODO: Optimize: inform consumer/producer about more data/space only if
353 // they used the data/space since we notified them last time.
354
355 void
356 BodyPipe::postConsume(size_t size)
357 {
358 assert(!isCheckedOut);
359 theGetSize += size;
360 debugs(91,7, HERE << "consumed " << size << " bytes" << status());
361 if (mayNeedMoreData()) {
362 AsyncCall::Pointer call= asyncCall(91, 7,
363 "BodyProducer::noteMoreBodySpaceAvailable",
364 BodyProducerDialer(theProducer,
365 &BodyProducer::noteMoreBodySpaceAvailable, this));
366 ScheduleCallHere(call);
367 }
368 }
369
370 void
371 BodyPipe::postAppend(size_t size)
372 {
373 assert(!isCheckedOut);
374 thePutSize += size;
375 debugs(91,7, HERE << "added " << size << " bytes" << status());
376
377 if (mustAutoConsume && !theConsumer && size > 0)
378 startAutoConsumption();
379
380 // We should not consume here even if mustAutoConsume because the
381 // caller may not be ready for the data to be consumed during this call.
382 scheduleBodyDataNotification();
383
384 if (!mayNeedMoreData())
385 clearProducer(true); // reached end-of-body
386 }
387
388
389 void
390 BodyPipe::scheduleBodyDataNotification()
391 {
392 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
393 AsyncCall::Pointer call = asyncCall(91, 7,
394 "BodyConsumer::noteMoreBodyDataAvailable",
395 BodyConsumerDialer(theConsumer,
396 &BodyConsumer::noteMoreBodyDataAvailable, this));
397 ScheduleCallHere(call);
398 }
399 }
400
401 void
402 BodyPipe::scheduleBodyEndNotification()
403 {
404 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
405 if (bodySizeKnown() && bodySize() == thePutSize) {
406 AsyncCall::Pointer call = asyncCall(91, 7,
407 "BodyConsumer::noteBodyProductionEnded",
408 BodyConsumerDialer(theConsumer,
409 &BodyConsumer::noteBodyProductionEnded, this));
410 ScheduleCallHere(call);
411 } else {
412 AsyncCall::Pointer call = asyncCall(91, 7,
413 "BodyConsumer::noteBodyProducerAborted",
414 BodyConsumerDialer(theConsumer,
415 &BodyConsumer::noteBodyProducerAborted, this));
416 ScheduleCallHere(call);
417 }
418 }
419 }
420
421 // a short temporary string describing buffer status for debugging
422 const char *BodyPipe::status() const
423 {
424 static MemBuf outputBuffer;
425 outputBuffer.reset();
426
427 outputBuffer.append(" [", 2);
428
429 outputBuffer.Printf("%"PRIu64"<=%"PRIu64, theGetSize, thePutSize);
430 if (theBodySize >= 0)
431 outputBuffer.Printf("<=%"PRId64, theBodySize);
432 else
433 outputBuffer.append("<=?", 3);
434
435 outputBuffer.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
436
437 outputBuffer.Printf(" pipe%p", this);
438 if (theProducer.set())
439 outputBuffer.Printf(" prod%p", theProducer.get());
440 if (theConsumer.set())
441 outputBuffer.Printf(" cons%p", theConsumer.get());
442
443 if (mustAutoConsume)
444 outputBuffer.append(" A", 2);
445 if (isCheckedOut)
446 outputBuffer.append(" L", 2); // Locked
447
448 outputBuffer.append("]", 1);
449
450 outputBuffer.terminate();
451
452 return outputBuffer.content();
453 }
454
455
456 /* BodyPipeCheckout */
457
458 BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
459 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
460 checkedOutSize(buf.contentSize()), checkedIn(false)
461 {
462 }
463
464 BodyPipeCheckout::~BodyPipeCheckout()
465 {
466 if (!checkedIn)
467 pipe.undoCheckOut(*this);
468 }
469
470 void
471 BodyPipeCheckout::checkIn()
472 {
473 assert(!checkedIn);
474 pipe.checkIn(*this);
475 checkedIn = true;
476 }
477
478
479 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
480 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
481 checkedIn(c.checkedIn)
482 {
483 assert(false); // prevent copying
484 }
485
486 BodyPipeCheckout &
487 BodyPipeCheckout::operator =(const BodyPipeCheckout &)
488 {
489 assert(false); // prevent assignment
490 return *this;
491 }