]> git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
Handle early eCAP transaction failures better.
[thirdparty/squid.git] / src / adaptation / ecap / XactionRep.cc
1 /*
2 * DEBUG: section 93 eCAP Interface
3 */
4 #include "squid.h"
5 #include <libecap/common/area.h>
6 #include <libecap/common/delay.h>
7 #include <libecap/adapter/xaction.h>
8 #include "HttpRequest.h"
9 #include "HttpReply.h"
10 #include "SquidTime.h"
11 #include "adaptation/ecap/XactionRep.h"
12 #include "base/TextException.h"
13
14 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
15
16
17 Adaptation::Ecap::XactionRep::XactionRep(
18 HttpMsg *virginHeader, HttpRequest *virginCause,
19 const Adaptation::ServicePointer &aService):
20 AsyncJob("Adaptation::Ecap::XactionRep"),
21 Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
22 theService(aService),
23 theVirginRep(virginHeader), theCauseRep(NULL),
24 proxyingVb(opUndecided), proxyingAb(opUndecided),
25 adaptHistoryId(-1),
26 canAccessVb(false),
27 abProductionFinished(false), abProductionAtEnd(false)
28 {
29 if (virginCause)
30 theCauseRep = new MessageRep(virginCause);
31 }
32
33 Adaptation::Ecap::XactionRep::~XactionRep()
34 {
35 assert(!theMaster);
36 delete theCauseRep;
37 theAnswerRep.reset();
38 }
39
40 void
41 Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
42 {
43 Must(!theMaster);
44 Must(x != NULL);
45 theMaster = x;
46 }
47
48 Adaptation::Service &
49 Adaptation::Ecap::XactionRep::service()
50 {
51 Must(theService != NULL);
52 return *theService;
53 }
54
55 void
56 Adaptation::Ecap::XactionRep::start()
57 {
58 Must(theMaster);
59
60 if (theVirginRep.raw().body_pipe != NULL)
61 canAccessVb = true; /// assumes nobody is consuming; \todo check
62 else
63 proxyingVb = opNever;
64
65 const HttpRequest *request = dynamic_cast<const HttpRequest*> (theCauseRep ?
66 theCauseRep->raw().header : theVirginRep.raw().header);
67 Must(request);
68 Adaptation::History::Pointer ah = request->adaptLogHistory();
69 if (ah != NULL) {
70 // retrying=false because ecap never retries transactions
71 adaptHistoryId = ah->recordXactStart(service().cfg().key, current_time, false);
72 }
73
74 theMaster->start();
75 }
76
77 void
78 Adaptation::Ecap::XactionRep::swanSong()
79 {
80 // clear body_pipes, if any
81 // this code does not maintain proxying* and canAccessVb states; should it?
82
83 if (theAnswerRep != NULL) {
84 BodyPipe::Pointer body_pipe = answer().body_pipe;
85 if (body_pipe != NULL) {
86 Must(body_pipe->stillProducing(this));
87 stopProducingFor(body_pipe, false);
88 }
89 }
90
91 if (proxyingVb == opOn) {
92 BodyPipe::Pointer body_pipe = theVirginRep.raw().body_pipe;
93 if (body_pipe != NULL) {
94 Must(body_pipe->stillConsuming(this));
95 stopConsumingFrom(body_pipe);
96 }
97 }
98
99 terminateMaster();
100
101 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
102 theCauseRep->raw().header : theVirginRep.raw().header);
103 Must(request);
104 Adaptation::History::Pointer ah = request->adaptLogHistory();
105 if (ah != NULL && adaptHistoryId >= 0)
106 ah->recordXactFinish(adaptHistoryId);
107
108 Adaptation::Initiate::swanSong();
109 }
110
111 libecap::Message &
112 Adaptation::Ecap::XactionRep::virgin()
113 {
114 return theVirginRep;
115 }
116
117 const libecap::Message &
118 Adaptation::Ecap::XactionRep::cause()
119 {
120 Must(theCauseRep != NULL);
121 return *theCauseRep;
122 }
123
124 libecap::Message &
125 Adaptation::Ecap::XactionRep::adapted()
126 {
127 Must(theAnswerRep != NULL);
128 return *theAnswerRep;
129 }
130
131 Adaptation::Message &
132 Adaptation::Ecap::XactionRep::answer()
133 {
134 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
135 Must(rep);
136 return rep->raw();
137 }
138
139 void
140 Adaptation::Ecap::XactionRep::terminateMaster()
141 {
142 if (theMaster) {
143 AdapterXaction x = theMaster;
144 theMaster.reset();
145 x->stop();
146 }
147 }
148
149 bool
150 Adaptation::Ecap::XactionRep::doneAll() const
151 {
152 return proxyingVb >= opComplete && proxyingAb >= opComplete &&
153 Adaptation::Initiate::doneAll();
154 }
155
156 // stops receiving virgin and enables auto-consumption
157 void
158 Adaptation::Ecap::XactionRep::dropVirgin(const char *reason)
159 {
160 debugs(93,4, HERE << "because " << reason << "; status:" << status());
161 Must(proxyingVb = opOn);
162
163 BodyPipePointer &p = theVirginRep.raw().body_pipe;
164 Must(p != NULL);
165 Must(p->stillConsuming(this));
166 stopConsumingFrom(p);
167 p->enableAutoConsumption();
168 proxyingVb = opComplete;
169 canAccessVb = false;
170
171 // called from adapter handler so does not inform adapter
172 }
173
174 void
175 Adaptation::Ecap::XactionRep::useVirgin()
176 {
177 debugs(93,3, HERE << status());
178 Must(proxyingAb == opUndecided);
179 proxyingAb = opNever;
180
181 BodyPipePointer &vbody_pipe = theVirginRep.raw().body_pipe;
182
183 HttpMsg *clone = theVirginRep.raw().header->clone();
184 // check that clone() copies the pipe so that we do not have to
185 Must(!vbody_pipe == !clone->body_pipe);
186
187 if (proxyingVb == opOn) {
188 Must(vbody_pipe->stillConsuming(this));
189 // if libecap consumed, we cannot shortcircuit
190 Must(!vbody_pipe->consumedSize());
191 stopConsumingFrom(vbody_pipe);
192 canAccessVb = false;
193 proxyingVb = opComplete;
194 } else if (proxyingVb == opUndecided) {
195 vbody_pipe = NULL; // it is not our pipe anymore
196 proxyingVb = opNever;
197 }
198
199 sendAnswer(clone);
200 Must(done());
201 }
202
203 void
204 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
205 {
206 debugs(93,3, HERE << status());
207 Must(m);
208 theAnswerRep = m;
209 Must(proxyingAb == opUndecided);
210
211 HttpMsg *msg = answer().header;
212 if (!theAnswerRep->body()) { // final, bodyless answer
213 proxyingAb = opNever;
214 sendAnswer(msg);
215 } else { // got answer headers but need to handle body
216 proxyingAb = opOn;
217 Must(!msg->body_pipe); // only host can set body pipes
218 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
219 Must(rep);
220 rep->tieBody(this); // sets us as a producer
221 Must(msg->body_pipe != NULL); // check tieBody
222
223 sendAnswer(msg);
224
225 debugs(93,4, HERE << "adapter will produce body" << status());
226 theMaster->abMake(); // libecap will produce
227 }
228 }
229
230 void
231 Adaptation::Ecap::XactionRep::vbDiscard()
232 {
233 Must(proxyingVb == opUndecided);
234 // if adapter does not need vb, we do not need to send it
235 dropVirgin("vbDiscard");
236 Must(proxyingVb == opNever);
237 }
238
239 void
240 Adaptation::Ecap::XactionRep::vbMake()
241 {
242 Must(proxyingVb == opUndecided);
243 BodyPipePointer &p = theVirginRep.raw().body_pipe;
244 Must(p != NULL);
245 Must(p->setConsumerIfNotLate(this)); // to make vb, we must receive vb
246 proxyingVb = opOn;
247 }
248
249 void
250 Adaptation::Ecap::XactionRep::vbStopMaking()
251 {
252 // if adapter does not need vb, we do not need to receive it
253 if (proxyingVb == opOn)
254 dropVirgin("vbStopMaking");
255 Must(proxyingVb == opComplete);
256 }
257
258 void
259 Adaptation::Ecap::XactionRep::vbMakeMore()
260 {
261 Must(proxyingVb == opOn); // cannot make more if done proxying
262 // we cannot guarantee more vb, but we can check that there is a chance
263 Must(!theVirginRep.raw().body_pipe->exhausted());
264 }
265
266 libecap::Area
267 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
268 {
269 Must(canAccessVb);
270 // We may not be proxyingVb yet. It should be OK, but see vbContentShift().
271
272 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
273 Must(p != NULL);
274
275 // TODO: make MemBuf use size_t?
276 const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
277
278 // convert to Squid types; XXX: check for overflow
279 const uint64_t offset = static_cast<uint64_t>(o);
280 Must(offset <= haveSize); // equal iff at the end of content
281
282 // nsize means no size limit: all content starting from offset
283 const size_t size = s == libecap::nsize ?
284 haveSize - offset : static_cast<size_t>(s);
285
286 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
287 return libecap::Area::FromTempBuffer(p->buf().content() + offset,
288 min(static_cast<size_t>(haveSize - offset), size));
289 }
290
291 void
292 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
293 {
294 Must(canAccessVb);
295 // We may not be proxyingVb yet. It should be OK now, but if BodyPipe
296 // consume() requirements change, we would have to return empty vbContent
297 // until the adapter registers as a consumer
298
299 BodyPipePointer &p = theVirginRep.raw().body_pipe;
300 Must(p != NULL);
301 const size_t size = static_cast<size_t>(n); // XXX: check for overflow
302 const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
303 p->consume(min(size, haveSize));
304 }
305
306 void
307 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
308 {
309 Must(proxyingAb == opOn && !abProductionFinished);
310 abProductionFinished = true;
311 abProductionAtEnd = atEnd; // store until ready to stop producing ourselves
312 debugs(93,5, HERE << "adapted body production ended");
313 moveAbContent();
314 }
315
316 void
317 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
318 {
319 Must(proxyingAb == opOn && !abProductionFinished);
320 moveAbContent();
321 }
322
#if 0 /* XXX: implement */
/// Disabled code: would copy a known adapted body size to the answer pipe.
void
Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
{
    Must(answer().body_pipe != NULL);

    // the piped body size is unknown by default, so an unknown size
    // requires no action
    if (!size.known())
        return;

    answer().body_pipe->setBodySize(size.value());
}
#endif
333
334 void
335 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
336 {
337 debugs(93,3, HERE << "adapter needs time: " <<
338 d.state << '/' << d.progress);
339 // XXX: set timeout?
340 }
341
342 void
343 Adaptation::Ecap::XactionRep::adaptationAborted()
344 {
345 tellQueryAborted(true); // should eCAP support retries?
346 mustStop("adaptationAborted");
347 }
348
349 bool
350 Adaptation::Ecap::XactionRep::callable() const
351 {
352 return !done();
353 }
354
355 void
356 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
357 {
358 Must(proxyingAb == opOn);
359 moveAbContent();
360 }
361
362 void
363 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
364 {
365 Must(proxyingAb == opOn);
366 stopProducingFor(answer().body_pipe, false);
367 Must(theMaster);
368 theMaster->abStopMaking();
369 proxyingAb = opComplete;
370 }
371
372 void
373 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
374 {
375 Must(proxyingVb == opOn);
376 Must(theMaster);
377 theMaster->noteVbContentAvailable();
378 }
379
380 void
381 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
382 {
383 Must(proxyingVb == opOn);
384 Must(theMaster);
385 theMaster->noteVbContentDone(true);
386 proxyingVb = opComplete;
387 }
388
389 void
390 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
391 {
392 Must(proxyingVb == opOn);
393 Must(theMaster);
394 theMaster->noteVbContentDone(false);
395 proxyingVb = opComplete;
396 }
397
398 void
399 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
400 {
401 mustStop("initiator aborted");
402 }
403
404 // get content from the adapter and put it into the adapted pipe
405 void
406 Adaptation::Ecap::XactionRep::moveAbContent()
407 {
408 Must(proxyingAb == opOn);
409 const libecap::Area c = theMaster->abContent(0, libecap::nsize);
410 debugs(93,5, HERE << "up to " << c.size << " bytes");
411 if (c.size == 0 && abProductionFinished) { // no ab now and in the future
412 stopProducingFor(answer().body_pipe, abProductionAtEnd);
413 proxyingAb = opComplete;
414 debugs(93,5, HERE << "last adapted body data retrieved");
415 } else if (c.size > 0) {
416 if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
417 theMaster->abContentShift(used);
418 }
419 }
420
421 const char *
422 Adaptation::Ecap::XactionRep::status() const
423 {
424 static MemBuf buf;
425 buf.reset();
426
427 buf.append(" [", 2);
428
429 if (proxyingVb == opOn) {
430 const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
431 if (!canAccessVb)
432 buf.append("x", 1);
433 if (vp != NULL) { // XXX: but may not be stillConsuming()
434 buf.append("Vb", 2);
435 } else
436 buf.append("V.", 2);
437 }
438
439 if (proxyingAb == opOn) {
440 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
441 Must(rep);
442 const BodyPipePointer &ap = rep->raw().body_pipe;
443 if (ap != NULL) { // XXX: but may not be stillProducing()
444 buf.append(" Ab", 3);
445 } else
446 buf.append(" A.", 3);
447 }
448
449 buf.Printf(" %s%u]", id.Prefix, id.value);
450
451 buf.terminate();
452
453 return buf.content();
454 }