]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
2 * DEBUG: section 93 eCAP Interface
5 #include <libecap/common/area.h>
6 #include <libecap/common/delay.h>
7 #include <libecap/adapter/xaction.h>
8 #include "HttpRequest.h"
10 #include "SquidTime.h"
11 #include "adaptation/ecap/XactionRep.h"
12 #include "base/TextException.h"
14 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep
, XactionRep
);
// Constructor. Wraps virginHeader in theVirginRep, starts with both the
// virgin-body and adapted-body proxying states undecided and with adapted
// body production flagged as neither finished nor at end, and allocates a
// MessageRep for virginCause.
// NOTE(review): start() and swanSong() test theCauseRep for NULL, so this
// allocation is presumably conditional on virginCause; the guard and the
// remaining member initializers are not visible in this excerpt -- confirm.
// NOTE(review): theCauseRep is a raw owning pointer created with new; where
// it is released is not visible here.
17 Adaptation::Ecap::XactionRep::XactionRep(Adaptation::Initiator
*anInitiator
,
18 HttpMsg
*virginHeader
, HttpRequest
*virginCause
,
19 const Adaptation::ServicePointer
&aService
):
20 AsyncJob("Adaptation::Ecap::XactionRep"),
21 Adaptation::Initiate("Adaptation::Ecap::XactionRep", anInitiator
),
23 theVirginRep(virginHeader
), theCauseRep(NULL
),
24 proxyingVb(opUndecided
), proxyingAb(opUndecided
),
27 abProductionFinished(false), abProductionAtEnd(false)
30 theCauseRep
= new MessageRep(virginCause
);
33 Adaptation::Ecap::XactionRep::~XactionRep()
41 Adaptation::Ecap::XactionRep::master(const AdapterXaction
&x
)
49 Adaptation::Ecap::XactionRep::service()
51 Must(theService
!= NULL
);
// Begins the transaction.
// When the virgin message has a body pipe, marks the virgin body as
// accessible (canAccessVb). Then finds the HttpRequest behind the cause
// message (or, when there is no cause, behind the virgin message) and records
// the transaction start in that request's adaptation log history, keeping the
// returned ID in adaptHistoryId (swanSong() uses it to record the finish).
// NOTE(review): the dynamic_cast can yield NULL and swanSong() treats the
// history pointer as possibly NULL; no guard for either is visible in this
// excerpt -- confirm both are checked before being dereferenced.
56 Adaptation::Ecap::XactionRep::start()
60 if (theVirginRep
.raw().body_pipe
!= NULL
)
61 canAccessVb
= true; /// assumes nobody is consuming; \todo check
65 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*> (theCauseRep
?
66 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
68 Adaptation::History::Pointer ah
= request
->adaptLogHistory();
70 // retrying=false because ecap never retries transactions
71 adaptHistoryId
= ah
->recordXactStart(service().cfg().key
, current_time
, false);
// Final cleanup before the job ends.
// If an answer exists and has a body pipe, asserts that we are still its
// producer and stops producing (passing false as the at-end flag). If the
// virgin message still has a body pipe, asserts that we are still its
// consumer and stops consuming. Then records the transaction finish in the
// request's adaptation log history when the history pointer is set and
// adaptHistoryId is non-negative, and finally chains to
// Adaptation::Initiate::swanSong().
// NOTE(review): request comes from a dynamic_cast and is dereferenced
// without a NULL check visible in this excerpt -- confirm.
78 Adaptation::Ecap::XactionRep::swanSong()
80 // clear body_pipes, if any
81 // this code does not maintain proxying* and canAccessVb states; should it?
83 if (theAnswerRep
!= NULL
) {
84 BodyPipe::Pointer body_pipe
= answer().body_pipe
;
85 if (body_pipe
!= NULL
) {
86 Must(body_pipe
->stillProducing(this));
87 stopProducingFor(body_pipe
, false);
92 BodyPipe::Pointer body_pipe
= theVirginRep
.raw().body_pipe
;
93 if (body_pipe
!= NULL
) {
94 Must(body_pipe
->stillConsuming(this));
95 stopConsumingFrom(body_pipe
);
101 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*>(theCauseRep
?
102 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
104 Adaptation::History::Pointer ah
= request
->adaptLogHistory();
105 if (ah
!= NULL
&& adaptHistoryId
>= 0)
106 ah
->recordXactFinish(adaptHistoryId
);
108 Adaptation::Initiate::swanSong();
112 Adaptation::Ecap::XactionRep::virgin()
117 const libecap::Message
&
118 Adaptation::Ecap::XactionRep::cause()
120 Must(theCauseRep
!= NULL
);
125 Adaptation::Ecap::XactionRep::adapted()
127 Must(theAnswerRep
!= NULL
);
128 return *theAnswerRep
;
131 Adaptation::Message
&
132 Adaptation::Ecap::XactionRep::answer()
134 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
140 Adaptation::Ecap::XactionRep::terminateMaster()
143 AdapterXaction x
= theMaster
;
// Reports whether the job has nothing left to do: both the virgin-body and
// the adapted-body proxying states must compare >= opComplete, and the
// Adaptation::Initiate base must be done as well.
// NOTE(review): the >= comparison depends on the declaration order of the
// op* enumerators (presumably opNever is ordered after opComplete); the enum
// is declared elsewhere -- verify.
150 Adaptation::Ecap::XactionRep::doneAll() const
152 return proxyingVb
>= opComplete
&& proxyingAb
>= opComplete
&&
153 Adaptation::Initiate::doneAll();
156 // stops receiving virgin and enables auto-consumption
158 Adaptation::Ecap::XactionRep::dropVirgin(const char *reason
)
160 debugs(93,4, HERE
<< "because " << reason
<< "; status:" << status());
161 Must(proxyingVb
= opOn
);
163 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
165 Must(p
->stillConsuming(this));
166 stopConsumingFrom(p
);
167 p
->enableAutoConsumption();
168 proxyingVb
= opComplete
;
171 // called from adapter handler so does not inform adapter
// Adapter decision: use the virgin message as is.
// Requires that no decision about the adapted body was made yet and marks
// adapted-body proxying as opNever. Clones the virgin header and checks that
// the clone has a body pipe exactly when the virgin message has one. Then
// gives up the virgin body: if we were consuming it, nothing may have been
// consumed so far and we stop consuming (state becomes opComplete); if no
// decision had been made, we only reset our pipe reference (state becomes
// opNever).
// NOTE(review): what happens to clone afterwards is not visible in this
// excerpt.
175 Adaptation::Ecap::XactionRep::useVirgin()
177 debugs(93,3, HERE
<< status());
178 Must(proxyingAb
== opUndecided
);
179 proxyingAb
= opNever
;
181 BodyPipePointer
&vbody_pipe
= theVirginRep
.raw().body_pipe
;
183 HttpMsg
*clone
= theVirginRep
.raw().header
->clone();
184 // check that clone() copies the pipe so that we do not have to
185 Must(!vbody_pipe
== !clone
->body_pipe
);
187 if (proxyingVb
== opOn
) {
188 Must(vbody_pipe
->stillConsuming(this));
189 // if libecap consumed, we cannot shortcircuit
190 Must(!vbody_pipe
->consumedSize());
191 stopConsumingFrom(vbody_pipe
);
193 proxyingVb
= opComplete
;
194 } else if (proxyingVb
== opUndecided
) {
195 vbody_pipe
= NULL
; // it is not our pipe anymore
196 proxyingVb
= opNever
;
// Adapter decision: use an adapted message.
// Requires that no decision about the adapted body was made yet. If the
// answer has no body, adapted-body proxying is marked opNever. Otherwise the
// answer header must not have a body pipe yet; tieBody() is called to create
// one with this transaction as the producer, the result is verified, and the
// adapter is asked to start making the adapted body via abMake().
// NOTE(review): the statement that stores m and the statements that set
// proxyingAb for the body case are not visible in this excerpt.
// NOTE(review): rep is the result of a dynamic_cast and is dereferenced
// without a visible NULL check -- confirm.
204 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr
<libecap::Message
> &m
)
206 debugs(93,3, HERE
<< status());
209 Must(proxyingAb
== opUndecided
);
211 HttpMsg
*msg
= answer().header
;
212 if (!theAnswerRep
->body()) { // final, bodyless answer
213 proxyingAb
= opNever
;
215 } else { // got answer headers but need to handle body
217 Must(!msg
->body_pipe
); // only host can set body pipes
218 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
220 rep
->tieBody(this); // sets us as a producer
221 Must(msg
->body_pipe
!= NULL
); // check tieBody
225 debugs(93,4, HERE
<< "adapter will produce body" << status());
226 theMaster
->abMake(); // libecap will produce
// Adapter callback: the adapter does not need the virgin body.
// Requires that no virgin-body decision was made yet, drops the virgin body,
// and then expects the state to be opNever.
// NOTE(review): dropVirgin(), as visible in this file, requires that we are
// still consuming the virgin pipe and finishes with proxyingVb = opComplete,
// while this method is entered with proxyingVb == opUndecided and asserts
// opNever afterwards. These expectations look mutually inconsistent --
// confirm the intended state transitions.
231 Adaptation::Ecap::XactionRep::vbDiscard()
233 Must(proxyingVb
== opUndecided
);
234 // if adapter does not need vb, we do not need to send it
235 dropVirgin("vbDiscard");
236 Must(proxyingVb
== opNever
);
// Adapter callback: the adapter wants the virgin body.
// Requires that no virgin-body decision was made yet and registers this
// transaction as the consumer of the virgin body pipe; the Must() fails if
// setConsumerIfNotLate() reports failure.
// NOTE(review): p is dereferenced without a NULL check visible in this
// excerpt, and the state update that follows is not visible either.
240 Adaptation::Ecap::XactionRep::vbMake()
242 Must(proxyingVb
== opUndecided
);
243 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
245 Must(p
->setConsumerIfNotLate(this)); // to make vb, we must receive vb
// Adapter callback: the adapter no longer needs the virgin body.
// Drops the virgin body if we are currently receiving it. Afterwards the
// state must be opComplete, so the Must() fails when this is called in any
// state other than opOn or opComplete.
250 Adaptation::Ecap::XactionRep::vbStopMaking()
252 // if adapter does not need vb, we do not need to receive it
253 if (proxyingVb
== opOn
)
254 dropVirgin("vbStopMaking");
255 Must(proxyingVb
== opComplete
);
// Adapter callback: the adapter asks for more virgin body.
// As far as visible here this only validates the request: we must be
// proxying the virgin body and the virgin body pipe must not be exhausted.
259 Adaptation::Ecap::XactionRep::vbMakeMore()
261 Must(proxyingVb
== opOn
); // cannot make more if done proxying
262 // we cannot guarantee more vb, but we can check that there is a chance
263 Must(!theVirginRep
.raw().body_pipe
->exhausted());
// Returns an Area built from the virgin body bytes currently buffered in the
// pipe.
//   o: offset into the buffered content; must not exceed the buffered size
//      (Must() fails otherwise).
//   s: maximum number of bytes wanted; libecap::nsize means everything from
//      the offset to the end of the buffered content.
// The returned length is clipped to what is buffered past the offset.
// NOTE(review): p is dereferenced without a NULL check visible in this
// excerpt; the casts from libecap::size_type are unchecked (see the XXX
// remarks below).
267 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o
, libecap::size_type s
)
270 // We may not be proxyingVb yet. It should be OK, but see vbContentShift().
272 const BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
275 // TODO: make MemBuf use size_t?
276 const size_t haveSize
= static_cast<size_t>(p
->buf().contentSize());
278 // convert to Squid types; XXX: check for overflow
279 const uint64_t offset
= static_cast<uint64_t>(o
);
280 Must(offset
<= haveSize
); // equal iff at the end of content
282 // nsize means no size limit: all content starting from offset
283 const size_t size
= s
== libecap::nsize
?
284 haveSize
- offset
: static_cast<size_t>(s
);
286 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
287 return libecap::Area::FromTempBuffer(p
->buf().content() + offset
,
288 min(static_cast<size_t>(haveSize
- offset
), size
));
// Consumes up to n bytes of buffered virgin body from the pipe; the amount
// is clipped to what the pipe buffer currently holds.
// NOTE(review): p is dereferenced without a NULL check visible in this
// excerpt.
292 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n
)
295 // We may not be proxyingVb yet. It should be OK now, but if BodyPipe
296 // consume() requirements change, we would have to return empty vbContent
297 // until the adapter registers as a consumer
299 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
301 const size_t size
= static_cast<size_t>(n
); // XXX: check for overflow
302 const size_t haveSize
= static_cast<size_t>(p
->buf().contentSize()); // TODO: make MemBuf use size_t?
303 p
->consume(min(size
, haveSize
));
// Adapter callback: production of the adapted body has ended.
// Requires that adapted-body proxying is on and that the end of production
// was not reported before. Records that production finished and stores atEnd
// in abProductionAtEnd; moveAbContent() later passes that flag to
// stopProducingFor().
307 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd
)
309 Must(proxyingAb
== opOn
&& !abProductionFinished
);
310 abProductionFinished
= true;
311 abProductionAtEnd
= atEnd
; // store until ready to stop producing ourselves
312 debugs(93,5, HERE
<< "adapted body production ended");
317 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
319 Must(proxyingAb
== opOn
&& !abProductionFinished
);
323 #if 0 /* XXX: implement */
325 Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize
&size
)
327 Must(answer().body_pipe
!= NULL
);
329 answer().body_pipe
->setBodySize(size
.value());
330 // else the piped body size is unknown by default
335 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay
&d
)
337 debugs(93,3, HERE
<< "adapter needs time: " <<
338 d
.state
<< '/' << d
.progress
);
// Adapter callback: the adapter gave up on this transaction.
// Reports the abort through tellQueryAborted(true) and asks the job to stop.
// NOTE(review): the meaning of the true argument is defined by
// tellQueryAborted(), which is declared elsewhere -- confirm.
343 Adaptation::Ecap::XactionRep::adaptationAborted()
345 tellQueryAborted(true); // should eCAP support retries?
346 mustStop("adaptationAborted");
350 Adaptation::Ecap::XactionRep::callable() const
356 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount
<BodyPipe
> bp
)
358 Must(proxyingAb
== opOn
);
// Body pipe callback for an aborted consumer (adapted body side).
// Requires adapted-body proxying to be on; stops producing for the answer's
// body pipe (passing false as the at-end flag), tells the adapter to stop
// making the adapted body, and marks adapted-body proxying complete.
// The bp argument is not used in the visible statements.
363 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount
<BodyPipe
> bp
)
365 Must(proxyingAb
== opOn
);
366 stopProducingFor(answer().body_pipe
, false);
368 theMaster
->abStopMaking();
369 proxyingAb
= opComplete
;
// Body pipe callback for newly available data (virgin body side).
// Requires virgin-body proxying to be on and forwards the event to the
// adapter as noteVbContentAvailable().
// The bp argument is not used in the visible statements.
373 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount
<BodyPipe
> bp
)
375 Must(proxyingVb
== opOn
);
377 theMaster
->noteVbContentAvailable();
// Body pipe callback for the end of body production (virgin body side).
// Requires virgin-body proxying to be on, tells the adapter that the virgin
// content is done with the flag set to true, and marks virgin-body proxying
// complete. The bp argument is not used in the visible statements.
381 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount
<BodyPipe
> bp
)
383 Must(proxyingVb
== opOn
);
385 theMaster
->noteVbContentDone(true);
386 proxyingVb
= opComplete
;
// Body pipe callback for an aborted producer (virgin body side).
// Same as noteBodyProductionEnded() except that the adapter is told the
// virgin content is done with the flag set to false.
// The bp argument is not used in the visible statements.
390 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount
<BodyPipe
> bp
)
392 Must(proxyingVb
== opOn
);
394 theMaster
->noteVbContentDone(false);
395 proxyingVb
= opComplete
;
399 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
401 mustStop("initiator aborted");
404 // get content from the adapter and put it into the adapted pipe
// Requires adapted-body proxying to be on. Asks the adapter for all adapted
// content it currently has (offset 0, no size limit).
// - Nothing available and production already finished: stop producing for
//   the answer's body pipe, passing the stored abProductionAtEnd flag, and
//   mark adapted-body proxying complete.
// - Something available: push it into the answer's body pipe and tell the
//   adapter to shift its content by the number of bytes the pipe accepted
//   (nothing is shifted when the pipe accepted zero bytes).
// - Nothing available but production not finished: no action.
406 Adaptation::Ecap::XactionRep::moveAbContent()
408 Must(proxyingAb
== opOn
);
409 const libecap::Area c
= theMaster
->abContent(0, libecap::nsize
);
410 debugs(93,5, HERE
<< "up to " << c
.size
<< " bytes");
411 if (c
.size
== 0 && abProductionFinished
) { // no ab now and in the future
412 stopProducingFor(answer().body_pipe
, abProductionAtEnd
);
413 proxyingAb
= opComplete
;
414 debugs(93,5, HERE
<< "last adapted body data retrieved");
415 } else if (c
.size
> 0) {
416 if (const size_t used
= answer().body_pipe
->putMoreData(c
.start
, c
.size
))
417 theMaster
->abContentShift(used
);
422 Adaptation::Ecap::XactionRep::status() const
429 if (proxyingVb
== opOn
) {
430 const BodyPipePointer
&vp
= theVirginRep
.raw().body_pipe
;
433 if (vp
!= NULL
&& vp
->stillConsuming(this)) {
435 buf
.append(vp
->status(), strlen(vp
->status())); // XXX
440 if (proxyingAb
== opOn
) {
441 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
443 const BodyPipePointer
&ap
= rep
->raw().body_pipe
;
444 if (ap
!= NULL
&& ap
->stillProducing(this)) {
445 buf
.append(" Ab", 3);
446 buf
.append(ap
->status(), strlen(ap
->status())); // XXX
448 buf
.append(" A.", 3);
451 buf
.Printf(" ecapx%d]", id
);
455 return buf
.content();