]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
2 #include <libecap/common/area.h>
3 #include <libecap/common/delay.h>
4 #include <libecap/adapter/xaction.h>
5 #include "TextException.h"
6 #include "HttpRequest.h"
8 #include "adaptation/ecap/XactionRep.h"
10 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep
, XactionRep
);
13 Adaptation::Ecap::XactionRep::XactionRep(Adaptation::Initiator
*anInitiator
,
14 HttpMsg
*virginHeader
, HttpRequest
*virginCause
,
15 const Adaptation::ServicePointer
&aService
):
16 AsyncJob("Adaptation::Ecap::XactionRep"),
17 Adaptation::Initiate("Adaptation::Ecap::XactionRep", anInitiator
, aService
),
18 theVirginRep(virginHeader
), theCauseRep(NULL
),
19 proxyingVb(opUndecided
), proxyingAb(opUndecided
), canAccessVb(false)
22 theCauseRep
= new MessageRep(virginCause
);
25 Adaptation::Ecap::XactionRep::~XactionRep()
33 Adaptation::Ecap::XactionRep::master(const AdapterXaction
&x
)
41 Adaptation::Ecap::XactionRep::start()
45 if (theVirginRep
.raw().body_pipe
!= NULL
)
46 canAccessVb
= true; /// assumes nobody is consuming; \todo check
54 Adaptation::Ecap::XactionRep::swanSong()
56 // clear body_pipes, if any
57 // this code does not maintain proxying* and canAccessVb states; should it?
59 if (theAnswerRep
!= NULL
) {
60 BodyPipe::Pointer body_pipe
= answer().body_pipe
;
61 if (body_pipe
!= NULL
) {
62 Must(body_pipe
->stillProducing(this));
63 stopProducingFor(body_pipe
, false);
68 BodyPipe::Pointer body_pipe
= theVirginRep
.raw().body_pipe
;
69 if (body_pipe
!= NULL
) {
70 Must(body_pipe
->stillConsuming(this));
71 stopConsumingFrom(body_pipe
);
76 Adaptation::Initiate::swanSong();
80 Adaptation::Ecap::XactionRep::virgin()
85 const libecap::Message
&
86 Adaptation::Ecap::XactionRep::cause()
88 Must(theCauseRep
!= NULL
);
93 Adaptation::Ecap::XactionRep::adapted()
95 Must(theAnswerRep
!= NULL
);
100 Adaptation::Ecap::XactionRep::answer()
102 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
108 Adaptation::Ecap::XactionRep::terminateMaster()
111 AdapterXaction x
= theMaster
;
118 Adaptation::Ecap::XactionRep::doneAll() const
120 return proxyingVb
>= opComplete
&& proxyingAb
>= opComplete
&&
121 Adaptation::Initiate::doneAll();
124 // stops receiving virgin and enables auto-consumption
126 Adaptation::Ecap::XactionRep::dropVirgin(const char *reason
)
128 debugs(93,4, HERE
<< "because " << reason
<< "; status:" << status());
129 Must(proxyingVb
= opOn
);
131 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
133 Must(p
->stillConsuming(this));
134 stopConsumingFrom(p
);
135 p
->enableAutoConsumption();
136 proxyingVb
= opComplete
;
139 // called from adapter handler so does not inform adapter
143 Adaptation::Ecap::XactionRep::useVirgin()
145 debugs(93,3, HERE
<< status());
146 Must(proxyingAb
== opUndecided
);
147 proxyingAb
= opNever
;
149 BodyPipePointer
&vbody_pipe
= theVirginRep
.raw().body_pipe
;
151 HttpMsg
*clone
= theVirginRep
.raw().header
->clone();
152 // check that clone() copies the pipe so that we do not have to
153 Must(!vbody_pipe
== !clone
->body_pipe
);
155 if (proxyingVb
== opOn
) {
156 Must(vbody_pipe
->stillConsuming(this));
157 // if libecap consumed, we cannot shortcircuit
158 Must(!vbody_pipe
->consumedSize());
159 stopConsumingFrom(vbody_pipe
);
161 proxyingVb
= opComplete
;
163 if (proxyingVb
== opUndecided
) {
164 vbody_pipe
= NULL
; // it is not our pipe anymore
165 proxyingVb
= opNever
;
173 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr
<libecap::Message
> &m
)
175 debugs(93,3, HERE
<< status());
178 Must(proxyingAb
== opUndecided
);
180 HttpMsg
*msg
= answer().header
;
181 if (!theAnswerRep
->body()) { // final, bodyless answer
182 proxyingAb
= opNever
;
184 } else { // got answer headers but need to handle body
186 Must(!msg
->body_pipe
); // only host can set body pipes
187 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
189 rep
->tieBody(this); // sets us as a producer
190 Must(msg
->body_pipe
!= NULL
); // check tieBody
194 debugs(93,4, HERE
<< "adapter will produce body" << status());
195 theMaster
->abMake(); // libecap will produce
// Handles the case where the adapter does not need the virgin body: checks
// that no virgin body decision has been made yet, then drops the body via
// dropVirgin().
// NOTE(review): dropVirgin() is written for the proxyingVb == opOn state and
// requires that we are still consuming the pipe, while this method requires
// opUndecided on entry -- confirm that this call sequence can succeed.
200 Adaptation::Ecap::XactionRep::vbDiscard()
202 Must(proxyingVb
== opUndecided
);
203 // if adapter does not need vb, we do not need to send it
204 dropVirgin("vbDiscard");
// NOTE(review): dropVirgin() finishes by setting proxyingVb to opComplete,
// so the opNever check below looks unsatisfiable -- confirm which final
// state is intended.
205 Must(proxyingVb
== opNever
);
209 Adaptation::Ecap::XactionRep::vbMake()
211 Must(proxyingVb
== opUndecided
);
212 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
214 Must(p
->setConsumerIfNotLate(this)); // to make vb, we must receive vb
219 Adaptation::Ecap::XactionRep::vbStopMaking()
221 // if adapter does not need vb, we do not need to receive it
222 if (proxyingVb
== opOn
)
223 dropVirgin("vbStopMaking");
224 Must(proxyingVb
== opComplete
);
228 Adaptation::Ecap::XactionRep::vbMakeMore()
230 Must(proxyingVb
== opOn
); // cannot make more if done proxying
231 // we cannot guarantee more vb, but we can check that there is a chance
232 Must(!theVirginRep
.raw().body_pipe
->exhausted());
236 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o
, libecap::size_type s
)
239 // We may not be proxyingVb yet. It should be OK, but see vbContentShift().
241 const BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
244 // TODO: make MemBuf use size_t?
245 const size_t haveSize
= static_cast<size_t>(p
->buf().contentSize());
247 // convert to Squid types; XXX: check for overflow
248 const uint64_t offset
= static_cast<uint64_t>(o
);
249 Must(offset
<= haveSize
); // equal iff at the end of content
251 // nsize means no size limit: all content starting from offset
252 const size_t size
= s
== libecap::nsize
?
253 haveSize
- offset
: static_cast<size_t>(s
);
255 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
256 return libecap::Area::FromTempBuffer(p
->buf().content() + offset
,
257 min(static_cast<size_t>(haveSize
- offset
), size
));
261 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n
)
264 // We may not be proxyingVb yet. It should be OK now, but if BodyPipe
265 // consume() requirements change, we would have to return empty vbContent
266 // until the adapter registers as a consumer
268 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
270 const size_t size
= static_cast<size_t>(n
); // XXX: check for overflow
271 const size_t haveSize
= static_cast<size_t>(p
->buf().contentSize()); // TODO: make MemBuf use size_t?
272 p
->consume(min(size
, haveSize
));
276 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd
)
278 Must(proxyingAb
== opOn
);
279 stopProducingFor(answer().body_pipe
, atEnd
);
280 proxyingAb
= opComplete
;
284 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
286 Must(proxyingAb
== opOn
);
290 #if 0 /* XXX: implement */
292 Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize
&size
)
294 Must(answer().body_pipe
!= NULL
);
296 answer().body_pipe
->setBodySize(size
.value());
297 // else the piped body size is unknown by default
302 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay
&d
)
304 debugs(93,3, HERE
<< "adapter needs time: " <<
305 d
.state
<< '/' << d
.progress
);
310 Adaptation::Ecap::XactionRep::adaptationAborted()
312 tellQueryAborted(true); // should eCAP support retries?
313 mustStop("adaptationAborted");
317 Adaptation::Ecap::XactionRep::callable() const
323 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount
<BodyPipe
> bp
)
325 Must(proxyingAb
== opOn
);
330 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount
<BodyPipe
> bp
)
332 Must(proxyingAb
== opOn
);
333 stopProducingFor(answer().body_pipe
, false);
335 theMaster
->abStopMaking();
336 proxyingAb
= opComplete
;
340 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount
<BodyPipe
> bp
)
342 Must(proxyingVb
== opOn
);
344 theMaster
->noteVbContentAvailable();
348 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount
<BodyPipe
> bp
)
350 Must(proxyingVb
== opOn
);
352 theMaster
->noteVbContentDone(true);
353 proxyingVb
= opComplete
;
357 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount
<BodyPipe
> bp
)
359 Must(proxyingVb
== opOn
);
361 theMaster
->noteVbContentDone(false);
362 proxyingVb
= opComplete
;
366 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
368 mustStop("initiator aborted");
371 // get content from the adapter and put it into the adapted pipe
373 Adaptation::Ecap::XactionRep::moveAbContent()
375 Must(proxyingAb
== opOn
);
376 const libecap::Area c
= theMaster
->abContent(0, libecap::nsize
);
377 debugs(93,5, HERE
<< " up to " << c
.size
<< " bytes");
378 if (const size_t used
= answer().body_pipe
->putMoreData(c
.start
, c
.size
))
379 theMaster
->abContentShift(used
);
383 Adaptation::Ecap::XactionRep::status() const
390 if (proxyingVb
== opOn
) {
391 const BodyPipePointer
&vp
= theVirginRep
.raw().body_pipe
;
394 if (vp
!= NULL
&& vp
->stillConsuming(this)) {
396 buf
.append(vp
->status(), strlen(vp
->status())); // XXX
401 if (proxyingAb
== opOn
) {
402 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
404 const BodyPipePointer
&ap
= rep
->raw().body_pipe
;
405 if (ap
!= NULL
&& ap
->stillProducing(this)) {
406 buf
.append(" Ab", 3);
407 buf
.append(ap
->status(), strlen(ap
->status())); // XXX
409 buf
.append(" A.", 3);
412 buf
.Printf(" ecapx%d]", id
);
416 return buf
.content();