]> git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
eCAP: (A) Support optional adapter parameters and (B) fix virgin body handling.
[thirdparty/squid.git] / src / adaptation / ecap / XactionRep.cc
1 /*
2 * DEBUG: section 93 eCAP Interface
3 */
4 #include "squid.h"
5 #include <libecap/common/area.h>
6 #include <libecap/common/delay.h>
7 #include <libecap/adapter/xaction.h>
8 #include "HttpRequest.h"
9 #include "HttpReply.h"
10 #include "SquidTime.h"
11 #include "adaptation/ecap/XactionRep.h"
12 #include "adaptation/Initiator.h"
13 #include "base/TextException.h"
14
15 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
16
17
18 Adaptation::Ecap::XactionRep::XactionRep(
19 HttpMsg *virginHeader, HttpRequest *virginCause,
20 const Adaptation::ServicePointer &aService):
21 AsyncJob("Adaptation::Ecap::XactionRep"),
22 Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
23 theService(aService),
24 theVirginRep(virginHeader), theCauseRep(NULL),
25 makingVb(opUndecided), proxyingAb(opUndecided),
26 adaptHistoryId(-1),
27 vbProductionFinished(false),
28 abProductionFinished(false), abProductionAtEnd(false)
29 {
30 if (virginCause)
31 theCauseRep = new MessageRep(virginCause);
32 }
33
34 Adaptation::Ecap::XactionRep::~XactionRep()
35 {
36 assert(!theMaster);
37 delete theCauseRep;
38 theAnswerRep.reset();
39 }
40
41 void
42 Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
43 {
44 Must(!theMaster);
45 Must(x != NULL);
46 theMaster = x;
47 }
48
49 Adaptation::Service &
50 Adaptation::Ecap::XactionRep::service()
51 {
52 Must(theService != NULL);
53 return *theService;
54 }
55
56 void
57 Adaptation::Ecap::XactionRep::start()
58 {
59 Must(theMaster);
60
61 if (!theVirginRep.raw().body_pipe)
62 makingVb = opNever; // there is nothing to deliver
63
64 const HttpRequest *request = dynamic_cast<const HttpRequest*> (theCauseRep ?
65 theCauseRep->raw().header : theVirginRep.raw().header);
66 Must(request);
67 Adaptation::History::Pointer ah = request->adaptLogHistory();
68 if (ah != NULL) {
69 // retrying=false because ecap never retries transactions
70 adaptHistoryId = ah->recordXactStart(service().cfg().key, current_time, false);
71 }
72
73 theMaster->start();
74 }
75
76 void
77 Adaptation::Ecap::XactionRep::swanSong()
78 {
79 // clear body_pipes, if any
80 // this code does not maintain proxying* and canAccessVb states; should it?
81
82 if (theAnswerRep != NULL) {
83 BodyPipe::Pointer body_pipe = answer().body_pipe;
84 if (body_pipe != NULL) {
85 Must(body_pipe->stillProducing(this));
86 stopProducingFor(body_pipe, false);
87 }
88 }
89
90 BodyPipe::Pointer &body_pipe = theVirginRep.raw().body_pipe;
91 if (body_pipe != NULL && body_pipe->stillConsuming(this))
92 stopConsumingFrom(body_pipe);
93
94 terminateMaster();
95
96 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
97 theCauseRep->raw().header : theVirginRep.raw().header);
98 Must(request);
99 Adaptation::History::Pointer ah = request->adaptLogHistory();
100 if (ah != NULL && adaptHistoryId >= 0)
101 ah->recordXactFinish(adaptHistoryId);
102
103 Adaptation::Initiate::swanSong();
104 }
105
106 libecap::Message &
107 Adaptation::Ecap::XactionRep::virgin()
108 {
109 return theVirginRep;
110 }
111
112 const libecap::Message &
113 Adaptation::Ecap::XactionRep::cause()
114 {
115 Must(theCauseRep != NULL);
116 return *theCauseRep;
117 }
118
119 libecap::Message &
120 Adaptation::Ecap::XactionRep::adapted()
121 {
122 Must(theAnswerRep != NULL);
123 return *theAnswerRep;
124 }
125
126 Adaptation::Message &
127 Adaptation::Ecap::XactionRep::answer()
128 {
129 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
130 Must(rep);
131 return rep->raw();
132 }
133
134 void
135 Adaptation::Ecap::XactionRep::terminateMaster()
136 {
137 if (theMaster) {
138 AdapterXaction x = theMaster;
139 theMaster.reset();
140 x->stop();
141 }
142 }
143
144 bool
145 Adaptation::Ecap::XactionRep::doneAll() const
146 {
147 return makingVb >= opComplete && proxyingAb >= opComplete &&
148 Adaptation::Initiate::doneAll();
149 }
150
151 // stops receiving virgin and enables auto-consumption, dropping any vb bytes
152 void
153 Adaptation::Ecap::XactionRep::sinkVb(const char *reason)
154 {
155 debugs(93,4, HERE << "sink for " << reason << "; status:" << status());
156
157 // we reset raw().body_pipe when we are done, so use this one for checking
158 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
159 if (permPipe != NULL)
160 permPipe->enableAutoConsumption();
161
162 forgetVb(reason);
163 }
164
165 // stops receiving virgin but preserves it for others to use
166 void
167 Adaptation::Ecap::XactionRep::preserveVb(const char *reason)
168 {
169 debugs(93,4, HERE << "preserve for " << reason << "; status:" << status());
170
171 // we reset raw().body_pipe when we are done, so use this one for checking
172 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
173 if (permPipe != NULL) {
174 // if libecap consumed, we cannot preserve
175 Must(!permPipe->consumedSize());
176 }
177
178 forgetVb(reason);
179 }
180
181 // disassociates us from vb; the last step of sinking or preserving vb
182 void
183 Adaptation::Ecap::XactionRep::forgetVb(const char *reason)
184 {
185 debugs(93,9, HERE << "forget vb " << reason << "; status:" << status());
186
187 BodyPipePointer &p = theVirginRep.raw().body_pipe;
188 if (p != NULL && p->stillConsuming(this))
189 stopConsumingFrom(p);
190
191 if (makingVb == opUndecided)
192 makingVb = opNever;
193 else if (makingVb == opOn)
194 makingVb = opComplete;
195 }
196
197 void
198 Adaptation::Ecap::XactionRep::useVirgin()
199 {
200 debugs(93,3, HERE << status());
201 Must(proxyingAb == opUndecided);
202 proxyingAb = opNever;
203
204 preserveVb("useVirgin");
205
206 HttpMsg *clone = theVirginRep.raw().header->clone();
207 // check that clone() copies the pipe so that we do not have to
208 Must(!theVirginRep.raw().header->body_pipe == !clone->body_pipe);
209
210 sendAnswer(Answer::Forward(clone));
211 Must(done());
212 }
213
214 void
215 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
216 {
217 debugs(93,3, HERE << status());
218 Must(m);
219 theAnswerRep = m;
220 Must(proxyingAb == opUndecided);
221
222 HttpMsg *msg = answer().header;
223 if (!theAnswerRep->body()) { // final, bodyless answer
224 proxyingAb = opNever;
225 sendAnswer(Answer::Forward(msg));
226 } else { // got answer headers but need to handle body
227 proxyingAb = opOn;
228 Must(!msg->body_pipe); // only host can set body pipes
229 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
230 Must(rep);
231 rep->tieBody(this); // sets us as a producer
232 Must(msg->body_pipe != NULL); // check tieBody
233
234 sendAnswer(Answer::Forward(msg));
235
236 debugs(93,4, HERE << "adapter will produce body" << status());
237 theMaster->abMake(); // libecap will produce
238 }
239 }
240
241 void
242 Adaptation::Ecap::XactionRep::blockVirgin()
243 {
244 debugs(93,3, HERE << status());
245 Must(proxyingAb == opUndecided);
246 proxyingAb = opNever;
247
248 sinkVb("blockVirgin");
249
250 sendAnswer(Answer::Block(service().cfg().key));
251 Must(done());
252 }
253
254 void
255 Adaptation::Ecap::XactionRep::vbDiscard()
256 {
257 Must(makingVb == opUndecided);
258 // if adapter does not need vb, we do not need to send it
259 sinkVb("vbDiscard");
260 Must(makingVb == opNever);
261 }
262
263 void
264 Adaptation::Ecap::XactionRep::vbMake()
265 {
266 Must(makingVb == opUndecided);
267 BodyPipePointer &p = theVirginRep.raw().body_pipe;
268 Must(p != NULL);
269 Must(p->setConsumerIfNotLate(this)); // to deliver vb, we must receive vb
270 makingVb = opOn;
271 }
272
273 void
274 Adaptation::Ecap::XactionRep::vbStopMaking()
275 {
276 Must(makingVb == opOn);
277 // if adapter does not need vb, we do not need to receive it
278 sinkVb("vbStopMaking");
279 Must(makingVb == opComplete);
280 }
281
282 void
283 Adaptation::Ecap::XactionRep::vbMakeMore()
284 {
285 Must(makingVb == opOn); // cannot make more if done proxying
286 // we cannot guarantee more vb, but we can check that there is a chance
287 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
288 Must(p != NULL && p->stillConsuming(this)); // we are plugged in
289 Must(!p->productionEnded() && p->mayNeedMoreData()); // and may get more
290 }
291
292 libecap::Area
293 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
294 {
295 // We may not be makingVb yet. It should be OK, but see vbContentShift().
296
297 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
298 Must(p != NULL);
299
300 // TODO: make MemBuf use size_t?
301 const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
302
303 // convert to Squid types; XXX: check for overflow
304 const uint64_t offset = static_cast<uint64_t>(o);
305 Must(offset <= haveSize); // equal iff at the end of content
306
307 // nsize means no size limit: all content starting from offset
308 const size_t size = s == libecap::nsize ?
309 haveSize - offset : static_cast<size_t>(s);
310
311 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
312 return libecap::Area::FromTempBuffer(p->buf().content() + offset,
313 min(static_cast<size_t>(haveSize - offset), size));
314 }
315
316 void
317 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
318 {
319 // We may not be makingVb yet. It should be OK now, but if BodyPipe
320 // consume() requirements change, we would have to return empty vbContent
321 // until the adapter registers as a consumer
322
323 BodyPipePointer &p = theVirginRep.raw().body_pipe;
324 Must(p != NULL);
325 const size_t size = static_cast<size_t>(n); // XXX: check for overflow
326 const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
327 p->consume(min(size, haveSize));
328 }
329
330 void
331 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
332 {
333 Must(proxyingAb == opOn && !abProductionFinished);
334 abProductionFinished = true;
335 abProductionAtEnd = atEnd; // store until ready to stop producing ourselves
336 debugs(93,5, HERE << "adapted body production ended");
337 moveAbContent();
338 }
339
340 void
341 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
342 {
343 Must(proxyingAb == opOn && !abProductionFinished);
344 moveAbContent();
345 }
346
#if 0 /* XXX: implement */
// Disabled code: would pass a known adapted body size on to the adapted
// body pipe.
void
Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
{
    Must(answer().body_pipe != NULL);

    if (!size.known())
        return; // the piped body size is unknown by default

    answer().body_pipe->setBodySize(size.value());
}
#endif
357
358 void
359 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
360 {
361 debugs(93,3, HERE << "adapter needs time: " <<
362 d.state << '/' << d.progress);
363 // XXX: set timeout?
364 }
365
366 void
367 Adaptation::Ecap::XactionRep::adaptationAborted()
368 {
369 tellQueryAborted(true); // should eCAP support retries?
370 mustStop("adaptationAborted");
371 }
372
373 bool
374 Adaptation::Ecap::XactionRep::callable() const
375 {
376 return !done();
377 }
378
379 void
380 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
381 {
382 Must(proxyingAb == opOn);
383 moveAbContent();
384 }
385
386 void
387 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
388 {
389 Must(proxyingAb == opOn);
390 stopProducingFor(answer().body_pipe, false);
391 Must(theMaster);
392 theMaster->abStopMaking();
393 proxyingAb = opComplete;
394 }
395
396 void
397 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
398 {
399 Must(makingVb == opOn); // or we would not be registered as a consumer
400 Must(theMaster);
401 theMaster->noteVbContentAvailable();
402 }
403
404 void
405 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
406 {
407 Must(makingVb == opOn); // or we would not be registered as a consumer
408 Must(theMaster);
409 theMaster->noteVbContentDone(true);
410 vbProductionFinished = true;
411 }
412
413 void
414 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
415 {
416 Must(makingVb == opOn); // or we would not be registered as a consumer
417 Must(theMaster);
418 theMaster->noteVbContentDone(false);
419 vbProductionFinished = true;
420 }
421
422 void
423 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
424 {
425 mustStop("initiator aborted");
426 }
427
428 // get content from the adapter and put it into the adapted pipe
429 void
430 Adaptation::Ecap::XactionRep::moveAbContent()
431 {
432 Must(proxyingAb == opOn);
433 const libecap::Area c = theMaster->abContent(0, libecap::nsize);
434 debugs(93,5, HERE << "up to " << c.size << " bytes");
435 if (c.size == 0 && abProductionFinished) { // no ab now and in the future
436 stopProducingFor(answer().body_pipe, abProductionAtEnd);
437 proxyingAb = opComplete;
438 debugs(93,5, HERE << "last adapted body data retrieved");
439 } else if (c.size > 0) {
440 if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
441 theMaster->abContentShift(used);
442 }
443 }
444
445 const char *
446 Adaptation::Ecap::XactionRep::status() const
447 {
448 static MemBuf buf;
449 buf.reset();
450
451 buf.append(" [", 2);
452
453 if (makingVb)
454 buf.Printf("M%d", static_cast<int>(makingVb));
455
456 const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
457 if (!vp)
458 buf.append(" !V", 3);
459 else
460 if (vp->stillConsuming(const_cast<XactionRep*>(this)))
461 buf.append(" Vc", 3);
462 else
463 buf.append(" V?", 3);
464
465 if (vbProductionFinished)
466 buf.append(".", 1);
467
468
469 buf.Printf(" A%d", static_cast<int>(proxyingAb));
470
471 if (proxyingAb == opOn) {
472 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
473 Must(rep);
474 const BodyPipePointer &ap = rep->raw().body_pipe;
475 if (!ap)
476 buf.append(" !A", 3);
477 else if (ap->stillProducing(const_cast<XactionRep*>(this)))
478 buf.append(" Ap", 3);
479 else
480 buf.append(" A?", 3);
481 }
482
483 buf.Printf(" %s%u]", id.Prefix, id.value);
484
485 buf.terminate();
486
487 return buf.content();
488 }