]>
Commit | Line | Data |
---|---|---|
1 | /* | |
2 | * Copyright (C) 1996-2025 The Squid Software Foundation and contributors | |
3 | * | |
4 | * Squid software is distributed under GPLv2+ license and includes | |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
7 | */ | |
8 | ||
9 | #include "squid.h" | |
10 | #include "base/AsyncJobCalls.h" | |
11 | #include "base/TextException.h" | |
12 | #include "BodyPipe.h" | |
13 | ||
14 | // BodySink is a BodyConsumer class which just consume and drops | |
15 | // data from a BodyPipe | |
16 | class BodySink: public BodyConsumer | |
17 | { | |
18 | CBDATA_CHILD(BodySink); | |
19 | ||
20 | public: | |
21 | BodySink(const BodyPipe::Pointer &bp): AsyncJob("BodySink"), body_pipe(bp) {} | |
22 | ~BodySink() override { assert(!body_pipe); } | |
23 | ||
24 | void noteMoreBodyDataAvailable(BodyPipe::Pointer bp) override { | |
25 | size_t contentSize = bp->buf().contentSize(); | |
26 | bp->consume(contentSize); | |
27 | } | |
28 | void noteBodyProductionEnded(BodyPipe::Pointer) override { | |
29 | stopConsumingFrom(body_pipe); | |
30 | } | |
31 | void noteBodyProducerAborted(BodyPipe::Pointer) override { | |
32 | stopConsumingFrom(body_pipe); | |
33 | } | |
34 | bool doneAll() const override {return !body_pipe && AsyncJob::doneAll();} | |
35 | ||
36 | private: | |
37 | BodyPipe::Pointer body_pipe; ///< the pipe we are consuming from | |
38 | }; | |
39 | ||
40 | CBDATA_CLASS_INIT(BodySink); | |
41 | ||
42 | // The BodyProducerDialer is an AsyncCall class which used to schedule BodyProducer calls. | |
43 | // In addition to a normal AsyncCall checks if the BodyProducer is still the producer of | |
44 | // the BodyPipe passed as argument | |
45 | class BodyProducerDialer: public UnaryMemFunT<BodyProducer, BodyPipe::Pointer> | |
46 | { | |
47 | public: | |
48 | typedef UnaryMemFunT<BodyProducer, BodyPipe::Pointer> Parent; | |
49 | ||
50 | BodyProducerDialer(const BodyProducer::Pointer &aProducer, | |
51 | Parent::Method aHandler, BodyPipe::Pointer bp): | |
52 | Parent(aProducer, aHandler, bp) {} | |
53 | ||
54 | bool canDial(AsyncCall &call) override; | |
55 | }; | |
56 | ||
57 | // The BodyConsumerDialer is an AsyncCall class which used to schedule BodyConsumer calls. | |
58 | // In addition to a normal AsyncCall checks if the BodyConsumer is still the reciptient | |
59 | // of the BodyPipe passed as argument | |
60 | class BodyConsumerDialer: public UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> | |
61 | { | |
62 | public: | |
63 | typedef UnaryMemFunT<BodyConsumer, BodyPipe::Pointer> Parent; | |
64 | ||
65 | BodyConsumerDialer(const BodyConsumer::Pointer &aConsumer, | |
66 | Parent::Method aHandler, BodyPipe::Pointer bp): | |
67 | Parent(aConsumer, aHandler, bp) {} | |
68 | ||
69 | bool canDial(AsyncCall &call) override; | |
70 | }; | |
71 | ||
72 | bool | |
73 | BodyProducerDialer::canDial(AsyncCall &call) | |
74 | { | |
75 | if (!Parent::canDial(call)) | |
76 | return false; | |
77 | ||
78 | const BodyProducer::Pointer &producer = job; | |
79 | BodyPipe::Pointer aPipe = arg1; | |
80 | if (!aPipe->stillProducing(producer)) { | |
81 | debugs(call.debugSection, call.debugLevel, producer << " no longer producing for " << aPipe->status()); | |
82 | return call.cancel("no longer producing"); | |
83 | } | |
84 | ||
85 | return true; | |
86 | } | |
87 | ||
88 | bool | |
89 | BodyConsumerDialer::canDial(AsyncCall &call) | |
90 | { | |
91 | if (!Parent::canDial(call)) | |
92 | return false; | |
93 | ||
94 | const BodyConsumer::Pointer &consumer = job; | |
95 | BodyPipe::Pointer aPipe = arg1; | |
96 | if (!aPipe->stillConsuming(consumer)) { | |
97 | debugs(call.debugSection, call.debugLevel, consumer << " no longer consuming from " << aPipe->status()); | |
98 | return call.cancel("no longer consuming"); | |
99 | } | |
100 | ||
101 | return true; | |
102 | } | |
103 | ||
104 | /* BodyProducer */ | |
105 | ||
106 | // inform the pipe that we are done and clear the Pointer | |
107 | void BodyProducer::stopProducingFor(RefCount<BodyPipe> &p, bool atEof) | |
108 | { | |
109 | debugs(91,7, this << " will not produce for " << p << "; atEof: " << atEof); | |
110 | assert(p != nullptr); // be strict: the caller state may depend on this | |
111 | p->clearProducer(atEof); | |
112 | p = nullptr; | |
113 | } | |
114 | ||
115 | /* BodyConsumer */ | |
116 | ||
117 | // inform the pipe that we are done and clear the Pointer | |
118 | void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &p) | |
119 | { | |
120 | debugs(91,7, this << " will not consume from " << p); | |
121 | assert(p != nullptr); // be strict: the caller state may depend on this | |
122 | p->clearConsumer(); | |
123 | p = nullptr; | |
124 | } | |
125 | ||
126 | /* BodyPipe */ | |
127 | ||
128 | BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1), | |
129 | theProducer(aProducer), theConsumer(nullptr), | |
130 | thePutSize(0), theGetSize(0), | |
131 | mustAutoConsume(false), abortedConsumption(false), isCheckedOut(false) | |
132 | { | |
133 | // TODO: teach MemBuf to start with zero minSize | |
134 | // TODO: limit maxSize by theBodySize, when known? | |
135 | theBuf.init(2*1024, MaxCapacity); | |
136 | debugs(91,7, "created BodyPipe" << status()); | |
137 | } | |
138 | ||
139 | BodyPipe::~BodyPipe() | |
140 | { | |
141 | debugs(91,7, "destroying BodyPipe" << status()); | |
142 | assert(!theProducer); | |
143 | assert(!theConsumer); | |
144 | theBuf.clean(); | |
145 | } | |
146 | ||
147 | void BodyPipe::setBodySize(uint64_t aBodySize) | |
148 | { | |
149 | assert(!bodySizeKnown()); | |
150 | assert(thePutSize <= aBodySize); | |
151 | ||
152 | // If this assert fails, we need to add code to check for eof and inform | |
153 | // the consumer about the eof condition via scheduleBodyEndNotification, | |
154 | // because just setting a body size limit may trigger the eof condition. | |
155 | assert(!theConsumer); | |
156 | ||
157 | theBodySize = aBodySize; | |
158 | debugs(91,7, "set body size" << status()); | |
159 | } | |
160 | ||
161 | uint64_t BodyPipe::bodySize() const | |
162 | { | |
163 | assert(bodySizeKnown()); | |
164 | return static_cast<uint64_t>(theBodySize); | |
165 | } | |
166 | ||
167 | bool BodyPipe::expectMoreAfter(uint64_t offset) const | |
168 | { | |
169 | assert(theGetSize <= offset); | |
170 | return offset < thePutSize || // buffer has more now or | |
171 | (!productionEnded() && mayNeedMoreData()); // buffer will have more | |
172 | } | |
173 | ||
174 | bool BodyPipe::exhausted() const | |
175 | { | |
176 | return !expectMoreAfter(theGetSize); | |
177 | } | |
178 | ||
179 | uint64_t BodyPipe::unproducedSize() const | |
180 | { | |
181 | return bodySize() - thePutSize; // bodySize() asserts that size is known | |
182 | } | |
183 | ||
184 | void BodyPipe::expectProductionEndAfter(uint64_t size) | |
185 | { | |
186 | const uint64_t expectedSize = thePutSize + size; | |
187 | if (bodySizeKnown()) | |
188 | Must(bodySize() == expectedSize); | |
189 | else | |
190 | theBodySize = expectedSize; | |
191 | } | |
192 | ||
193 | void | |
194 | BodyPipe::clearProducer(bool atEof) | |
195 | { | |
196 | if (theProducer.set()) { | |
197 | debugs(91,7, "clearing BodyPipe producer" << status()); | |
198 | theProducer.clear(); | |
199 | if (atEof) { | |
200 | if (!bodySizeKnown()) | |
201 | theBodySize = thePutSize; | |
202 | else if (bodySize() != thePutSize) | |
203 | debugs(91,3, "aborting on premature eof" << status()); | |
204 | } else { | |
205 | // asserta that we can detect the abort if the consumer joins later | |
206 | assert(!bodySizeKnown() || bodySize() != thePutSize); | |
207 | } | |
208 | scheduleBodyEndNotification(); | |
209 | } | |
210 | } | |
211 | ||
212 | size_t | |
213 | BodyPipe::putMoreData(const char *aBuffer, size_t size) | |
214 | { | |
215 | if (bodySizeKnown()) | |
216 | size = min((uint64_t)size, unproducedSize()); | |
217 | ||
218 | const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize()); | |
219 | if ((size = min(size, spaceSize))) { | |
220 | theBuf.append(aBuffer, size); | |
221 | postAppend(size); | |
222 | return size; | |
223 | } | |
224 | return 0; | |
225 | } | |
226 | ||
227 | bool | |
228 | BodyPipe::setConsumerIfNotLate(const Consumer::Pointer &aConsumer) | |
229 | { | |
230 | assert(!theConsumer); | |
231 | assert(aConsumer.set()); // but might be invalid | |
232 | ||
233 | // TODO: convert this into an exception and remove IfNotLate suffix | |
234 | // If there is something consumed already, we are in an auto-consuming mode | |
235 | // and it is too late to attach a real consumer to the pipe. | |
236 | if (theGetSize > 0) { | |
237 | assert(mustAutoConsume); | |
238 | return false; | |
239 | } | |
240 | ||
241 | Must(!abortedConsumption); // did not promise to never consume | |
242 | ||
243 | theConsumer = aConsumer; | |
244 | debugs(91,7, "set consumer" << status()); | |
245 | if (theBuf.hasContent()) | |
246 | scheduleBodyDataNotification(); | |
247 | if (!theProducer) | |
248 | scheduleBodyEndNotification(); | |
249 | ||
250 | return true; | |
251 | } | |
252 | ||
253 | void | |
254 | BodyPipe::clearConsumer() | |
255 | { | |
256 | if (theConsumer.set()) { | |
257 | debugs(91,7, "clearing consumer" << status()); | |
258 | theConsumer.clear(); | |
259 | // do not abort if we have not consumed so that HTTP or ICAP can retry | |
260 | // benign xaction failures due to persistent connection race conditions | |
261 | if (consumedSize()) | |
262 | expectNoConsumption(); | |
263 | } | |
264 | } | |
265 | ||
266 | void | |
267 | BodyPipe::expectNoConsumption() | |
268 | { | |
269 | // We and enableAutoConsumption() may be called multiple times because | |
270 | // multiple jobs on the consumption chain may realize that there will be no | |
271 | // more setConsumer() calls (e.g., consuming code and retrying code). It is | |
272 | // both difficult and unnecessary for them to coordinate their calls. | |
273 | ||
274 | // As a consequence, we may be called when already auto-consuming, including | |
275 | // cases where abortedConsumption is still false. We could try to harden | |
276 | // this by also aborting consumption from enableAutoConsumption() when there | |
277 | // is no consumer, but see errorAppendEntry() TODO for a better plan. | |
278 | ||
279 | debugs(91, 7, status()); | |
280 | if (!abortedConsumption && !exhausted() && !theConsumer) { | |
281 | AsyncCall::Pointer call= asyncCall(91, 7, | |
282 | "BodyProducer::noteBodyConsumerAborted", | |
283 | BodyProducerDialer(theProducer, | |
284 | &BodyProducer::noteBodyConsumerAborted, this)); | |
285 | ScheduleCallHere(call); | |
286 | abortedConsumption = true; | |
287 | ||
288 | // in case somebody enabled auto-consumption before regular one aborted | |
289 | startAutoConsumptionIfNeeded(); | |
290 | } | |
291 | } | |
292 | ||
293 | size_t | |
294 | BodyPipe::getMoreData(MemBuf &aMemBuffer) | |
295 | { | |
296 | if (!theBuf.hasContent()) | |
297 | return 0; // did not touch the possibly uninitialized buf | |
298 | ||
299 | if (aMemBuffer.isNull()) | |
300 | aMemBuffer.init(); | |
301 | const size_t size = min(theBuf.contentSize(), aMemBuffer.potentialSpaceSize()); | |
302 | aMemBuffer.append(theBuf.content(), size); | |
303 | theBuf.consume(size); | |
304 | postConsume(size); | |
305 | return size; // cannot be zero if we called buf.init above | |
306 | } | |
307 | ||
308 | void | |
309 | BodyPipe::consume(size_t size) | |
310 | { | |
311 | theBuf.consume(size); | |
312 | postConsume(size); | |
313 | } | |
314 | ||
315 | void | |
316 | BodyPipe::enableAutoConsumption() | |
317 | { | |
318 | mustAutoConsume = true; | |
319 | debugs(91,5, "enabled auto consumption" << status()); | |
320 | startAutoConsumptionIfNeeded(); | |
321 | } | |
322 | ||
323 | /// Check the current need and, if needed, start auto consumption. In auto | |
324 | /// consumption mode, the consumer is gone, but the producer continues to | |
325 | /// produce data. We use a BodySink BodyConsumer to discard that data. | |
326 | void | |
327 | BodyPipe::startAutoConsumptionIfNeeded() | |
328 | { | |
329 | const auto startNow = | |
330 | mustAutoConsume && // was enabled | |
331 | !theConsumer && // has not started yet | |
332 | theProducer.valid() && // still useful (and will eventually stop) | |
333 | theBuf.hasContent(); // has something to consume right now | |
334 | if (!startNow) | |
335 | return; | |
336 | ||
337 | theConsumer = new BodySink(this); | |
338 | AsyncJob::Start(theConsumer); | |
339 | debugs(91,7, "starting auto consumption" << status()); | |
340 | scheduleBodyDataNotification(); | |
341 | } | |
342 | ||
343 | MemBuf & | |
344 | BodyPipe::checkOut() | |
345 | { | |
346 | assert(!isCheckedOut); | |
347 | isCheckedOut = true; | |
348 | return theBuf; | |
349 | } | |
350 | ||
351 | void | |
352 | BodyPipe::checkIn(Checkout &checkout) | |
353 | { | |
354 | assert(isCheckedOut); | |
355 | isCheckedOut = false; | |
356 | const size_t currentSize = theBuf.contentSize(); | |
357 | if (checkout.checkedOutSize > currentSize) | |
358 | postConsume(checkout.checkedOutSize - currentSize); | |
359 | else if (checkout.checkedOutSize < currentSize) | |
360 | postAppend(currentSize - checkout.checkedOutSize); | |
361 | } | |
362 | ||
363 | void | |
364 | BodyPipe::undoCheckOut(Checkout &checkout) | |
365 | { | |
366 | assert(isCheckedOut); | |
367 | const size_t currentSize = theBuf.contentSize(); | |
368 | // We can only undo if size did not change, and even that carries | |
369 | // some risk. If this becomes a problem, the code checking out | |
370 | // raw buffers should always check them in (possibly unchanged) | |
371 | // instead of relying on the automated undo mechanism of Checkout. | |
372 | // The code can always use a temporary buffer to accomplish that. | |
373 | Must(checkout.checkedOutSize == currentSize); | |
374 | } | |
375 | ||
376 | // TODO: Optimize: inform consumer/producer about more data/space only if | |
377 | // they used the data/space since we notified them last time. | |
378 | ||
379 | void | |
380 | BodyPipe::postConsume(size_t size) | |
381 | { | |
382 | assert(!isCheckedOut); | |
383 | theGetSize += size; | |
384 | debugs(91,7, "consumed " << size << " bytes" << status()); | |
385 | if (mayNeedMoreData()) { | |
386 | AsyncCall::Pointer call= asyncCall(91, 7, | |
387 | "BodyProducer::noteMoreBodySpaceAvailable", | |
388 | BodyProducerDialer(theProducer, | |
389 | &BodyProducer::noteMoreBodySpaceAvailable, this)); | |
390 | ScheduleCallHere(call); | |
391 | } | |
392 | } | |
393 | ||
394 | void | |
395 | BodyPipe::postAppend(size_t size) | |
396 | { | |
397 | assert(!isCheckedOut); | |
398 | thePutSize += size; | |
399 | debugs(91,7, "added " << size << " bytes" << status()); | |
400 | ||
401 | // We should not consume here even if mustAutoConsume because the | |
402 | // caller may not be ready for the data to be consumed during this call. | |
403 | scheduleBodyDataNotification(); | |
404 | ||
405 | // Do this check after scheduleBodyDataNotification() to ensure the | |
406 | // natural order of "more body data" and "production ended" events. | |
407 | if (!mayNeedMoreData()) | |
408 | clearProducer(true); // reached end-of-body | |
409 | ||
410 | startAutoConsumptionIfNeeded(); | |
411 | } | |
412 | ||
413 | void | |
414 | BodyPipe::scheduleBodyDataNotification() | |
415 | { | |
416 | if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead | |
417 | AsyncCall::Pointer call = asyncCall(91, 7, | |
418 | "BodyConsumer::noteMoreBodyDataAvailable", | |
419 | BodyConsumerDialer(theConsumer, | |
420 | &BodyConsumer::noteMoreBodyDataAvailable, this)); | |
421 | ScheduleCallHere(call); | |
422 | } | |
423 | } | |
424 | ||
425 | void | |
426 | BodyPipe::scheduleBodyEndNotification() | |
427 | { | |
428 | if (theConsumer.valid()) { // TODO: allow asyncCall() to check this instead | |
429 | if (bodySizeKnown() && bodySize() == thePutSize) { | |
430 | AsyncCall::Pointer call = asyncCall(91, 7, | |
431 | "BodyConsumer::noteBodyProductionEnded", | |
432 | BodyConsumerDialer(theConsumer, | |
433 | &BodyConsumer::noteBodyProductionEnded, this)); | |
434 | ScheduleCallHere(call); | |
435 | } else { | |
436 | AsyncCall::Pointer call = asyncCall(91, 7, | |
437 | "BodyConsumer::noteBodyProducerAborted", | |
438 | BodyConsumerDialer(theConsumer, | |
439 | &BodyConsumer::noteBodyProducerAborted, this)); | |
440 | ScheduleCallHere(call); | |
441 | } | |
442 | } | |
443 | } | |
444 | ||
445 | // a short temporary string describing buffer status for debugging | |
446 | const char *BodyPipe::status() const | |
447 | { | |
448 | static MemBuf outputBuffer; | |
449 | outputBuffer.reset(); | |
450 | ||
451 | outputBuffer.append(" [", 2); | |
452 | ||
453 | outputBuffer.appendf("%" PRIu64 "<=%" PRIu64, theGetSize, thePutSize); | |
454 | if (theBodySize >= 0) | |
455 | outputBuffer.appendf("<=%" PRId64, theBodySize); | |
456 | else | |
457 | outputBuffer.append("<=?", 3); | |
458 | ||
459 | outputBuffer.appendf(" %" PRId64 "+%" PRId64, static_cast<int64_t>(theBuf.contentSize()), static_cast<int64_t>(theBuf.spaceSize())); | |
460 | ||
461 | outputBuffer.appendf(" pipe%p", this); | |
462 | if (theProducer.set()) | |
463 | outputBuffer.appendf(" prod%p", theProducer.get()); | |
464 | if (theConsumer.set()) | |
465 | outputBuffer.appendf(" cons%p", theConsumer.get()); | |
466 | ||
467 | if (mustAutoConsume) | |
468 | outputBuffer.append(" A", 2); | |
469 | if (abortedConsumption) | |
470 | outputBuffer.append(" !C", 3); | |
471 | if (isCheckedOut) | |
472 | outputBuffer.append(" L", 2); // Locked | |
473 | ||
474 | outputBuffer.append("]", 1); | |
475 | ||
476 | outputBuffer.terminate(); | |
477 | ||
478 | return outputBuffer.content(); | |
479 | } | |
480 | ||
481 | /* BodyPipeCheckout */ | |
482 | ||
483 | BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): thePipe(aPipe), | |
484 | buf(aPipe.checkOut()), offset(aPipe.consumedSize()), | |
485 | checkedOutSize(buf.contentSize()), checkedIn(false) | |
486 | { | |
487 | } | |
488 | ||
489 | BodyPipeCheckout::~BodyPipeCheckout() | |
490 | { | |
491 | if (!checkedIn) { | |
492 | // Do not pipe.undoCheckOut(*this) because it asserts or throws | |
493 | // TODO: consider implementing the long-term solution discussed at | |
494 | // http://www.mail-archive.com/squid-dev@squid-cache.org/msg07910.html | |
495 | debugs(91,2, "Warning: cannot undo BodyPipeCheckout"); | |
496 | thePipe.checkIn(*this); | |
497 | } | |
498 | } | |
499 | ||
500 | void | |
501 | BodyPipeCheckout::checkIn() | |
502 | { | |
503 | assert(!checkedIn); | |
504 | thePipe.checkIn(*this); | |
505 | checkedIn = true; | |
506 | } | |
507 | ||
508 | BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): thePipe(c.thePipe), | |
509 | buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize), | |
510 | checkedIn(c.checkedIn) | |
511 | { | |
512 | assert(false); // prevent copying | |
513 | } | |
514 | ||
515 | BodyPipeCheckout & | |
516 | BodyPipeCheckout::operator =(const BodyPipeCheckout &) | |
517 | { | |
518 | assert(false); // prevent assignment | |
519 | return *this; | |
520 | } | |
521 |