]> git.ipfire.org Git - thirdparty/squid.git/blob - src/BodyPipe.cc
Various audit updates
[thirdparty/squid.git] / src / BodyPipe.cc
1
2 #include "squid.h"
3 #include "base/AsyncJobCalls.h"
4 #include "base/TextException.h"
5 #include "BodyPipe.h"
6
7 CBDATA_CLASS_INIT(BodyPipe);
8
9 // BodySink is a BodyConsumer class which just consume and drops
10 // data from a BodyPipe
11 class BodySink: public BodyConsumer
12 {
13 public:
14 BodySink(const BodyPipe::Pointer &bp): AsyncJob("BodySink"), body_pipe(bp) {}
15 virtual ~BodySink() { assert(!body_pipe); }
16
17 virtual void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) {
18 size_t contentSize = bp->buf().contentSize();
19 bp->consume(contentSize);
20 }
21 virtual void noteBodyProductionEnded(BodyPipe::Pointer bp) {
22 stopConsumingFrom(body_pipe);
23 }
24 virtual void noteBodyProducerAborted(BodyPipe::Pointer bp) {
25 stopConsumingFrom(body_pipe);
26 }
27 bool doneAll() const {return !body_pipe && AsyncJob::doneAll();}
28
29 private:
30 BodyPipe::Pointer body_pipe; ///< the pipe we are consuming from
31
32 CBDATA_CLASS2(BodySink);
33 };
34
35 CBDATA_CLASS_INIT(BodySink);
36
37 // The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls.
38 // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of
39 // the BodyPipe passed as argument
40 class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer>
41 {
42 public:
43 typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent;
44
45 BodyProducerDialer(const BodyProducer::Pointer &aProducer,
46 Parent::Method aHandler, BodyPipe::Pointer bp):
47 Parent(aProducer, aHandler, bp) {}
48
49 virtual bool canDial(AsyncCall &call);
50 };
51
52 // The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls.
53 // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient
54 // of the BodyPipe passed as argument
55 class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer>
56 {
57 public:
58 typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent;
59
60 BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer,
61 Parent::Method aHandler, BodyPipe::Pointer bp):
62 Parent(aConsumer, aHandler, bp) {}
63
64 virtual bool canDial(AsyncCall &call);
65 };
66
67 bool
68 BodyProducerDialer::canDial(AsyncCall &call)
69 {
70 if (!Parent::canDial(call))
71 return false;
72
73 const BodyProducer::Pointer &producer = job;
74 BodyPipe::Pointer pipe = arg1;
75 if (!pipe->stillProducing(producer)) {
76 debugs(call.debugSection, call.debugLevel, HERE << producer <<
77 " no longer producing for " << pipe->status());
78 return call.cancel("no longer producing");
79 }
80
81 return true;
82 }
83
84 bool
85 BodyConsumerDialer::canDial(AsyncCall &call)
86 {
87 if (!Parent::canDial(call))
88 return false;
89
90 const BodyConsumer::Pointer &consumer = job;
91 BodyPipe::Pointer pipe = arg1;
92 if (!pipe->stillConsuming(consumer)) {
93 debugs(call.debugSection, call.debugLevel, HERE << consumer <<
94 " no longer consuming from " << pipe->status());
95 return call.cancel("no longer consuming");
96 }
97
98 return true;
99 }
100
101 /* BodyProducer */
102
103 // inform the pipe that we are done and clear the Pointer
104 void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
105 {
106 debugs(91,7, HERE << this << " will not produce for " << pipe <<
107 "; atEof: " << atEof);
108 assert(pipe != NULL); // be strict: the caller state may depend on this
109 pipe->clearProducer(atEof);
110 pipe = NULL;
111 }
112
113 /* BodyConsumer */
114
115 // inform the pipe that we are done and clear the Pointer
116 void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
117 {
118 debugs(91,7, HERE << this << " will not consume from " << pipe);
119 assert(pipe != NULL); // be strict: the caller state may depend on this
120 pipe->clearConsumer();
121 pipe = NULL;
122 }
123
124 /* BodyPipe */
125
126 BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
127 theProducer(aProducer), theConsumer(0),
128 thePutSize(0), theGetSize(0),
129 mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false)
130 {
131 // TODO: teach MemBuf to start with zero minSize
132 // TODO: limit maxSize by theBodySize, when known?
133 theBuf.init(2*1024, MaxCapacity);
134 debugs(91,7, HERE << "created BodyPipe" << status());
135 }
136
137 BodyPipe::~BodyPipe()
138 {
139 debugs(91,7, HERE << "destroying BodyPipe" << status());
140 assert(!theProducer);
141 assert(!theConsumer);
142 theBuf.clean();
143 }
144
145 void BodyPipe::setBodySize(uint64_t aBodySize)
146 {
147 assert(!bodySizeKnown());
148 assert(thePutSize <= aBodySize);
149
150 // If this assert fails, we need to add code to check for eof and inform
151 // the consumer about the eof condition via scheduleBodyEndNotification,
152 // because just setting a body size limit may trigger the eof condition.
153 assert(!theConsumer);
154
155 theBodySize = aBodySize;
156 debugs(91,7, HERE << "set body size" << status());
157 }
158
159 uint64_t BodyPipe::bodySize() const
160 {
161 assert(bodySizeKnown());
162 return static_cast<uint64_t>(theBodySize);
163 }
164
165 bool BodyPipe::expectMoreAfter(uint64_t offset) const
166 {
167 assert(theGetSize <= offset);
168 return offset < thePutSize || // buffer has more now or
169 (!productionEnded() && mayNeedMoreData()); // buffer will have more
170 }
171
172 bool BodyPipe::exhausted() const
173 {
174 return !expectMoreAfter(theGetSize);
175 }
176
177 uint64_t BodyPipe::unproducedSize() const
178 {
179 return bodySize() - thePutSize; // bodySize() asserts that size is known
180 }
181
182 void BodyPipe::expectProductionEndAfter(uint64_t size)
183 {
184 const uint64_t expectedSize = thePutSize + size;
185 if (bodySizeKnown())
186 Must(bodySize() == expectedSize);
187 else
188 theBodySize = expectedSize;
189 }
190
191 void
192 BodyPipe::clearProducer(bool atEof)
193 {
194 if (theProducer.set()) {
195 debugs(91,7, HERE << "clearing BodyPipe producer" << status());
196 theProducer.clear();
197 if (atEof) {
198 if (!bodySizeKnown())
199 theBodySize = thePutSize;
200 else if (bodySize() != thePutSize)
201 debugs(91,3, HERE << "aborting on premature eof" << status());
202 } else {
203 // asserta that we can detect the abort if the consumer joins later
204 assert(!bodySizeKnown() || bodySize() != thePutSize);
205 }
206 scheduleBodyEndNotification();
207 }
208 }
209
210 size_t
211 BodyPipe::putMoreData(const char *aBuffer, size_t size)
212 {
213 if (bodySizeKnown())
214 size = min((uint64_t)size, unproducedSize());
215
216 const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
217 if ((size = min(size, spaceSize))) {
218 theBuf.append(aBuffer, size);
219 postAppend(size);
220 return size;
221 }
222 return 0;
223 }
224
225 bool
226 BodyPipe::setConsumerIfNotLate(const Consumer::Pointer &aConsumer)
227 {
228 assert(!theConsumer);
229 assert(aConsumer.set()); // but might be invalid
230
231 // TODO: convert this into an exception and remove IfNotLate suffix
232 // If there is something consumed already, we are in an auto-consuming mode
233 // and it is too late to attach a real consumer to the pipe.
234 if (theGetSize > 0) {
235 assert(mustAutoConsume);
236 return false;
237 }
238
239 Must(!abortedConsumption); // did not promise to never consume
240
241 theConsumer = aConsumer;
242 debugs(91,7, HERE << "set consumer" << status());
243 if (theBuf.hasContent())
244 scheduleBodyDataNotification();
245 if (!theProducer)
246 scheduleBodyEndNotification();
247
248 return true;
249 }
250
251 void
252 BodyPipe::clearConsumer()
253 {
254 if (theConsumer.set()) {
255 debugs(91,7, HERE << "clearing consumer" << status());
256 theConsumer.clear();
257 // do not abort if we have not consumed so that HTTP or ICAP can retry
258 // benign xaction failures due to persistent connection race conditions
259 if (consumedSize())
260 expectNoConsumption();
261 }
262 }
263
264 void
265 BodyPipe::expectNoConsumption()
266 {
267 // We may be called multiple times because multiple jobs on the consumption
268 // chain may realize that there will be no more setConsumer() calls (e.g.,
269 // consuming code and retrying code). It is both difficult and not really
270 // necessary for them to coordinate their expectNoConsumption() calls.
271
272 // As a consequence, we may be called when we are auto-consuming already.
273
274 if (!abortedConsumption && !exhausted()) {
275 // Before we abort, any regular consumption should be over and auto
276 // consumption must not be started.
277 Must(!theConsumer);
278
279 AsyncCall::Pointer call= asyncCall(91, 7,
280 "BodyProducer::noteBodyConsumerAborted",
281 BodyProducerDialer(theProducer,
282 &BodyProducer::noteBodyConsumerAborted, this));
283 ScheduleCallHere(call);
284 abortedConsumption = true;
285
286 // in case somebody enabled auto-consumption before regular one aborted
287 if (mustAutoConsume)
288 startAutoConsumption();
289 }
290 }
291
292 size_t
293 BodyPipe::getMoreData(MemBuf &aMemBuffer)
294 {
295 if (!theBuf.hasContent())
296 return 0; // did not touch the possibly uninitialized buf
297
298 if (aMemBuffer.isNull())
299 aMemBuffer.init();
300 const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize());
301 aMemBuffer.append(theBuf.content(), size);
302 theBuf.consume(size);
303 postConsume(size);
304 return size; // cannot be zero if we called buf.init above
305 }
306
307 void
308 BodyPipe::consume(size_t size)
309 {
310 theBuf.consume(size);
311 postConsume(size);
312 }
313
314 // In the AutoConsumption mode the consumer has gone but the producer continues
315 // producing data. We are using a BodySink BodyConsumer which just discards the produced data.
316 void
317 BodyPipe::enableAutoConsumption()
318 {
319 mustAutoConsume = true;
320 debugs(91,5, HERE << "enabled auto consumption" << status());
321 if (!theConsumer && theBuf.hasContent())
322 startAutoConsumption();
323 }
324
325 // start auto consumption by creating body sink
326 void
327 BodyPipe::startAutoConsumption()
328 {
329 Must(mustAutoConsume);
330 Must(!theConsumer);
331 theConsumer = new BodySink(this);
332 debugs(91,7, HERE << "starting auto consumption" << status());
333 scheduleBodyDataNotification();
334 }
335
336 MemBuf &
337 BodyPipe::checkOut()
338 {
339 assert(!isCheckedOut);
340 isCheckedOut = true;
341 return theBuf;
342 }
343
344 void
345 BodyPipe::checkIn(Checkout &checkout)
346 {
347 assert(isCheckedOut);
348 isCheckedOut = false;
349 const size_t currentSize = theBuf.contentSize();
350 if (checkout.checkedOutSize > currentSize)
351 postConsume(checkout.checkedOutSize - currentSize);
352 else if (checkout.checkedOutSize < currentSize)
353 postAppend(currentSize - checkout.checkedOutSize);
354 }
355
356 void
357 BodyPipe::undoCheckOut(Checkout &checkout)
358 {
359 assert(isCheckedOut);
360 const size_t currentSize = theBuf.contentSize();
361 // We can only undo if size did not change, and even that carries
362 // some risk. If this becomes a problem, the code checking out
363 // raw buffers should always check them in (possibly unchanged)
364 // instead of relying on the automated undo mechanism of Checkout.
365 // The code can always use a temporary buffer to accomplish that.
366 Must(checkout.checkedOutSize == currentSize);
367 }
368
369 // TODO: Optimize: inform consumer/producer about more data/space only if
370 // they used the data/space since we notified them last time.
371
372 void
373 BodyPipe::postConsume(size_t size)
374 {
375 assert(!isCheckedOut);
376 theGetSize += size;
377 debugs(91,7, HERE << "consumed " << size << " bytes" << status());
378 if (mayNeedMoreData()) {
379 AsyncCall::Pointer call= asyncCall(91, 7,
380 "BodyProducer::noteMoreBodySpaceAvailable",
381 BodyProducerDialer(theProducer,
382 &BodyProducer::noteMoreBodySpaceAvailable, this));
383 ScheduleCallHere(call);
384 }
385 }
386
387 void
388 BodyPipe::postAppend(size_t size)
389 {
390 assert(!isCheckedOut);
391 thePutSize += size;
392 debugs(91,7, HERE << "added " << size << " bytes" << status());
393
394 if (mustAutoConsume && !theConsumer && size > 0)
395 startAutoConsumption();
396
397 // We should not consume here even if mustAutoConsume because the
398 // caller may not be ready for the data to be consumed during this call.
399 scheduleBodyDataNotification();
400
401 if (!mayNeedMoreData())
402 clearProducer(true); // reached end-of-body
403 }
404
405 void
406 BodyPipe::scheduleBodyDataNotification()
407 {
408 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
409 AsyncCall::Pointer call = asyncCall(91, 7,
410 "BodyConsumer::noteMoreBodyDataAvailable",
411 BodyConsumerDialer(theConsumer,
412 &BodyConsumer::noteMoreBodyDataAvailable, this));
413 ScheduleCallHere(call);
414 }
415 }
416
417 void
418 BodyPipe::scheduleBodyEndNotification()
419 {
420 if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead
421 if (bodySizeKnown() && bodySize() == thePutSize) {
422 AsyncCall::Pointer call = asyncCall(91, 7,
423 "BodyConsumer::noteBodyProductionEnded",
424 BodyConsumerDialer(theConsumer,
425 &BodyConsumer::noteBodyProductionEnded, this));
426 ScheduleCallHere(call);
427 } else {
428 AsyncCall::Pointer call = asyncCall(91, 7,
429 "BodyConsumer::noteBodyProducerAborted",
430 BodyConsumerDialer(theConsumer,
431 &BodyConsumer::noteBodyProducerAborted, this));
432 ScheduleCallHere(call);
433 }
434 }
435 }
436
437 // a short temporary string describing buffer status for debugging
438 const char *BodyPipe::status() const
439 {
440 static MemBuf outputBuffer;
441 outputBuffer.reset();
442
443 outputBuffer.append(" [", 2);
444
445 outputBuffer.Printf("%" PRIu64 "<=%" PRIu64, theGetSize, thePutSize);
446 if (theBodySize >= 0)
447 outputBuffer.Printf("<=%" PRId64, theBodySize);
448 else
449 outputBuffer.append("<=?", 3);
450
451 outputBuffer.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
452
453 outputBuffer.Printf(" pipe%p", this);
454 if (theProducer.set())
455 outputBuffer.Printf(" prod%p", theProducer.get());
456 if (theConsumer.set())
457 outputBuffer.Printf(" cons%p", theConsumer.get());
458
459 if (mustAutoConsume)
460 outputBuffer.append(" A", 2);
461 if (abortedConsumption)
462 outputBuffer.append(" !C", 3);
463 if (isCheckedOut)
464 outputBuffer.append(" L", 2); // Locked
465
466 outputBuffer.append("]", 1);
467
468 outputBuffer.terminate();
469
470 return outputBuffer.content();
471 }
472
473 /* BodyPipeCheckout */
474
475 BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
476 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
477 checkedOutSize(buf.contentSize()), checkedIn(false)
478 {
479 }
480
481 BodyPipeCheckout::~BodyPipeCheckout()
482 {
483 if (!checkedIn) {
484 // Do not pipe.undoCheckOut(*this) because it asserts or throws
485 // TODO: consider implementing the long-term solution discussed at
486 // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html
487 debugs(91,2, HERE << "Warning: cannot undo BodyPipeCheckout");
488 pipe.checkIn(*this);
489 }
490 }
491
492 void
493 BodyPipeCheckout::checkIn()
494 {
495 assert(!checkedIn);
496 pipe.checkIn(*this);
497 checkedIn = true;
498 }
499
500 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
501 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
502 checkedIn(c.checkedIn)
503 {
504 assert(false); // prevent copying
505 }
506
507 BodyPipeCheckout &
508 BodyPipeCheckout::operator =(const BodyPipeCheckout &)
509 {
510 assert(false); // prevent assignment
511 return *this;
512 }