]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
2 * DEBUG: section 93 eCAP Interface
5 #include <libecap/common/area.h>
6 #include <libecap/common/delay.h>
7 #include <libecap/adapter/xaction.h>
8 #include "HttpRequest.h"
10 #include "SquidTime.h"
11 #include "adaptation/ecap/XactionRep.h"
12 #include "adaptation/Initiator.h"
13 #include "base/TextException.h"
// Invokes the CBDATA_NAMESPACED_CLASS_INIT macro (defined elsewhere) for the
// XactionRep class in the Adaptation::Ecap namespace.
15 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep
, XactionRep
);
// Constructor.
// virginHeader: message wrapped by theVirginRep.
// virginCause:  request later wrapped in a new MessageRep stored in theCauseRep.
// aService:     adaptation service pointer (its use is not visible in this extract).
// Both the AsyncJob and Adaptation::Initiate bases receive the same job name.
// makingVb and proxyingAb start as opUndecided; all production flags start false.
18 Adaptation::Ecap::XactionRep::XactionRep(
19 HttpMsg
*virginHeader
, HttpRequest
*virginCause
,
20 const Adaptation::ServicePointer
&aService
):
21 AsyncJob("Adaptation::Ecap::XactionRep"),
22 Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
24 theVirginRep(virginHeader
), theCauseRep(NULL
),
25 makingVb(opUndecided
), proxyingAb(opUndecided
),
27 vbProductionFinished(false),
28 abProductionFinished(false), abProductionAtEnd(false)
// NOTE(review): the lines between the initializer list and this assignment are
// missing from this extract, so any condition guarding the allocation is not
// visible here.
31 theCauseRep
= new MessageRep(virginCause
);
// Destructor. NOTE(review): its body is not present in this extract.
34 Adaptation::Ecap::XactionRep::~XactionRep()
// Takes the adapter transaction handle x by const reference.
// NOTE(review): return type and body are missing from this extract; presumably
// this stores x as theMaster -- confirm against the full file.
42 Adaptation::Ecap::XactionRep::master(const AdapterXaction
&x
)
// Service accessor. Uses Must() to require that theService is set.
// NOTE(review): the return type and the return statement are missing from this
// extract.
50 Adaptation::Ecap::XactionRep::service()
52 Must(theService
!= NULL
);
// Begins the transaction. NOTE(review): this extract omits several original
// lines (return type, braces, and some statements), so only the visible steps
// are described here.
57 Adaptation::Ecap::XactionRep::start()
// Without a virgin body pipe, virgin body delivery is marked as never happening.
61 if (!theVirginRep
.raw().body_pipe
)
62 makingVb
= opNever
; // there is nothing to deliver
// Select the request used for logging: the cause's header when theCauseRep is
// set, otherwise the virgin header. dynamic_cast yields null if that header is
// not an HttpRequest; NOTE(review): any null check is not visible here.
64 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*> (theCauseRep
?
65 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
// Obtain the request's adaptation log history.
67 Adaptation::History::Pointer ah
= request
->adaptLogHistory();
69 // retrying=false because ecap never retries transactions
// Record the start of this transaction and keep the returned id in
// adaptHistoryId; swanSong() passes the same id to recordXactFinish().
70 adaptHistoryId
= ah
->recordXactStart(service().cfg().key
, current_time
, false);
// Final cleanup: detaches from the adapted and virgin body pipes, records the
// transaction finish in the adaptation history, then chains to
// Adaptation::Initiate::swanSong().
// NOTE(review): some original lines (braces, possibly other statements) are
// missing from this extract.
77 Adaptation::Ecap::XactionRep::swanSong()
79 // clear body_pipes, if any
80 // this code does not maintain proxying* and canAccessVb states; should it?
// Adapted side: if an answer exists and has a body pipe, stop producing for it.
82 if (theAnswerRep
!= NULL
) {
83 BodyPipe::Pointer body_pipe
= answer().body_pipe
;
84 if (body_pipe
!= NULL
) {
// We are expected to still be the registered producer of this pipe.
85 Must(body_pipe
->stillProducing(this));
// false is passed as the second argument of stopProducingFor().
86 stopProducingFor(body_pipe
, false);
// Virgin side: stop consuming only if we are still the pipe's consumer.
90 BodyPipe::Pointer
&body_pipe
= theVirginRep
.raw().body_pipe
;
91 if (body_pipe
!= NULL
&& body_pipe
->stillConsuming(this))
92 stopConsumingFrom(body_pipe
);
// Same request selection as in start(): cause header if available, else the
// virgin header.
96 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*>(theCauseRep
?
97 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
// Record the finish only when a history object exists and a start id was
// recorded (adaptHistoryId >= 0).
99 Adaptation::History::Pointer ah
= request
->adaptLogHistory();
100 if (ah
!= NULL
&& adaptHistoryId
>= 0)
101 ah
->recordXactFinish(adaptHistoryId
);
103 Adaptation::Initiate::swanSong();
// Accessor named virgin(). NOTE(review): return type and body are missing from
// this extract; presumably it exposes theVirginRep -- confirm.
107 Adaptation::Ecap::XactionRep::virgin()
// Returns a const libecap::Message reference for the cause.
// Uses Must() to require that theCauseRep is set.
// NOTE(review): the return statement is missing from this extract.
112 const libecap::Message
&
113 Adaptation::Ecap::XactionRep::cause()
115 Must(theCauseRep
!= NULL
);
// Returns the adapted message by dereferencing theAnswerRep.
// Uses Must() to require that theAnswerRep is set first.
// NOTE(review): the return type line is missing from this extract.
120 Adaptation::Ecap::XactionRep::adapted()
122 Must(theAnswerRep
!= NULL
);
123 return *theAnswerRep
;
// Returns an Adaptation::Message reference for the answer.
// The object held by theAnswerRep is downcast to MessageRep.
// NOTE(review): the lines that check rep and return are missing from this
// extract.
126 Adaptation::Message
&
127 Adaptation::Ecap::XactionRep::answer()
129 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
// Copies theMaster into a local handle x.
// NOTE(review): the remaining statements (what is done with x and theMaster)
// are missing from this extract.
135 Adaptation::Ecap::XactionRep::terminateMaster()
138 AdapterXaction x
= theMaster
;
// Reports whether the job has nothing left to do: both makingVb and proxyingAb
// must compare >= opComplete, and the base Adaptation::Initiate::doneAll() must
// also agree.
145 Adaptation::Ecap::XactionRep::doneAll() const
147 return makingVb
>= opComplete
&& proxyingAb
>= opComplete
&&
148 Adaptation::Initiate::doneAll();
151 // stops receiving virgin and enables auto-consumption, dropping any vb bytes
// reason: short text used only in the debug message below.
// NOTE(review): the trailing lines of this function are missing from this
// extract.
153 Adaptation::Ecap::XactionRep::sinkVb(const char *reason
)
155 debugs(93,4, HERE
<< "sink for " << reason
<< "; status:" << status());
157 // we reset raw().body_pipe when we are done, so use this one for checking
// The pipe is read from the header rather than from raw().body_pipe.
158 const BodyPipePointer
&permPipe
= theVirginRep
.raw().header
->body_pipe
;
159 if (permPipe
!= NULL
)
160 permPipe
->enableAutoConsumption();
165 // stops receiving virgin but preserves it for others to use
// reason: short text used only in the debug message below.
// NOTE(review): the trailing lines of this function are missing from this
// extract.
167 Adaptation::Ecap::XactionRep::preserveVb(const char *reason
)
169 debugs(93,4, HERE
<< "preserve for " << reason
<< "; status:" << status());
171 // we reset raw().body_pipe when we are done, so use this one for checking
172 const BodyPipePointer
&permPipe
= theVirginRep
.raw().header
->body_pipe
;
173 if (permPipe
!= NULL
) {
174 // if libecap consumed, we cannot preserve
// Must() fails here when consumedSize() is non-zero.
175 Must(!permPipe
->consumedSize());
181 // disassociates us from vb; the last step of sinking or preserving vb
// reason: short text used only in the debug message below.
183 Adaptation::Ecap::XactionRep::forgetVb(const char *reason
)
185 debugs(93,9, HERE
<< "forget vb " << reason
<< "; status:" << status());
// Stop consuming the virgin body pipe if we are still its consumer.
187 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
188 if (p
!= NULL
&& p
->stillConsuming(this))
189 stopConsumingFrom(p
);
// Update makingVb: opOn becomes opComplete.
// NOTE(review): the statement executed for the opUndecided branch is missing
// from this extract.
191 if (makingVb
== opUndecided
)
193 else if (makingVb
== opOn
)
194 makingVb
= opComplete
;
// Answers with a clone of the virgin message instead of an adapted one.
// Requires that the adapted-body decision is still open (proxyingAb ==
// opUndecided) and records it as opNever.
198 Adaptation::Ecap::XactionRep::useVirgin()
200 debugs(93,3, HERE
<< status());
201 Must(proxyingAb
== opUndecided
);
202 proxyingAb
= opNever
;
// Stop receiving the virgin body while keeping it usable by others.
204 preserveVb("useVirgin");
// Forward a clone of the virgin header as the answer.
206 HttpMsg
*clone
= theVirginRep
.raw().header
->clone();
207 // check that clone() copies the pipe so that we do not have to
// Both sides are negated so only presence/absence of the pipe is compared.
208 Must(!theVirginRep
.raw().header
->body_pipe
== !clone
->body_pipe
);
210 sendAnswer(Answer::Forward(clone
));
// Answers with the adapted message m supplied by the adapter.
// Requires that the adapted-body decision is still open (proxyingAb ==
// opUndecided).
// NOTE(review): several original lines are missing from this extract,
// including where m is stored and where proxyingAb is updated in the body case.
215 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr
<libecap::Message
> &m
)
217 debugs(93,3, HERE
<< status());
220 Must(proxyingAb
== opUndecided
);
222 HttpMsg
*msg
= answer().header
;
// Bodyless answer: forward the header right away; no adapted body will follow.
223 if (!theAnswerRep
->body()) { // final, bodyless answer
224 proxyingAb
= opNever
;
225 sendAnswer(Answer::Forward(msg
));
226 } else { // got answer headers but need to handle body
228 Must(!msg
->body_pipe
); // only host can set body pipes
229 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
// NOTE(review): rep is dereferenced below; any null check is not visible here.
231 rep
->tieBody(this); // sets us as a producer
232 Must(msg
->body_pipe
!= NULL
); // check tieBody
// The header is forwarded before the adapter is asked to produce the body.
234 sendAnswer(Answer::Forward(msg
));
236 debugs(93,4, HERE
<< "adapter will produce body" << status());
237 theMaster
->abMake(); // libecap will produce
// Answers with a block decision for the virgin message.
// Requires proxyingAb == opUndecided and records it as opNever.
242 Adaptation::Ecap::XactionRep::blockVirgin()
244 debugs(93,3, HERE
<< status());
245 Must(proxyingAb
== opUndecided
);
246 proxyingAb
= opNever
;
// Virgin body bytes are not needed for a blocked message, so they are dropped.
248 sinkVb("blockVirgin");
// The Block answer is built from the service configuration key.
250 sendAnswer(Answer::Block(service().cfg().key
));
// Adapter callback named vbDiscard().
// Precondition: makingVb == opUndecided. Postcondition checked below:
// makingVb == opNever.
// NOTE(review): the statement between the two checks, which must change
// makingVb, is missing from this extract.
255 Adaptation::Ecap::XactionRep::vbDiscard()
257 Must(makingVb
== opUndecided
);
258 // if adapter does not need vb, we do not need to send it
260 Must(makingVb
== opNever
);
// Adapter callback named vbMake(): registers this object as the consumer of the
// virgin body pipe.
// Precondition: makingVb == opUndecided.
// NOTE(review): lines are missing from this extract, including any null check
// on p and the update of makingVb.
264 Adaptation::Ecap::XactionRep::vbMake()
266 Must(makingVb
== opUndecided
);
267 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
// Must() fails if setConsumerIfNotLate() returns false.
269 Must(p
->setConsumerIfNotLate(this)); // to deliver vb, we must receive vb
// Adapter callback named vbStopMaking(): stops virgin body delivery.
// Precondition: makingVb == opOn. Postcondition checked below:
// makingVb == opComplete.
274 Adaptation::Ecap::XactionRep::vbStopMaking()
276 Must(makingVb
== opOn
);
277 // if adapter does not need vb, we do not need to receive it
278 sinkVb("vbStopMaking");
279 Must(makingVb
== opComplete
);
// Adapter callback named vbMakeMore(). The visible code only validates state
// and performs no other action.
283 Adaptation::Ecap::XactionRep::vbMakeMore()
285 Must(makingVb
== opOn
); // cannot make more if done proxying
286 // we cannot guarantee more vb, but we can check that there is a chance
287 const BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
// The pipe must exist with this object as its consumer.
288 Must(p
!= NULL
&& p
->stillConsuming(this)); // we are plugged in
// Production must not have ended, and the pipe must report that it may need
// more data.
289 Must(!p
->productionEnded() && p
->mayNeedMoreData()); // and may get more
// Returns a libecap::Area built from buffered virgin body content.
// o: offset into the currently buffered content; must not exceed the buffered
//    size.
// s: maximum number of bytes wanted; libecap::nsize means "everything from o".
// The returned area never extends past the buffered content (see the min()
// below).
// NOTE(review): some original lines are missing from this extract, including
// the return type and any null check on p before it is dereferenced.
293 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o
, libecap::size_type s
)
295 // We may not be makingVb yet. It should be OK, but see vbContentShift().
297 const BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
300 // TODO: make MemBuf use size_t?
301 const size_t haveSize
= static_cast<size_t>(p
->buf().contentSize());
303 // convert to Squid types; XXX: check for overflow
304 const uint64_t offset
= static_cast<uint64_t>(o
);
305 Must(offset
<= haveSize
); // equal iff at the end of content
307 // nsize means no size limit: all content starting from offset
308 const size_t size
= s
== libecap::nsize
?
309 haveSize
- offset
: static_cast<size_t>(s
);
311 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
// The length is clamped to the bytes available after offset.
312 return libecap::Area::FromTempBuffer(p
->buf().content() + offset
,
313 min(static_cast<size_t>(haveSize
- offset
), size
));
// Consumes up to n bytes from the front of the buffered virgin body.
// n: number of bytes the adapter is done with; it is clamped to the amount
//    currently buffered before consume() is called.
// NOTE(review): some original lines are missing from this extract, including
// any null check on p before it is dereferenced.
317 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n
)
319 // We may not be makingVb yet. It should be OK now, but if BodyPipe
320 // consume() requirements change, we would have to return empty vbContent
321 // until the adapter registers as a consumer
323 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
325 const size_t size
= static_cast<size_t>(n
); // XXX: check for overflow
326 const size_t haveSize
= static_cast<size_t>(p
->buf().contentSize()); // TODO: make MemBuf use size_t?
327 p
->consume(min(size
, haveSize
));
// Adapter notification that adapted body production has ended.
// atEnd: stored in abProductionAtEnd; moveAbContent() later passes it to
//        stopProducingFor().
// Precondition: proxyingAb == opOn and production not already marked finished.
// NOTE(review): trailing lines of this function are missing from this extract.
331 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd
)
333 Must(proxyingAb
== opOn
&& !abProductionFinished
);
334 abProductionFinished
= true;
335 abProductionAtEnd
= atEnd
; // store until ready to stop producing ourselves
336 debugs(93,5, HERE
<< "adapted body production ended");
// Adapter notification that adapted body content is available.
// Precondition: proxyingAb == opOn and production not marked finished.
// NOTE(review): the statement(s) after this check are missing from this
// extract.
341 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
343 Must(proxyingAb
== opOn
&& !abProductionFinished
);
// Disabled code (inside #if 0): would set the adapted body pipe's size from
// size.value().
// NOTE(review): the matching #endif and some lines of this function are missing
// from this extract.
347 #if 0 /* XXX: implement */
349 Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize
&size
)
351 Must(answer().body_pipe
!= NULL
);
353 answer().body_pipe
->setBodySize(size
.value());
354 // else the piped body size is unknown by default
// Adapter notification that it needs more time.
// d: delay description; its state and progress fields are logged.
// The visible code only logs and takes no other action.
359 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay
&d
)
361 debugs(93,3, HERE
<< "adapter needs time: " <<
362 d
.state
<< '/' << d
.progress
);
// Adapter notification that adaptation was aborted: reports the abort via
// tellQueryAborted(true), then stops this job with the reason
// "adaptationAborted".
367 Adaptation::Ecap::XactionRep::adaptationAborted()
369 tellQueryAborted(true); // should eCAP support retries?
370 mustStop("adaptationAborted");
// Const member named callable(). NOTE(review): return type and body are missing
// from this extract.
374 Adaptation::Ecap::XactionRep::callable() const
// Body pipe callback: more space is available in the pipe bp.
// Precondition: proxyingAb == opOn.
// NOTE(review): the statement(s) after this check are missing from this
// extract.
380 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount
<BodyPipe
> bp
)
382 Must(proxyingAb
== opOn
);
// Body pipe callback: the consumer of the pipe bp aborted.
// Precondition: proxyingAb == opOn.
387 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount
<BodyPipe
> bp
)
389 Must(proxyingAb
== opOn
);
// Stop producing for the answer's pipe (not the bp argument), passing false as
// the second argument.
390 stopProducingFor(answer().body_pipe
, false);
// Tell the adapter to stop making the adapted body, then mark proxying
// complete.
392 theMaster
->abStopMaking();
393 proxyingAb
= opComplete
;
// Body pipe callback: more virgin body data is available in the pipe bp.
// The notification is relayed to the adapter via noteVbContentAvailable().
397 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount
<BodyPipe
> bp
)
399 Must(makingVb
== opOn
); // or we would not be registered as a consumer
401 theMaster
->noteVbContentAvailable();
// Body pipe callback: virgin body production ended on the pipe bp.
// Calls the adapter's noteVbContentDone(true), then sets vbProductionFinished.
405 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount
<BodyPipe
> bp
)
407 Must(makingVb
== opOn
); // or we would not be registered as a consumer
409 theMaster
->noteVbContentDone(true);
410 vbProductionFinished
= true;
// Body pipe callback: the virgin body producer aborted on the pipe bp.
// Calls the adapter's noteVbContentDone(false), then sets vbProductionFinished.
414 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount
<BodyPipe
> bp
)
416 Must(makingVb
== opOn
); // or we would not be registered as a consumer
418 theMaster
->noteVbContentDone(false);
419 vbProductionFinished
= true;
// Callback named noteInitiatorAborted(): stops this job with the reason
// "initiator aborted".
423 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
425 mustStop("initiator aborted");
428 // get content from the adapter and put it into the adapted pipe
// Precondition: proxyingAb == opOn.
// NOTE(review): closing braces are missing from this extract.
430 Adaptation::Ecap::XactionRep::moveAbContent()
432 Must(proxyingAb
== opOn
);
// Ask the adapter for all currently available adapted content (offset 0, no
// size limit).
433 const libecap::Area c
= theMaster
->abContent(0, libecap::nsize
);
434 debugs(93,5, HERE
<< "up to " << c
.size
<< " bytes");
// Nothing available and the adapter already reported the end of production:
// stop producing, passing along the stored atEnd flag.
435 if (c
.size
== 0 && abProductionFinished
) { // no ab now and in the future
436 stopProducingFor(answer().body_pipe
, abProductionAtEnd
);
437 proxyingAb
= opComplete
;
438 debugs(93,5, HERE
<< "last adapted body data retrieved");
439 } else if (c
.size
> 0) {
// Put the content into the pipe; if putMoreData() reports a non-zero count,
// tell the adapter to shift its content by that many bytes.
440 if (const size_t used
= answer().body_pipe
->putMoreData(c
.start
, c
.size
))
441 theMaster
->abContentShift(used
);
// Builds a short status string in buf and returns buf.content().
// Visible pieces, in order: "M<makingVb>", one of the virgin pipe markers
// " !V" / " Vc" / " V?", " A<proxyingAb>", one of the answer pipe markers
// " !A" / " Ap" / " A?" (only while proxyingAb == opOn), and the job id
// followed by ']'.
// NOTE(review): many original lines are missing from this extract, including
// the declaration of buf, several if/else conditions that select the markers,
// and the statement after the vbProductionFinished check.
446 Adaptation::Ecap::XactionRep::status() const
454 buf
.Printf("M%d", static_cast<int>(makingVb
));
456 const BodyPipePointer
&vp
= theVirginRep
.raw().body_pipe
;
458 buf
.append(" !V", 3);
// const_cast is needed because stillConsuming() is given a non-const pointer
// while status() is a const member.
460 if (vp
->stillConsuming(const_cast<XactionRep
*>(this)))
461 buf
.append(" Vc", 3);
463 buf
.append(" V?", 3);
465 if (vbProductionFinished
)
469 buf
.Printf(" A%d", static_cast<int>(proxyingAb
));
471 if (proxyingAb
== opOn
) {
472 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
// NOTE(review): rep is dereferenced below; any null check is not visible here.
474 const BodyPipePointer
&ap
= rep
->raw().body_pipe
;
476 buf
.append(" !A", 3);
477 else if (ap
->stillProducing(const_cast<XactionRep
*>(this)))
478 buf
.append(" Ap", 3);
480 buf
.append(" A?", 3);
483 buf
.Printf(" %s%u]", id
.Prefix
, id
.value
);
487 return buf
.content();