]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
2 #include <libecap/common/area.h>
3 #include <libecap/common/delay.h>
4 #include <libecap/adapter/xaction.h>
5 #include "TextException.h"
6 #include "HttpRequest.h"
9 #include "adaptation/ecap/XactionRep.h"
11 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep
, XactionRep
);
14 Adaptation::Ecap::XactionRep::XactionRep(Adaptation::Initiator
*anInitiator
,
15 HttpMsg
*virginHeader
, HttpRequest
*virginCause
,
16 const Adaptation::ServicePointer
&aService
):
17 AsyncJob("Adaptation::Ecap::XactionRep"),
18 Adaptation::Initiate("Adaptation::Ecap::XactionRep", anInitiator
),
20 theVirginRep(virginHeader
), theCauseRep(NULL
),
21 proxyingVb(opUndecided
), proxyingAb(opUndecided
),
24 abProductionFinished(false), abProductionAtEnd(false)
27 theCauseRep
= new MessageRep(virginCause
);
30 Adaptation::Ecap::XactionRep::~XactionRep()
38 Adaptation::Ecap::XactionRep::master(const AdapterXaction
&x
)
46 Adaptation::Ecap::XactionRep::service()
48 Must(theService
!= NULL
);
53 Adaptation::Ecap::XactionRep::start()
57 if (theVirginRep
.raw().body_pipe
!= NULL
)
58 canAccessVb
= true; /// assumes nobody is consuming; \todo check
62 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*> (theCauseRep
?
63 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
65 Adaptation::History::Pointer ah
= request
->adaptLogHistory();
67 // retrying=false because ecap never retries transactions
68 adaptHistoryId
= ah
->recordXactStart(service().cfg().key
, current_time
, false);
75 Adaptation::Ecap::XactionRep::swanSong()
77 // clear body_pipes, if any
78 // this code does not maintain proxying* and canAccessVb states; should it?
80 if (theAnswerRep
!= NULL
) {
81 BodyPipe::Pointer body_pipe
= answer().body_pipe
;
82 if (body_pipe
!= NULL
) {
83 Must(body_pipe
->stillProducing(this));
84 stopProducingFor(body_pipe
, false);
89 BodyPipe::Pointer body_pipe
= theVirginRep
.raw().body_pipe
;
90 if (body_pipe
!= NULL
) {
91 Must(body_pipe
->stillConsuming(this));
92 stopConsumingFrom(body_pipe
);
98 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*>(theCauseRep
?
99 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
101 Adaptation::History::Pointer ah
= request
->adaptLogHistory();
102 if (ah
!= NULL
&& adaptHistoryId
>= 0)
103 ah
->recordXactFinish(adaptHistoryId
);
105 Adaptation::Initiate::swanSong();
109 Adaptation::Ecap::XactionRep::virgin()
114 const libecap::Message
&
115 Adaptation::Ecap::XactionRep::cause()
117 Must(theCauseRep
!= NULL
);
122 Adaptation::Ecap::XactionRep::adapted()
124 Must(theAnswerRep
!= NULL
);
125 return *theAnswerRep
;
128 Adaptation::Message
&
129 Adaptation::Ecap::XactionRep::answer()
131 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
137 Adaptation::Ecap::XactionRep::terminateMaster()
140 AdapterXaction x
= theMaster
;
147 Adaptation::Ecap::XactionRep::doneAll() const
149 return proxyingVb
>= opComplete
&& proxyingAb
>= opComplete
&&
150 Adaptation::Initiate::doneAll();
153 // stops receiving virgin and enables auto-consumption
155 Adaptation::Ecap::XactionRep::dropVirgin(const char *reason
)
157 debugs(93,4, HERE
<< "because " << reason
<< "; status:" << status());
158 Must(proxyingVb
= opOn
);
160 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
162 Must(p
->stillConsuming(this));
163 stopConsumingFrom(p
);
164 p
->enableAutoConsumption();
165 proxyingVb
= opComplete
;
168 // called from adapter handler so does not inform adapter
172 Adaptation::Ecap::XactionRep::useVirgin()
174 debugs(93,3, HERE
<< status());
175 Must(proxyingAb
== opUndecided
);
176 proxyingAb
= opNever
;
178 BodyPipePointer
&vbody_pipe
= theVirginRep
.raw().body_pipe
;
180 HttpMsg
*clone
= theVirginRep
.raw().header
->clone();
181 // check that clone() copies the pipe so that we do not have to
182 Must(!vbody_pipe
== !clone
->body_pipe
);
184 if (proxyingVb
== opOn
) {
185 Must(vbody_pipe
->stillConsuming(this));
186 // if libecap consumed, we cannot shortcircuit
187 Must(!vbody_pipe
->consumedSize());
188 stopConsumingFrom(vbody_pipe
);
190 proxyingVb
= opComplete
;
191 } else if (proxyingVb
== opUndecided
) {
192 vbody_pipe
= NULL
; // it is not our pipe anymore
193 proxyingVb
= opNever
;
201 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr
<libecap::Message
> &m
)
203 debugs(93,3, HERE
<< status());
206 Must(proxyingAb
== opUndecided
);
208 HttpMsg
*msg
= answer().header
;
209 if (!theAnswerRep
->body()) { // final, bodyless answer
210 proxyingAb
= opNever
;
212 } else { // got answer headers but need to handle body
214 Must(!msg
->body_pipe
); // only host can set body pipes
215 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
217 rep
->tieBody(this); // sets us as a producer
218 Must(msg
->body_pipe
!= NULL
); // check tieBody
222 debugs(93,4, HERE
<< "adapter will produce body" << status());
223 theMaster
->abMake(); // libecap will produce
228 Adaptation::Ecap::XactionRep::vbDiscard()
230 Must(proxyingVb
== opUndecided
);
231 // if adapter does not need vb, we do not need to send it
232 dropVirgin("vbDiscard");
233 Must(proxyingVb
== opNever
);
237 Adaptation::Ecap::XactionRep::vbMake()
239 Must(proxyingVb
== opUndecided
);
240 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
242 Must(p
->setConsumerIfNotLate(this)); // to make vb, we must receive vb
247 Adaptation::Ecap::XactionRep::vbStopMaking()
249 // if adapter does not need vb, we do not need to receive it
250 if (proxyingVb
== opOn
)
251 dropVirgin("vbStopMaking");
252 Must(proxyingVb
== opComplete
);
256 Adaptation::Ecap::XactionRep::vbMakeMore()
258 Must(proxyingVb
== opOn
); // cannot make more if done proxying
259 // we cannot guarantee more vb, but we can check that there is a chance
260 Must(!theVirginRep
.raw().body_pipe
->exhausted());
264 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o
, libecap::size_type s
)
267 // We may not be proxyingVb yet. It should be OK, but see vbContentShift().
269 const BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
272 // TODO: make MemBuf use size_t?
273 const size_t haveSize
= static_cast<size_t>(p
->buf().contentSize());
275 // convert to Squid types; XXX: check for overflow
276 const uint64_t offset
= static_cast<uint64_t>(o
);
277 Must(offset
<= haveSize
); // equal iff at the end of content
279 // nsize means no size limit: all content starting from offset
280 const size_t size
= s
== libecap::nsize
?
281 haveSize
- offset
: static_cast<size_t>(s
);
283 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
284 return libecap::Area::FromTempBuffer(p
->buf().content() + offset
,
285 min(static_cast<size_t>(haveSize
- offset
), size
));
289 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n
)
292 // We may not be proxyingVb yet. It should be OK now, but if BodyPipe
293 // consume() requirements change, we would have to return empty vbContent
294 // until the adapter registers as a consumer
296 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
298 const size_t size
= static_cast<size_t>(n
); // XXX: check for overflow
299 const size_t haveSize
= static_cast<size_t>(p
->buf().contentSize()); // TODO: make MemBuf use size_t?
300 p
->consume(min(size
, haveSize
));
304 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd
)
306 Must(proxyingAb
== opOn
&& !abProductionFinished
);
307 abProductionFinished
= true;
308 abProductionAtEnd
= atEnd
; // store until ready to stop producing ourselves
309 debugs(93,5, HERE
<< "adapted body production ended");
314 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
316 Must(proxyingAb
== opOn
&& !abProductionFinished
);
320 #if 0 /* XXX: implement */
322 Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize
&size
)
324 Must(answer().body_pipe
!= NULL
);
326 answer().body_pipe
->setBodySize(size
.value());
327 // else the piped body size is unknown by default
332 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay
&d
)
334 debugs(93,3, HERE
<< "adapter needs time: " <<
335 d
.state
<< '/' << d
.progress
);
340 Adaptation::Ecap::XactionRep::adaptationAborted()
342 tellQueryAborted(true); // should eCAP support retries?
343 mustStop("adaptationAborted");
347 Adaptation::Ecap::XactionRep::callable() const
353 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount
<BodyPipe
> bp
)
355 Must(proxyingAb
== opOn
);
360 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount
<BodyPipe
> bp
)
362 Must(proxyingAb
== opOn
);
363 stopProducingFor(answer().body_pipe
, false);
365 theMaster
->abStopMaking();
366 proxyingAb
= opComplete
;
370 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount
<BodyPipe
> bp
)
372 Must(proxyingVb
== opOn
);
374 theMaster
->noteVbContentAvailable();
378 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount
<BodyPipe
> bp
)
380 Must(proxyingVb
== opOn
);
382 theMaster
->noteVbContentDone(true);
383 proxyingVb
= opComplete
;
387 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount
<BodyPipe
> bp
)
389 Must(proxyingVb
== opOn
);
391 theMaster
->noteVbContentDone(false);
392 proxyingVb
= opComplete
;
396 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
398 mustStop("initiator aborted");
401 // get content from the adapter and put it into the adapted pipe
403 Adaptation::Ecap::XactionRep::moveAbContent()
405 Must(proxyingAb
== opOn
);
406 const libecap::Area c
= theMaster
->abContent(0, libecap::nsize
);
407 debugs(93,5, HERE
<< "up to " << c
.size
<< " bytes");
408 if (c
.size
== 0 && abProductionFinished
) { // no ab now and in the future
409 stopProducingFor(answer().body_pipe
, abProductionAtEnd
);
410 proxyingAb
= opComplete
;
411 debugs(93,5, HERE
<< "last adapted body data retrieved");
412 } else if (c
.size
> 0) {
413 if (const size_t used
= answer().body_pipe
->putMoreData(c
.start
, c
.size
))
414 theMaster
->abContentShift(used
);
419 Adaptation::Ecap::XactionRep::status() const
426 if (proxyingVb
== opOn
) {
427 const BodyPipePointer
&vp
= theVirginRep
.raw().body_pipe
;
430 if (vp
!= NULL
&& vp
->stillConsuming(this)) {
432 buf
.append(vp
->status(), strlen(vp
->status())); // XXX
437 if (proxyingAb
== opOn
) {
438 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
440 const BodyPipePointer
&ap
= rep
->raw().body_pipe
;
441 if (ap
!= NULL
&& ap
->stillProducing(this)) {
442 buf
.append(" Ab", 3);
443 buf
.append(ap
->status(), strlen(ap
->status())); // XXX
445 buf
.append(" A.", 3);
448 buf
.Printf(" ecapx%d]", id
);
452 return buf
.content();