]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
2 * DEBUG: section 93 eCAP Interface
5 #include <libecap/common/area.h>
6 #include <libecap/common/delay.h>
7 #include <libecap/common/named_values.h>
8 #include <libecap/common/names.h>
9 #include <libecap/adapter/xaction.h>
10 #include "HttpRequest.h"
11 #include "HttpReply.h"
12 #include "SquidTime.h"
13 #include "adaptation/ecap/XactionRep.h"
14 #include "adaptation/ecap/Config.h"
15 #include "adaptation/Initiator.h"
16 #include "base/TextException.h"
// Invokes the CBDATA class-initialisation macro for XactionRep.
// The macro itself is defined outside this file.
18 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep
, XactionRep
);
21 /// a libecap Visitor for converting adapter transaction options to HttpHeader
/// Every visited (name, value) pair is added to the referenced header through
/// HttpHeader::putExt().
22 class OptionsExtractor
: public libecap::NamedValueVisitor
25 typedef libecap::Name Name
;
26 typedef libecap::Area Area
;
// Binds to the caller's header by reference; the header is not copied.
28 OptionsExtractor(HttpHeader
&aMeta
): meta(aMeta
) {}
30 // libecap::NamedValueVisitor API
// Stores one option: the field name is name.image(), the field value is
// value.toString(), both passed on as C strings.
31 virtual void visit(const Name
&name
, const Area
&value
)
33 meta
.putExt(name
.image().c_str(), value
.toString().c_str());
36 HttpHeader
&meta
; ///< where to put extracted options
// Constructs the host-side representative of one eCAP transaction.
// virginHeader seeds theVirginRep. theCauseRep starts as NULL and is then set
// to a new MessageRep wrapping virginCause. makingVb and proxyingAb start as
// opUndecided; all production flags start as false.
// NOTE(review): aService is not referenced on any line visible here, and any
// condition guarding the MessageRep allocation is not visible either.
39 Adaptation::Ecap::XactionRep::XactionRep(
40 HttpMsg
*virginHeader
, HttpRequest
*virginCause
,
41 const Adaptation::ServicePointer
&aService
):
42 AsyncJob("Adaptation::Ecap::XactionRep"),
43 Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
45 theVirginRep(virginHeader
), theCauseRep(NULL
),
46 makingVb(opUndecided
), proxyingAb(opUndecided
),
48 vbProductionFinished(false),
49 abProductionFinished(false), abProductionAtEnd(false)
52 theCauseRep
= new MessageRep(virginCause
);
// Destructor. NOTE(review): its body is not visible in this listing.
55 Adaptation::Ecap::XactionRep::~XactionRep()
// Receives the adapter transaction x.
// NOTE(review): the body is not visible here; presumably it stores x in
// theMaster -- confirm against the full file.
63 Adaptation::Ecap::XactionRep::master(const AdapterXaction
&x
)
// Service accessor; Must() throws unless theService is set.
// NOTE(review): the return statement is not visible in this listing.
71 Adaptation::Ecap::XactionRep::service()
73 Must(theService
!= NULL
);
// Looks up one transaction meta option by name.
// Recognises libecap::metaClientIp, libecap::metaUserName and the configured
// Adaptation::Config::masterx_shared_name; any other name yields an empty
// libecap::Area.
78 Adaptation::Ecap::XactionRep::option(const libecap::Name
&name
) const
80 if (name
== libecap::metaClientIp
)
81 return clientIpValue();
82 if (name
== libecap::metaUserName
)
83 return usernameValue();
84 if (name
== Adaptation::Config::masterx_shared_name
)
85 return masterxSharedValue(name
);
87 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
88 return libecap::Area();
// Reports each available meta option to visitor.
// An option is visited only when its value Area tests true in the
// if-condition. The master-transaction shared option is considered only when
// Adaptation::Config::masterx_shared_name is set.
92 Adaptation::Ecap::XactionRep::visitEachOption(libecap::NamedValueVisitor
&visitor
) const
94 if (const libecap::Area value
= clientIpValue())
95 visitor
.visit(libecap::metaClientIp
, value
);
96 if (const libecap::Area value
= usernameValue())
97 visitor
.visit(libecap::metaUserName
, value
);
99 if (Adaptation::Config::masterx_shared_name
) {
100 const libecap::Name
name(Adaptation::Config::masterx_shared_name
);
101 if (const libecap::Area value
= masterxSharedValue(name
))
102 visitor
.visit(name
, value
);
105 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
// Returns the client IP address as text, or an empty Area.
// The request is taken from the cause message when theCauseRep is set,
// otherwise from the virgin message; dynamic_cast yields NULL if that header
// is not an HttpRequest.
// A value is produced only when TheConfig.send_client_ip is on, a request
// exists, and the address is neither the "any" nor the "no" address.
109 Adaptation::Ecap::XactionRep::clientIpValue() const
111 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*>(theCauseRep
?
112 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
114 // TODO: move this logic into HttpRequest::clientIp(bool) and
115 // HttpRequest::clientIpString(bool) and reuse everywhere
116 if (TheConfig
.send_client_ip
&& request
) {
117 Ip::Address client_addr
;
// With FOLLOW_X_FORWARDED_FOR and use_indirect_client the indirect address
// is used. NOTE(review): the else/#endif lines that separate it from the
// direct-address assignment below are not visible in this listing.
118 #if FOLLOW_X_FORWARDED_FOR
119 if (TheConfig
.use_indirect_client
) {
120 client_addr
= request
->indirect_client_addr
;
123 client_addr
= request
->client_addr
;
124 if (!client_addr
.IsAnyAddr() && !client_addr
.IsNoAddr()) {
125 char ntoabuf
[MAX_IPSTRLEN
] = "";
126 client_addr
.NtoA(ntoabuf
,MAX_IPSTRLEN
);
// ntoabuf is a local array, hence the FromTempBuffer() factory.
127 return libecap::Area::FromTempBuffer(ntoabuf
, strlen(ntoabuf
));
130 return libecap::Area();
// Returns the authenticated user name as an Area, or an empty Area when the
// request has no auth_user_request or username() returns NULL.
// The request comes from the cause message if present, else the virgin one.
// NOTE(review): request is dereferenced below with no NULL check visible in
// this listing -- confirm that one exists in the full file.
134 Adaptation::Ecap::XactionRep::usernameValue() const
136 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*>(theCauseRep
?
137 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
139 if (request
->auth_user_request
!= NULL
) {
140 if (char const *name
= request
->auth_user_request
->username())
141 return libecap::Area::FromTempBuffer(name
, strlen(name
));
143 return libecap::Area();
// Returns the cross-transaction shared value recorded under name in the
// request's adaptation history, or an empty Area.
// The history is fetched with adaptHistory(false).
// NOTE(review): the declaration of `value` and any NULL checks on request or
// ah are on lines not visible in this listing.
147 Adaptation::Ecap::XactionRep::masterxSharedValue(const libecap::Name
&name
) const
149 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*>(theCauseRep
?
150 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
152 if (name
.known()) { // must check to avoid empty names matching unset cfg
153 Adaptation::History::Pointer ah
= request
->adaptHistory(false);
156 if (ah
->getXxRecord(name
, value
))
157 return libecap::Area::FromTempBuffer(value
.rawBuf(), value
.size());
160 return libecap::Area();
// Job start hook.
// Marks makingVb as opNever when the virgin message has no body pipe, then
// records the transaction start (service key, current_time, retrying=false)
// in the request's adaptation log history and keeps the id in adaptHistoryId.
// NOTE(review): other start-up steps and any NULL checks on request or ah are
// not visible in this listing.
164 Adaptation::Ecap::XactionRep::start()
168 if (!theVirginRep
.raw().body_pipe
)
169 makingVb
= opNever
; // there is nothing to deliver
171 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*> (theCauseRep
?
172 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
174 Adaptation::History::Pointer ah
= request
->adaptLogHistory();
176 // retrying=false because ecap never retries transactions
177 adaptHistoryId
= ah
->recordXactStart(service().cfg().key
, current_time
, false);
// Job clean-up hook.
// Stops producing for the answer body pipe (if there is an answer with a
// pipe), stops consuming the virgin body pipe if this job is still its
// consumer, records the transaction finish in the adaptation log history when
// adaptHistoryId is non-negative, and finally chains to Initiate::swanSong().
184 Adaptation::Ecap::XactionRep::swanSong()
186 // clear body_pipes, if any
187 // this code does not maintain proxying* and canAccessVb states; should it?
189 if (theAnswerRep
!= NULL
) {
190 BodyPipe::Pointer body_pipe
= answer().body_pipe
;
191 if (body_pipe
!= NULL
) {
192 Must(body_pipe
->stillProducing(this));
// the second argument false is the at-end flag: production did not end normally
193 stopProducingFor(body_pipe
, false);
197 BodyPipe::Pointer
&body_pipe
= theVirginRep
.raw().body_pipe
;
198 if (body_pipe
!= NULL
&& body_pipe
->stillConsuming(this))
199 stopConsumingFrom(body_pipe
);
203 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*>(theCauseRep
?
204 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
206 Adaptation::History::Pointer ah
= request
->adaptLogHistory();
207 if (ah
!= NULL
&& adaptHistoryId
>= 0)
208 ah
->recordXactFinish(adaptHistoryId
);
210 Adaptation::Initiate::swanSong();
// Virgin message accessor. NOTE(review): return type and body are not
// visible in this listing.
214 Adaptation::Ecap::XactionRep::virgin()
// Cause message accessor; Must() throws when theCauseRep is not set.
// NOTE(review): the return statement is not visible in this listing.
219 const libecap::Message
&
220 Adaptation::Ecap::XactionRep::cause()
222 Must(theCauseRep
!= NULL
);
// Returns the adapted message held by theAnswerRep; Must() throws when no
// answer has been set.
227 Adaptation::Ecap::XactionRep::adapted()
229 Must(theAnswerRep
!= NULL
);
230 return *theAnswerRep
;
// Returns the answer as an Adaptation::Message.
// theAnswerRep is downcast to MessageRep first.
// NOTE(review): the check of rep and the return statement are not visible.
233 Adaptation::Message
&
234 Adaptation::Ecap::XactionRep::answer()
236 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
// Begins by copying theMaster into the local x.
// NOTE(review): the rest of the body is not visible in this listing.
242 Adaptation::Ecap::XactionRep::terminateMaster()
245 AdapterXaction x
= theMaster
;
// True once both makingVb and proxyingAb compare >= opComplete and the base
// Initiate::doneAll() also reports done.
252 Adaptation::Ecap::XactionRep::doneAll() const
254 return makingVb
>= opComplete
&& proxyingAb
>= opComplete
&&
255 Adaptation::Initiate::doneAll();
258 // stops receiving virgin and enables auto-consumption, dropping any vb bytes
// reason is used only in the debug message.
// NOTE(review): statements following enableAutoConsumption() are not visible
// in this listing.
260 Adaptation::Ecap::XactionRep::sinkVb(const char *reason
)
262 debugs(93,4, HERE
<< "sink for " << reason
<< "; status:" << status());
264 // we reset raw().body_pipe when we are done, so use this one for checking
265 const BodyPipePointer
&permPipe
= theVirginRep
.raw().header
->body_pipe
;
266 if (permPipe
!= NULL
)
267 permPipe
->enableAutoConsumption();
272 // stops receiving virgin but preserves it for others to use
// reason is used only in the debug message. If the header has a body pipe,
// Must() throws when any of its bytes were already consumed.
// NOTE(review): statements after that check are not visible in this listing.
274 Adaptation::Ecap::XactionRep::preserveVb(const char *reason
)
276 debugs(93,4, HERE
<< "preserve for " << reason
<< "; status:" << status());
278 // we reset raw().body_pipe when we are done, so use this one for checking
279 const BodyPipePointer
&permPipe
= theVirginRep
.raw().header
->body_pipe
;
280 if (permPipe
!= NULL
) {
281 // if libecap consumed, we cannot preserve
282 Must(!permPipe
->consumedSize());
288 // disassociates us from vb; the last step of sinking or preserving vb
// Stops consuming the virgin body pipe if this job is still its consumer and
// moves makingVb from opOn to opComplete.
// NOTE(review): the statement executed for the opUndecided case is not
// visible in this listing.
290 Adaptation::Ecap::XactionRep::forgetVb(const char *reason
)
292 debugs(93,9, HERE
<< "forget vb " << reason
<< "; status:" << status());
294 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
295 if (p
!= NULL
&& p
->stillConsuming(this))
296 stopConsumingFrom(p
);
298 if (makingVb
== opUndecided
)
300 else if (makingVb
== opOn
)
301 makingVb
= opComplete
;
// Adapter decision: forward the virgin message unchanged.
// Requires proxyingAb to be undecided and sets it to opNever, preserves the
// virgin body, clones the virgin header and sends the clone as a Forward
// answer.
305 Adaptation::Ecap::XactionRep::useVirgin()
307 debugs(93,3, HERE
<< status());
308 Must(proxyingAb
== opUndecided
);
309 proxyingAb
= opNever
;
311 preserveVb("useVirgin");
313 HttpMsg
*clone
= theVirginRep
.raw().header
->clone();
314 // check that clone() copies the pipe so that we do not have to
// (both sides are negated so only presence/absence of a pipe is compared)
315 Must(!theVirginRep
.raw().header
->body_pipe
== !clone
->body_pipe
);
318 sendAnswer(Answer::Forward(clone
));
// Adapter decision: use the adapted message m as the answer.
// Requires proxyingAb to be undecided. A bodyless answer sets proxyingAb to
// opNever and is forwarded at once. Otherwise this job is tied to the answer
// body as its producer, the headers are forwarded, and the adapter is asked
// through abMake() to start producing the adapted body.
// NOTE(review): the lines that store m and that update proxyingAb on the
// with-body path are not visible in this listing.
323 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr
<libecap::Message
> &m
)
325 debugs(93,3, HERE
<< status());
328 Must(proxyingAb
== opUndecided
);
330 HttpMsg
*msg
= answer().header
;
331 if (!theAnswerRep
->body()) { // final, bodyless answer
332 proxyingAb
= opNever
;
334 sendAnswer(Answer::Forward(msg
));
335 } else { // got answer headers but need to handle body
337 Must(!msg
->body_pipe
); // only host can set body pipes
338 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
340 rep
->tieBody(this); // sets us as a producer
341 Must(msg
->body_pipe
!= NULL
); // check tieBody
344 sendAnswer(Answer::Forward(msg
));
346 debugs(93,4, HERE
<< "adapter will produce body" << status());
347 theMaster
->abMake(); // libecap will produce
// Adapter decision: block the virgin message.
// Requires proxyingAb to be undecided and sets it to opNever, sinks the
// virgin body, and sends a Block answer carrying the service key.
352 Adaptation::Ecap::XactionRep::blockVirgin()
354 debugs(93,3, HERE
<< status());
355 Must(proxyingAb
== opUndecided
);
356 proxyingAb
= opNever
;
358 sinkVb("blockVirgin");
361 sendAnswer(Answer::Block(service().cfg().key
));
365 /// Called just before sendAnswer() to record adapter meta-information
366 /// which may affect answer processing and may be needed for logging.
/// Three updates are made, all by querying theMaster:
///  - the cross-transaction record, when masterx_shared_name is configured
///    and the adapter returns a value for it;
///  - the next-services plan, when the service has routing enabled and the
///    adapter returns libecap::metaNextServices;
///  - the logged meta headers, collected from all adapter options.
/// NOTE(review): the early return for a missing theMaster and any NULL checks
/// on request or ah are on lines not visible in this listing.
368 Adaptation::Ecap::XactionRep::updateHistory()
370 if (!theMaster
) // all updates rely on being able to query the adapter
373 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*>(theCauseRep
?
374 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
377 // TODO: move common ICAP/eCAP logic to Adaptation::Xaction or similar
378 // TODO: optimize Area-to-String conversion
380 // update the cross-transactional database if needed
381 if (const char *xxNameStr
= Adaptation::Config::masterx_shared_name
) {
// adaptHistory(true), unlike the adaptHistory(false) lookup used when reading
382 Adaptation::History::Pointer ah
= request
->adaptHistory(true);
384 libecap::Name
xxName(xxNameStr
); // TODO: optimize?
385 if (const libecap::Area val
= theMaster
->option(xxName
))
386 ah
->updateXxRecord(xxNameStr
, val
.toString().c_str());
390 // update the adaptation plan if needed
391 if (service().cfg().routing
) {
393 if (const libecap::Area services
= theMaster
->option(libecap::metaNextServices
)) {
394 Adaptation::History::Pointer ah
= request
->adaptHistory(true);
396 ah
->updateNextServices(services
.toString().c_str());
398 } // TODO: else warn (occasionally!) if we got libecap::metaNextServices
400 // Store received meta headers for adapt::<last_h logformat code use.
401 // If we already have stored headers from a previous adaptation transaction
402 // related to the same master transaction, they will be replaced.
403 Adaptation::History::Pointer ah
= request
->adaptLogHistory();
405 HttpHeader
meta(hoReply
);
406 OptionsExtractor
extractor(meta
);
407 theMaster
->visitEachOption(extractor
);
408 ah
->recordMeta(&meta
);
// Adapter request: the virgin body is not needed.
// Requires makingVb to be undecided on entry and verifies it is opNever on
// exit.
// NOTE(review): the statement between the two checks that performs the
// transition is not visible in this listing.
414 Adaptation::Ecap::XactionRep::vbDiscard()
416 Must(makingVb
== opUndecided
);
417 // if adapter does not need vb, we do not need to send it
419 Must(makingVb
== opNever
);
// Adapter request: start delivering the virgin body.
// Requires makingVb to be undecided and registers this job as the consumer of
// the virgin body pipe; Must() throws if registration fails.
// NOTE(review): the pipe NULL check and the makingVb update are not visible
// in this listing.
423 Adaptation::Ecap::XactionRep::vbMake()
425 Must(makingVb
== opUndecided
);
426 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
428 Must(p
->setConsumerIfNotLate(this)); // to deliver vb, we must receive vb
// Adapter request: stop delivering the virgin body.
// Requires makingVb to be opOn, sinks the remaining virgin body, and verifies
// that makingVb ended up as opComplete.
433 Adaptation::Ecap::XactionRep::vbStopMaking()
435 Must(makingVb
== opOn
);
436 // if adapter does not need vb, we do not need to receive it
437 sinkVb("vbStopMaking");
438 Must(makingVb
== opComplete
);
// Adapter request: more virgin body is wanted.
// Performs checks only: makingVb must be opOn, this job must still be the
// consumer of the virgin pipe, production must not have ended, and the pipe
// must report that it may need more data.
442 Adaptation::Ecap::XactionRep::vbMakeMore()
444 Must(makingVb
== opOn
); // cannot make more if done proxying
445 // we cannot guarantee more vb, but we can check that there is a chance
446 const BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
447 Must(p
!= NULL
&& p
->stillConsuming(this)); // we are plugged in
448 Must(!p
->productionEnded() && p
->mayNeedMoreData()); // and may get more
// Returns a copy-view of buffered virgin body bytes starting at offset o.
// s caps the length; libecap::nsize means "everything from o onwards".
// The returned length never exceeds what the pipe buffer holds past o, and
// Must() throws for an offset beyond the buffered content.
// NOTE(review): any NULL check on p is on a line not visible in this listing.
452 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o
, libecap::size_type s
)
454 // We may not be makingVb yet. It should be OK, but see vbContentShift().
456 const BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
459 // TODO: make MemBuf use size_t?
460 const size_t haveSize
= static_cast<size_t>(p
->buf().contentSize());
462 // convert to Squid types; XXX: check for overflow
463 const uint64_t offset
= static_cast<uint64_t>(o
);
464 Must(offset
<= haveSize
); // equal iff at the end of content
466 // nsize means no size limit: all content starting from offset
467 const size_t size
= s
== libecap::nsize
?
468 haveSize
- offset
: static_cast<size_t>(s
);
470 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
471 return libecap::Area::FromTempBuffer(p
->buf().content() + offset
,
472 min(static_cast<size_t>(haveSize
- offset
), size
));
// Drops up to n bytes from the front of the virgin body pipe buffer.
// The amount consumed is capped at the number of bytes currently buffered.
// NOTE(review): any NULL check on p is on a line not visible in this listing.
476 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n
)
478 // We may not be makingVb yet. It should be OK now, but if BodyPipe
479 // consume() requirements change, we would have to return empty vbContent
480 // until the adapter registers as a consumer
482 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
484 const size_t size
= static_cast<size_t>(n
); // XXX: check for overflow
485 const size_t haveSize
= static_cast<size_t>(p
->buf().contentSize()); // TODO: make MemBuf use size_t?
486 p
->consume(min(size
, haveSize
));
// Adapter notification: no more adapted body will be produced.
// Requires proxyingAb to be opOn and production not yet finished; records the
// finish together with the atEnd flag for later use.
// NOTE(review): statements after the debug line are not visible here.
490 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd
)
492 Must(proxyingAb
== opOn
&& !abProductionFinished
);
493 abProductionFinished
= true;
494 abProductionAtEnd
= atEnd
; // store until ready to stop producing ourselves
495 debugs(93,5, HERE
<< "adapted body production ended");
// Adapter notification: adapted body content is available.
// Requires proxyingAb to be opOn and production not yet finished.
// NOTE(review): the action taken after the check is not visible here.
500 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
502 Must(proxyingAb
== opOn
&& !abProductionFinished
);
// Compiled out by the "#if 0" below: would set the size of the answer body
// pipe from size.value().
// NOTE(review): the condition preceding setBodySize() and the closing #endif
// are not visible in this listing.
506 #if 0 /* XXX: implement */
508 Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize
&size
)
510 Must(answer().body_pipe
!= NULL
);
512 answer().body_pipe
->setBodySize(size
.value());
513 // else the piped body size is unknown by default
// Adapter notification: adaptation needs more time.
// The visible code only logs the state and progress fields of d.
518 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay
&d
)
520 debugs(93,3, HERE
<< "adapter needs time: " <<
521 d
.state
<< '/' << d
.progress
);
// Adapter notification: adaptation was aborted.
// Reports the abort through tellQueryAborted(true) and stops this job.
526 Adaptation::Ecap::XactionRep::adaptationAborted()
528 tellQueryAborted(true); // should eCAP support retries?
529 mustStop("adaptationAborted");
// NOTE(review): return type and body are not visible in this listing.
533 Adaptation::Ecap::XactionRep::callable() const
// BodyPipe notification: the adapted body pipe bp has more space.
// Requires proxyingAb to be opOn.
// NOTE(review): the action taken after the check is not visible here.
539 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount
<BodyPipe
> bp
)
541 Must(proxyingAb
== opOn
);
// BodyPipe notification: the consumer of the adapted body went away.
// Requires proxyingAb to be opOn, stops producing for the answer pipe, tells
// the adapter to stop making the adapted body, and marks proxyingAb complete.
546 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount
<BodyPipe
> bp
)
548 Must(proxyingAb
== opOn
);
549 stopProducingFor(answer().body_pipe
, false);
551 theMaster
->abStopMaking();
552 proxyingAb
= opComplete
;
// BodyPipe notification: more virgin body data arrived on bp.
// Requires makingVb to be opOn and passes the news on to the adapter.
556 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount
<BodyPipe
> bp
)
558 Must(makingVb
== opOn
); // or we would not be registered as a consumer
560 theMaster
->noteVbContentAvailable();
// BodyPipe notification: virgin body production finished.
// Requires makingVb to be opOn, tells the adapter via noteVbContentDone(true)
// and records vbProductionFinished.
564 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount
<BodyPipe
> bp
)
566 Must(makingVb
== opOn
); // or we would not be registered as a consumer
568 theMaster
->noteVbContentDone(true);
569 vbProductionFinished
= true;
// BodyPipe notification: the virgin body producer aborted.
// Same as the production-ended case, except that the adapter is told via
// noteVbContentDone(false).
573 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount
<BodyPipe
> bp
)
575 Must(makingVb
== opOn
); // or we would not be registered as a consumer
577 theMaster
->noteVbContentDone(false);
578 vbProductionFinished
= true;
// Initiator notification: the initiator went away, so this job stops.
582 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
584 mustStop("initiator aborted");
587 // get content from the adapter and put it into the adapted pipe
// Requires proxyingAb to be opOn and asks the adapter for all available
// adapted content. With nothing available and production finished, it stops
// producing (passing abProductionAtEnd) and marks proxyingAb complete. With
// content available, it offers the bytes to the answer body pipe and, when
// some were accepted, reports that count through abContentShift().
589 Adaptation::Ecap::XactionRep::moveAbContent()
591 Must(proxyingAb
== opOn
);
592 const libecap::Area c
= theMaster
->abContent(0, libecap::nsize
);
593 debugs(93,5, HERE
<< "up to " << c
.size
<< " bytes");
594 if (c
.size
== 0 && abProductionFinished
) { // no ab now and in the future
595 stopProducingFor(answer().body_pipe
, abProductionAtEnd
);
596 proxyingAb
= opComplete
;
597 debugs(93,5, HERE
<< "last adapted body data retrieved");
598 } else if (c
.size
> 0) {
599 if (const size_t used
= answer().body_pipe
->putMoreData(c
.start
, c
.size
))
600 theMaster
->abContentShift(used
);
// Builds a short state string for debugging and returns buf.content().
// Pieces appended by the visible code: "M<makingVb>", one of the virgin pipe
// markers " !V" / " Vc" / " V?", " A<proxyingAb>", one of the adapted pipe
// markers " !A" / " Ap" / " A?" when proxyingAb is opOn, and finally the
// transaction id followed by ']'.
// NOTE(review): the declaration and setup of buf, several of the conditions
// that select between markers, and the statement guarded by
// vbProductionFinished are not visible in this listing.
605 Adaptation::Ecap::XactionRep::status() const
613 buf
.Printf("M%d", static_cast<int>(makingVb
));
615 const BodyPipePointer
&vp
= theVirginRep
.raw().body_pipe
;
617 buf
.append(" !V", 3);
// const_cast: stillConsuming() is called with a non-const pointer to this job
619 if (vp
->stillConsuming(const_cast<XactionRep
*>(this)))
620 buf
.append(" Vc", 3);
622 buf
.append(" V?", 3);
624 if (vbProductionFinished
)
628 buf
.Printf(" A%d", static_cast<int>(proxyingAb
));
630 if (proxyingAb
== opOn
) {
631 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
633 const BodyPipePointer
&ap
= rep
->raw().body_pipe
;
635 buf
.append(" !A", 3);
636 else if (ap
->stillProducing(const_cast<XactionRep
*>(this)))
637 buf
.append(" Ap", 3);
639 buf
.append(" A?", 3);
642 buf
.Printf(" %s%u]", id
.Prefix
, id
.value
);
646 return buf
.content();