]> git.ipfire.org Git - thirdparty/squid.git/blob - src/BodyPipe.cc
- Replaced BodyReader with BodyPipe. BodyReader was a
[thirdparty/squid.git] / src / BodyPipe.cc
1
2 #include "squid.h"
3 #include "BodyPipe.h"
4
5 CBDATA_CLASS_INIT(BodyPipe);
6
7 // inform the pipe that we are done and clear the Pointer
8 void BodyProducer::stopProducingFor(RefCount<BodyPipe> &pipe, bool atEof)
9 {
10 debugs(91,7, HERE << this << " will not produce for " << pipe <<
11 "; atEof: " << atEof);
12 assert(pipe != NULL); // be strict: the caller state may depend on this
13 pipe->clearProducer(atEof);
14 pipe = NULL;
15 }
16
17 // inform the pipe that we are done and clear the Pointer
18 void BodyConsumer::stopConsumingFrom(RefCount<BodyPipe> &pipe)
19 {
20 debugs(91,7, HERE << this << " will not consume from " << pipe);
21 assert(pipe != NULL); // be strict: the caller state may depend on this
22 pipe->clearConsumer();
23 pipe = NULL;
24 }
25
26 /* BodyPipe */
27
28 BodyPipe::BodyPipe(Producer *aProducer): theBodySize(-1),
29 theProducer(aProducer), theConsumer(0),
30 thePutSize(0), theGetSize(0), mustAutoConsume(false), isCheckedOut(false)
31 {
32 // TODO: teach MemBuf to start with zero minSize
33 // TODO: limit maxSize by theBodySize, when known?
34 theBuf.init(2*1024, MaxCapacity);
35 debugs(91,7, HERE << "created BodyPipe" << status());
36 }
37
38 BodyPipe::~BodyPipe()
39 {
40 debugs(91,7, HERE << "destroying BodyPipe" << status());
41 assert(!theProducer);
42 assert(!theConsumer);
43 theBuf.clean();
44 }
45
46 void BodyPipe::setBodySize(size_t aBodySize)
47 {
48 assert(!bodySizeKnown());
49 assert(aBodySize >= 0);
50 assert(thePutSize <= aBodySize);
51
52 // If this assert fails, we need to add code to check for eof and inform
53 // the consumer about the eof condition via scheduleBodyEndNotification,
54 // because just setting a body size limit may trigger the eof condition.
55 assert(!theConsumer);
56
57 theBodySize = aBodySize;
58 debugs(91,7, HERE << "set body size" << status());
59 }
60
61 size_t BodyPipe::bodySize() const
62 {
63 assert(bodySizeKnown());
64 return static_cast<size_t>(theBodySize);
65 }
66
67 bool BodyPipe::expectMoreAfter(size_t offset) const
68 {
69 assert(theGetSize <= offset);
70 return offset < thePutSize || // buffer has more now or
71 (!productionEnded() && mayNeedMoreData()); // buffer will have more
72 }
73
74 bool BodyPipe::exhausted() const
75 {
76 return !expectMoreAfter(theGetSize);
77 }
78
79 size_t BodyPipe::unproducedSize() const
80 {
81 return bodySize() - thePutSize; // bodySize() asserts that size is known
82 }
83
84 void
85 BodyPipe::clearProducer(bool atEof)
86 {
87 if (theProducer) {
88 debugs(91,7, HERE << "clearing BodyPipe producer" << status());
89 theProducer = NULL;
90 if (atEof) {
91 if (!bodySizeKnown())
92 theBodySize = thePutSize;
93 else
94 if (bodySize() != thePutSize)
95 debugs(91,1, HERE << "aborting on premature eof" << status());
96 } else {
97 // asserta that we can detect the abort if the consumer joins later
98 assert(!bodySizeKnown() || bodySize() != thePutSize);
99 }
100 scheduleBodyEndNotification();
101 }
102 }
103
104 size_t
105 BodyPipe::putMoreData(const char *buf, size_t size)
106 {
107 if (bodySizeKnown())
108 size = XMIN(size, unproducedSize());
109
110 const size_t spaceSize = static_cast<size_t>(theBuf.potentialSpaceSize());
111 if ((size = XMIN(size, spaceSize))) {
112 theBuf.append(buf, size);
113 postAppend(size);
114 return size;
115 }
116 return 0;
117 }
118
119 bool
120 BodyPipe::setConsumerIfNotLate(Consumer *aConsumer)
121 {
122 assert(!theConsumer);
123 assert(aConsumer);
124
125 // TODO: convert this into an exception and remove IfNotLate suffix
126 // If there is something consumed already, we are in an auto-consuming mode
127 // and it is too late to attach a real consumer to the pipe.
128 if (theGetSize > 0) {
129 assert(mustAutoConsume);
130 return false;
131 }
132
133 theConsumer = aConsumer;
134 debugs(91,7, HERE << "set consumer" << status());
135 if (theBuf.hasContent())
136 AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable);
137 if (!theProducer)
138 scheduleBodyEndNotification();
139
140 return true;
141 }
142
143 void
144 BodyPipe::clearConsumer() {
145 if (theConsumer) {
146 debugs(91,7, HERE << "clearing consumer" << status());
147 theConsumer = NULL;
148 if (!exhausted())
149 AsyncCall(91,5, this, BodyPipe::tellBodyConsumerAborted);
150 }
151 }
152
153 size_t
154 BodyPipe::getMoreData(MemBuf &buf)
155 {
156 if (!theBuf.hasContent())
157 return 0; // did not touch the possibly uninitialized buf
158
159 if (buf.isNull())
160 buf.init();
161 const size_t size = XMIN(theBuf.contentSize(), buf.potentialSpaceSize());
162 buf.append(theBuf.content(), size);
163 theBuf.consume(size);
164 postConsume(size);
165 return size; // cannot be zero if we called buf.init above
166 }
167
168 void
169 BodyPipe::consume(size_t size)
170 {
171 theBuf.consume(size);
172 postConsume(size);
173 }
174
175 void
176 BodyPipe::enableAutoConsumption() {
177 mustAutoConsume = true;
178 debugs(91,5, HERE << "enabled auto consumption" << status());
179 if (!theConsumer && theBuf.hasContent())
180 AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable);
181 }
182
183 MemBuf &
184 BodyPipe::checkOut() {
185 assert(!isCheckedOut);
186 isCheckedOut = true;
187 return theBuf;
188 }
189
190 void
191 BodyPipe::checkIn(Checkout &checkout)
192 {
193 assert(isCheckedOut);
194 isCheckedOut = false;
195 const size_t currentSize = theBuf.contentSize();
196 if (checkout.checkedOutSize > currentSize)
197 postConsume(checkout.checkedOutSize - currentSize);
198 else
199 if (checkout.checkedOutSize < currentSize)
200 postAppend(currentSize - checkout.checkedOutSize);
201 }
202
203 void
204 BodyPipe::undoCheckOut(Checkout &checkout)
205 {
206 assert(isCheckedOut);
207 const size_t currentSize = theBuf.contentSize();
208 // We can only undo if size did not change, and even that carries
209 // some risk. If this becomes a problem, the code checking out
210 // raw buffers should always check them in (possibly unchanged)
211 // instead of relying on the automated undo mechanism of Checkout.
212 // The code can always use a temporary buffer to accomplish that.
213 assert(checkout.checkedOutSize == currentSize);
214 }
215
216 // TODO: Optimize: inform consumer/producer about more data/space only if
217 // they used the data/space since we notified them last time.
218
219 void
220 BodyPipe::postConsume(size_t size) {
221 assert(!isCheckedOut);
222 theGetSize += size;
223 debugs(91,7, HERE << "consumed " << size << " bytes" << status());
224 if (mayNeedMoreData())
225 AsyncCall(91,5, this, BodyPipe::tellMoreBodySpaceAvailable);
226 }
227
228 void
229 BodyPipe::postAppend(size_t size) {
230 assert(!isCheckedOut);
231 thePutSize += size;
232 debugs(91,7, HERE << "added " << size << " bytes" << status());
233
234 // We should not consume here even if mustAutoConsume because the
235 // caller may not be ready for the data to be consumed during this call.
236 AsyncCall(91,5, this, BodyPipe::tellMoreBodyDataAvailable);
237
238 if (!mayNeedMoreData())
239 clearProducer(true); // reached end-of-body
240 }
241
242
243 void
244 BodyPipe::scheduleBodyEndNotification()
245 {
246 if (bodySizeKnown() && bodySize() == thePutSize)
247 AsyncCall(91,5, this, BodyPipe::tellBodyProductionEnded);
248 else
249 AsyncCall(91,5, this, BodyPipe::tellBodyProducerAborted);
250 }
251
252 void BodyPipe::tellMoreBodySpaceAvailable()
253 {
254 if (theProducer != NULL)
255 theProducer->noteMoreBodySpaceAvailable(*this);
256 }
257
258 void BodyPipe::tellBodyConsumerAborted()
259 {
260 if (theProducer != NULL)
261 theProducer->noteBodyConsumerAborted(*this);
262 }
263
264 void BodyPipe::tellMoreBodyDataAvailable()
265 {
266 if (theConsumer != NULL)
267 theConsumer->noteMoreBodyDataAvailable(*this);
268 else
269 if (mustAutoConsume && theBuf.hasContent())
270 consume(theBuf.contentSize());
271 }
272
273 void BodyPipe::tellBodyProductionEnded()
274 {
275 if (theConsumer != NULL)
276 theConsumer->noteBodyProductionEnded(*this);
277 }
278
279 void BodyPipe::tellBodyProducerAborted()
280 {
281 if (theConsumer != NULL)
282 theConsumer->noteBodyProducerAborted(*this);
283 }
284
285 // a short temporary string describing buffer status for debugging
286 const char *BodyPipe::status() const
287 {
288 static MemBuf buf;
289 buf.reset();
290
291 buf.append(" [", 2);
292
293 buf.Printf("%d<=%d", (int)theGetSize, (int)thePutSize);
294 if (theBodySize >= 0)
295 buf.Printf("<=%d", (int)theBodySize);
296 else
297 buf.append("<=?", 3);
298
299 buf.Printf(" %d+%d", (int)theBuf.contentSize(), (int)theBuf.spaceSize());
300
301 buf.Printf(" pipe%p", this);
302 if (theProducer)
303 buf.Printf(" prod%p", theProducer);
304 if (theConsumer)
305 buf.Printf(" cons%p", theConsumer);
306
307 if (mustAutoConsume)
308 buf.append(" A", 2);
309 if (isCheckedOut)
310 buf.append(" L", 2); // Locked
311
312 buf.append("]", 1);
313
314 buf.terminate();
315
316 return buf.content();
317 }
318
319
320 /* BodyPipeCheckout */
321
322 BodyPipeCheckout::BodyPipeCheckout(BodyPipe &aPipe): pipe(aPipe),
323 buf(aPipe.checkOut()), offset(aPipe.consumedSize()),
324 checkedOutSize(buf.contentSize()), checkedIn(false)
325 {
326 }
327
328 BodyPipeCheckout::~BodyPipeCheckout()
329 {
330 if (!checkedIn)
331 pipe.undoCheckOut(*this);
332 }
333
334 void
335 BodyPipeCheckout::checkIn()
336 {
337 assert(!checkedIn);
338 pipe.checkIn(*this);
339 checkedIn = true;
340 }
341
342
343 BodyPipeCheckout::BodyPipeCheckout(const BodyPipeCheckout &c): pipe(c.pipe),
344 buf(c.buf), offset(c.offset), checkedOutSize(c.checkedOutSize),
345 checkedIn(c.checkedIn)
346 {
347 assert(false); // prevent copying
348 }
349
350 BodyPipeCheckout &
351 BodyPipeCheckout::operator =(const BodyPipeCheckout &)
352 {
353 assert(false); // prevent assignment
354 return *this;
355 }