]> git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
Merge from trunk
[thirdparty/squid.git] / src / adaptation / ecap / XactionRep.cc
1 #include "squid.h"
2 #include <libecap/common/area.h>
3 #include <libecap/common/delay.h>
4 #include <libecap/adapter/xaction.h>
5 #include "TextException.h"
6 #include "HttpRequest.h"
7 #include "HttpReply.h"
8 #include "adaptation/ecap/XactionRep.h"
9
// NOTE(review): presumably emits the cbdata bookkeeping definitions for
// Adaptation::Ecap::XactionRep -- confirm against the macro definition.
CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
11
12
13 Adaptation::Ecap::XactionRep::XactionRep(Adaptation::Initiator *anInitiator,
14 HttpMsg *virginHeader, HttpRequest *virginCause,
15 const Adaptation::ServicePointer &aService):
16 AsyncJob("Adaptation::Ecap::XactionRep"),
17 Adaptation::Initiate("Adaptation::Ecap::XactionRep", anInitiator, aService),
18 theVirginRep(virginHeader), theCauseRep(NULL),
19 proxyingVb(opUndecided), proxyingAb(opUndecided), canAccessVb(false)
20 {
21 if (virginCause)
22 theCauseRep = new MessageRep(virginCause);
23 }
24
25 Adaptation::Ecap::XactionRep::~XactionRep()
26 {
27 assert(!theMaster);
28 delete theCauseRep;
29 theAnswerRep.reset();
30 }
31
32 void
33 Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
34 {
35 Must(!theMaster);
36 Must(x != NULL);
37 theMaster = x;
38 }
39
40 void
41 Adaptation::Ecap::XactionRep::start()
42 {
43 Must(theMaster);
44
45 if (theVirginRep.raw().body_pipe != NULL)
46 canAccessVb = true; /// assumes nobody is consuming; \todo check
47 else
48 proxyingVb = opNever;
49
50 theMaster->start();
51 }
52
53 void
54 Adaptation::Ecap::XactionRep::swanSong()
55 {
56 // clear body_pipes, if any
57 // this code does not maintain proxying* and canAccessVb states; should it?
58
59 if (theAnswerRep != NULL) {
60 BodyPipe::Pointer body_pipe = answer().body_pipe;
61 if (body_pipe != NULL) {
62 Must(body_pipe->stillProducing(this));
63 stopProducingFor(body_pipe, false);
64 }
65 }
66
67 {
68 BodyPipe::Pointer body_pipe = theVirginRep.raw().body_pipe;
69 if (body_pipe != NULL) {
70 Must(body_pipe->stillConsuming(this));
71 stopConsumingFrom(body_pipe);
72 }
73 }
74
75 terminateMaster();
76 Adaptation::Initiate::swanSong();
77 }
78
79 libecap::Message &
80 Adaptation::Ecap::XactionRep::virgin()
81 {
82 return theVirginRep;
83 }
84
85 const libecap::Message &
86 Adaptation::Ecap::XactionRep::cause()
87 {
88 Must(theCauseRep != NULL);
89 return *theCauseRep;
90 }
91
92 libecap::Message &
93 Adaptation::Ecap::XactionRep::adapted()
94 {
95 Must(theAnswerRep != NULL);
96 return *theAnswerRep;
97 }
98
99 Adaptation::Message &
100 Adaptation::Ecap::XactionRep::answer()
101 {
102 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
103 Must(rep);
104 return rep->raw();
105 }
106
107 void
108 Adaptation::Ecap::XactionRep::terminateMaster()
109 {
110 if (theMaster) {
111 AdapterXaction x = theMaster;
112 theMaster.reset();
113 x->stop();
114 }
115 }
116
117 bool
118 Adaptation::Ecap::XactionRep::doneAll() const
119 {
120 return proxyingVb >= opComplete && proxyingAb >= opComplete &&
121 Adaptation::Initiate::doneAll();
122 }
123
124 // stops receiving virgin and enables auto-consumption
125 void
126 Adaptation::Ecap::XactionRep::dropVirgin(const char *reason)
127 {
128 debugs(93,4, HERE << "because " << reason << "; status:" << status());
129 Must(proxyingVb = opOn);
130
131 BodyPipePointer &p = theVirginRep.raw().body_pipe;
132 Must(p != NULL);
133 Must(p->stillConsuming(this));
134 stopConsumingFrom(p);
135 p->enableAutoConsumption();
136 proxyingVb = opComplete;
137 canAccessVb = false;
138
139 // called from adapter handler so does not inform adapter
140 }
141
142 void
143 Adaptation::Ecap::XactionRep::useVirgin()
144 {
145 debugs(93,3, HERE << status());
146 Must(proxyingAb == opUndecided);
147 proxyingAb = opNever;
148
149 BodyPipePointer &vbody_pipe = theVirginRep.raw().body_pipe;
150
151 HttpMsg *clone = theVirginRep.raw().header->clone();
152 // check that clone() copies the pipe so that we do not have to
153 Must(!vbody_pipe == !clone->body_pipe);
154
155 if (proxyingVb == opOn) {
156 Must(vbody_pipe->stillConsuming(this));
157 // if libecap consumed, we cannot shortcircuit
158 Must(!vbody_pipe->consumedSize());
159 stopConsumingFrom(vbody_pipe);
160 canAccessVb = false;
161 proxyingVb = opComplete;
162 } else
163 if (proxyingVb == opUndecided) {
164 vbody_pipe = NULL; // it is not our pipe anymore
165 proxyingVb = opNever;
166 }
167
168 sendAnswer(clone);
169 Must(done());
170 }
171
172 void
173 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
174 {
175 debugs(93,3, HERE << status());
176 Must(m);
177 theAnswerRep = m;
178 Must(proxyingAb == opUndecided);
179
180 HttpMsg *msg = answer().header;
181 if (!theAnswerRep->body()) { // final, bodyless answer
182 proxyingAb = opNever;
183 sendAnswer(msg);
184 } else { // got answer headers but need to handle body
185 proxyingAb = opOn;
186 Must(!msg->body_pipe); // only host can set body pipes
187 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
188 Must(rep);
189 rep->tieBody(this); // sets us as a producer
190 Must(msg->body_pipe != NULL); // check tieBody
191
192 sendAnswer(msg);
193
194 debugs(93,4, HERE << "adapter will produce body" << status());
195 theMaster->abMake(); // libecap will produce
196 }
197 }
198
199 void
200 Adaptation::Ecap::XactionRep::vbDiscard()
201 {
202 Must(proxyingVb == opUndecided);
203 // if adapter does not need vb, we do not need to send it
204 dropVirgin("vbDiscard");
205 Must(proxyingVb == opNever);
206 }
207
208 void
209 Adaptation::Ecap::XactionRep::vbMake()
210 {
211 Must(proxyingVb == opUndecided);
212 BodyPipePointer &p = theVirginRep.raw().body_pipe;
213 Must(p != NULL);
214 Must(p->setConsumerIfNotLate(this)); // to make vb, we must receive vb
215 proxyingVb = opOn;
216 }
217
218 void
219 Adaptation::Ecap::XactionRep::vbStopMaking()
220 {
221 // if adapter does not need vb, we do not need to receive it
222 if (proxyingVb == opOn)
223 dropVirgin("vbStopMaking");
224 Must(proxyingVb == opComplete);
225 }
226
227 void
228 Adaptation::Ecap::XactionRep::vbMakeMore()
229 {
230 Must(proxyingVb == opOn); // cannot make more if done proxying
231 // we cannot guarantee more vb, but we can check that there is a chance
232 Must(!theVirginRep.raw().body_pipe->exhausted());
233 }
234
235 libecap::Area
236 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
237 {
238 Must(canAccessVb);
239 // We may not be proxyingVb yet. It should be OK, but see vbContentShift().
240
241 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
242 Must(p != NULL);
243
244 // TODO: make MemBuf use size_t?
245 const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
246
247 // convert to Squid types; XXX: check for overflow
248 const uint64_t offset = static_cast<uint64_t>(o);
249 Must(offset <= haveSize); // equal iff at the end of content
250
251 // nsize means no size limit: all content starting from offset
252 const size_t size = s == libecap::nsize ?
253 haveSize - offset : static_cast<size_t>(s);
254
255 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
256 return libecap::Area::FromTempBuffer(p->buf().content() + offset,
257 min(static_cast<size_t>(haveSize - offset), size));
258 }
259
260 void
261 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
262 {
263 Must(canAccessVb);
264 // We may not be proxyingVb yet. It should be OK now, but if BodyPipe
265 // consume() requirements change, we would have to return empty vbContent
266 // until the adapter registers as a consumer
267
268 BodyPipePointer &p = theVirginRep.raw().body_pipe;
269 Must(p != NULL);
270 const size_t size = static_cast<size_t>(n); // XXX: check for overflow
271 const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
272 p->consume(min(size, haveSize));
273 }
274
275 void
276 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
277 {
278 Must(proxyingAb == opOn);
279 stopProducingFor(answer().body_pipe, atEnd);
280 proxyingAb = opComplete;
281 }
282
283 void
284 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
285 {
286 Must(proxyingAb == opOn);
287 moveAbContent();
288 }
289
#if 0 /* XXX: implement */
/// Disabled code: would forward a known adapted body size to the adapted
/// body pipe, which must already exist.
void
Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
{
    Must(answer().body_pipe != NULL);

    if (!size.known())
        return; // the piped body size is unknown by default

    answer().body_pipe->setBodySize(size.value());
}
#endif
300
301 void
302 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
303 {
304 debugs(93,3, HERE << "adapter needs time: " <<
305 d.state << '/' << d.progress);
306 // XXX: set timeout?
307 }
308
309 void
310 Adaptation::Ecap::XactionRep::adaptationAborted()
311 {
312 tellQueryAborted(true); // should eCAP support retries?
313 mustStop("adaptationAborted");
314 }
315
316 bool
317 Adaptation::Ecap::XactionRep::callable() const
318 {
319 return !done();
320 }
321
322 void
323 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
324 {
325 Must(proxyingAb == opOn);
326 moveAbContent();
327 }
328
329 void
330 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
331 {
332 Must(proxyingAb == opOn);
333 stopProducingFor(answer().body_pipe, false);
334 Must(theMaster);
335 theMaster->abStopMaking();
336 proxyingAb = opComplete;
337 }
338
339 void
340 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
341 {
342 Must(proxyingVb == opOn);
343 Must(theMaster);
344 theMaster->noteVbContentAvailable();
345 }
346
347 void
348 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
349 {
350 Must(proxyingVb == opOn);
351 Must(theMaster);
352 theMaster->noteVbContentDone(true);
353 proxyingVb = opComplete;
354 }
355
356 void
357 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
358 {
359 Must(proxyingVb == opOn);
360 Must(theMaster);
361 theMaster->noteVbContentDone(false);
362 proxyingVb = opComplete;
363 }
364
365 void
366 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
367 {
368 mustStop("initiator aborted");
369 }
370
371 // get content from the adapter and put it into the adapted pipe
372 void
373 Adaptation::Ecap::XactionRep::moveAbContent()
374 {
375 Must(proxyingAb == opOn);
376 const libecap::Area c = theMaster->abContent(0, libecap::nsize);
377 debugs(93,5, HERE << " up to " << c.size << " bytes");
378 if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
379 theMaster->abContentShift(used);
380 }
381
382 const char *
383 Adaptation::Ecap::XactionRep::status() const
384 {
385 static MemBuf buf;
386 buf.reset();
387
388 buf.append(" [", 2);
389
390 if (proxyingVb == opOn) {
391 const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
392 if (!canAccessVb)
393 buf.append("x", 1);
394 if (vp != NULL && vp->stillConsuming(this)) {
395 buf.append("Vb", 2);
396 buf.append(vp->status(), strlen(vp->status())); // XXX
397 } else
398 buf.append("V.", 2);
399 }
400
401 if (proxyingAb == opOn) {
402 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
403 Must(rep);
404 const BodyPipePointer &ap = rep->raw().body_pipe;
405 if (ap != NULL && ap->stillProducing(this)) {
406 buf.append(" Ab", 3);
407 buf.append(ap->status(), strlen(ap->status())); // XXX
408 } else
409 buf.append(" A.", 3);
410 }
411
412 buf.Printf(" ecapx%d]", id);
413
414 buf.terminate();
415
416 return buf.content();
417 }