]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
2 * DEBUG: section 93 eCAP Interface
5 #include <libecap/common/area.h>
6 #include <libecap/common/delay.h>
7 #include <libecap/common/named_values.h>
8 #include <libecap/common/names.h>
9 #include <libecap/adapter/xaction.h>
10 #include "adaptation/Answer.h"
11 #include "adaptation/ecap/Config.h"
12 #include "adaptation/ecap/XactionRep.h"
13 #include "adaptation/Initiator.h"
14 #include "base/AsyncJobCalls.h"
15 #include "base/TextException.h"
16 #include "format/Format.h"
17 #include "HttpReply.h"
18 #include "HttpRequest.h"
19 #include "SquidTime.h"
21 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep
, XactionRep
);
23 /// a libecap Visitor for converting adapter transaction options to HttpHeader
24 class OptionsExtractor
: public libecap::NamedValueVisitor
27 typedef libecap::Name Name
;
28 typedef libecap::Area Area
;
30 OptionsExtractor(HttpHeader
&aMeta
): meta(aMeta
) {}
32 // libecap::NamedValueVisitor API
33 virtual void visit(const Name
&name
, const Area
&value
) {
34 meta
.putExt(name
.image().c_str(), value
.toString().c_str());
37 HttpHeader
&meta
; ///< where to put extracted options
40 Adaptation::Ecap::XactionRep::XactionRep(
41 HttpMsg
*virginHeader
, HttpRequest
*virginCause
, AccessLogEntry::Pointer
&alp
,
42 const Adaptation::ServicePointer
&aService
):
43 AsyncJob("Adaptation::Ecap::XactionRep"),
44 Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
46 theVirginRep(virginHeader
), theCauseRep(NULL
),
47 makingVb(opUndecided
), proxyingAb(opUndecided
),
49 vbProductionFinished(false),
50 abProductionFinished(false), abProductionAtEnd(false),
54 theCauseRep
= new MessageRep(virginCause
);
57 Adaptation::Ecap::XactionRep::~XactionRep()
65 Adaptation::Ecap::XactionRep::master(const AdapterXaction
&x
)
73 Adaptation::Ecap::XactionRep::service()
75 Must(theService
!= NULL
);
80 Adaptation::Ecap::XactionRep::option(const libecap::Name
&name
) const
82 if (name
== libecap::metaClientIp
)
83 return clientIpValue();
84 if (name
== libecap::metaUserName
)
85 return usernameValue();
86 if (Adaptation::Config::masterx_shared_name
&& name
== Adaptation::Config::masterx_shared_name
)
87 return masterxSharedValue(name
);
89 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
91 // If the name is unknown, metaValue returns an emtpy area
92 return metaValue(name
);
96 Adaptation::Ecap::XactionRep::visitEachOption(libecap::NamedValueVisitor
&visitor
) const
98 if (const libecap::Area value
= clientIpValue())
99 visitor
.visit(libecap::metaClientIp
, value
);
100 if (const libecap::Area value
= usernameValue())
101 visitor
.visit(libecap::metaUserName
, value
);
103 if (Adaptation::Config::masterx_shared_name
) {
104 const libecap::Name
name(Adaptation::Config::masterx_shared_name
);
105 if (const libecap::Area value
= masterxSharedValue(name
))
106 visitor
.visit(name
, value
);
109 visitEachMetaHeader(visitor
);
111 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
115 Adaptation::Ecap::XactionRep::clientIpValue() const
117 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*>(theCauseRep
?
118 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
120 // TODO: move this logic into HttpRequest::clientIp(bool) and
121 // HttpRequest::clientIpString(bool) and reuse everywhere
122 if (TheConfig
.send_client_ip
&& request
) {
123 Ip::Address client_addr
;
124 #if FOLLOW_X_FORWARDED_FOR
125 if (TheConfig
.use_indirect_client
) {
126 client_addr
= request
->indirect_client_addr
;
129 client_addr
= request
->client_addr
;
130 if (!client_addr
.isAnyAddr() && !client_addr
.isNoAddr()) {
131 char ntoabuf
[MAX_IPSTRLEN
] = "";
132 client_addr
.toStr(ntoabuf
,MAX_IPSTRLEN
);
133 return libecap::Area::FromTempBuffer(ntoabuf
, strlen(ntoabuf
));
136 return libecap::Area();
140 Adaptation::Ecap::XactionRep::usernameValue() const
143 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*>(theCauseRep
?
144 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
146 if (request
->auth_user_request
!= NULL
) {
147 if (char const *name
= request
->auth_user_request
->username())
148 return libecap::Area::FromTempBuffer(name
, strlen(name
));
149 else if (request
->extacl_user
.size() > 0)
150 return libecap::Area::FromTempBuffer(request
->extacl_user
.rawBuf(),
151 request
->extacl_user
.size());
154 return libecap::Area();
158 Adaptation::Ecap::XactionRep::masterxSharedValue(const libecap::Name
&name
) const
160 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*>(theCauseRep
?
161 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
163 if (name
.known()) { // must check to avoid empty names matching unset cfg
164 Adaptation::History::Pointer ah
= request
->adaptHistory(false);
167 if (ah
->getXxRecord(name
, value
))
168 return libecap::Area::FromTempBuffer(value
.rawBuf(), value
.size());
171 return libecap::Area();
175 Adaptation::Ecap::XactionRep::metaValue(const libecap::Name
&name
) const
177 HttpRequest
*request
= dynamic_cast<HttpRequest
*>(theCauseRep
?
178 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
180 HttpReply
*reply
= dynamic_cast<HttpReply
*>(theVirginRep
.raw().header
);
182 if (name
.known()) { // must check to avoid empty names matching unset cfg
183 typedef Notes::iterator ACAMLI
;
184 for (ACAMLI i
= Adaptation::Config::metaHeaders
.begin(); i
!= Adaptation::Config::metaHeaders
.end(); ++i
) {
185 if (name
== (*i
)->key
.termedBuf()) {
186 if (const char *value
= (*i
)->match(request
, reply
, al
))
187 return libecap::Area::FromTempString(value
);
189 return libecap::Area();
194 return libecap::Area();
198 Adaptation::Ecap::XactionRep::visitEachMetaHeader(libecap::NamedValueVisitor
&visitor
) const
200 HttpRequest
*request
= dynamic_cast<HttpRequest
*>(theCauseRep
?
201 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
203 HttpReply
*reply
= dynamic_cast<HttpReply
*>(theVirginRep
.raw().header
);
205 typedef Notes::iterator ACAMLI
;
206 for (ACAMLI i
= Adaptation::Config::metaHeaders
.begin(); i
!= Adaptation::Config::metaHeaders
.end(); ++i
) {
207 const char *v
= (*i
)->match(request
, reply
, al
);
209 const libecap::Name
name((*i
)->key
.termedBuf());
210 const libecap::Area value
= libecap::Area::FromTempString(v
);
211 visitor
.visit(name
, value
);
217 Adaptation::Ecap::XactionRep::start()
221 if (!theVirginRep
.raw().body_pipe
)
222 makingVb
= opNever
; // there is nothing to deliver
224 HttpRequest
*request
= dynamic_cast<HttpRequest
*> (theCauseRep
?
225 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
228 HttpReply
*reply
= dynamic_cast<HttpReply
*>(theVirginRep
.raw().header
);
230 Adaptation::History::Pointer ah
= request
->adaptLogHistory();
232 // retrying=false because ecap never retries transactions
233 adaptHistoryId
= ah
->recordXactStart(service().cfg().key
, current_time
, false);
234 typedef Notes::iterator ACAMLI
;
235 for (ACAMLI i
= Adaptation::Config::metaHeaders
.begin(); i
!= Adaptation::Config::metaHeaders
.end(); ++i
) {
236 const char *v
= (*i
)->match(request
, reply
, al
);
238 if (ah
->metaHeaders
== NULL
)
239 ah
->metaHeaders
= new NotePairs();
240 if (!ah
->metaHeaders
->hasPair((*i
)->key
.termedBuf(), v
))
241 ah
->metaHeaders
->add((*i
)->key
.termedBuf(), v
);
250 Adaptation::Ecap::XactionRep::swanSong()
252 // clear body_pipes, if any
253 // this code does not maintain proxying* and canAccessVb states; should it?
255 if (theAnswerRep
!= NULL
) {
256 BodyPipe::Pointer body_pipe
= answer().body_pipe
;
257 if (body_pipe
!= NULL
) {
258 Must(body_pipe
->stillProducing(this));
259 stopProducingFor(body_pipe
, false);
263 BodyPipe::Pointer
&body_pipe
= theVirginRep
.raw().body_pipe
;
264 if (body_pipe
!= NULL
&& body_pipe
->stillConsuming(this))
265 stopConsumingFrom(body_pipe
);
269 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*>(theCauseRep
?
270 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
272 Adaptation::History::Pointer ah
= request
->adaptLogHistory();
273 if (ah
!= NULL
&& adaptHistoryId
>= 0)
274 ah
->recordXactFinish(adaptHistoryId
);
276 Adaptation::Initiate::swanSong();
280 Adaptation::Ecap::XactionRep::resume()
282 // go async to gain exception protection and done()-based job destruction
283 typedef NullaryMemFunT
<Adaptation::Ecap::XactionRep
> Dialer
;
284 AsyncCall::Pointer call
= asyncCall(93, 5, "Adaptation::Ecap::XactionRep::doResume",
285 Dialer(this, &Adaptation::Ecap::XactionRep::doResume
));
286 ScheduleCallHere(call
);
289 /// the guts of libecap::host::Xaction::resume() API implementation
290 /// which just goes async in Adaptation::Ecap::XactionRep::resume().
292 Adaptation::Ecap::XactionRep::doResume()
299 Adaptation::Ecap::XactionRep::virgin()
304 const libecap::Message
&
305 Adaptation::Ecap::XactionRep::cause()
307 Must(theCauseRep
!= NULL
);
312 Adaptation::Ecap::XactionRep::adapted()
314 Must(theAnswerRep
!= NULL
);
315 return *theAnswerRep
;
318 Adaptation::Message
&
319 Adaptation::Ecap::XactionRep::answer()
321 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
327 Adaptation::Ecap::XactionRep::terminateMaster()
330 AdapterXaction x
= theMaster
;
337 Adaptation::Ecap::XactionRep::doneAll() const
339 return makingVb
>= opComplete
&& proxyingAb
>= opComplete
&&
340 Adaptation::Initiate::doneAll();
343 // stops receiving virgin and enables auto-consumption, dropping any vb bytes
345 Adaptation::Ecap::XactionRep::sinkVb(const char *reason
)
347 debugs(93,4, HERE
<< "sink for " << reason
<< "; status:" << status());
349 // we reset raw().body_pipe when we are done, so use this one for checking
350 const BodyPipePointer
&permPipe
= theVirginRep
.raw().header
->body_pipe
;
351 if (permPipe
!= NULL
)
352 permPipe
->enableAutoConsumption();
357 // stops receiving virgin but preserves it for others to use
359 Adaptation::Ecap::XactionRep::preserveVb(const char *reason
)
361 debugs(93,4, HERE
<< "preserve for " << reason
<< "; status:" << status());
363 // we reset raw().body_pipe when we are done, so use this one for checking
364 const BodyPipePointer
&permPipe
= theVirginRep
.raw().header
->body_pipe
;
365 if (permPipe
!= NULL
) {
366 // if libecap consumed, we cannot preserve
367 Must(!permPipe
->consumedSize());
373 // disassociates us from vb; the last step of sinking or preserving vb
375 Adaptation::Ecap::XactionRep::forgetVb(const char *reason
)
377 debugs(93,9, HERE
<< "forget vb " << reason
<< "; status:" << status());
379 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
380 if (p
!= NULL
&& p
->stillConsuming(this))
381 stopConsumingFrom(p
);
383 if (makingVb
== opUndecided
)
385 else if (makingVb
== opOn
)
386 makingVb
= opComplete
;
390 Adaptation::Ecap::XactionRep::useVirgin()
392 debugs(93,3, HERE
<< status());
393 Must(proxyingAb
== opUndecided
);
394 proxyingAb
= opNever
;
396 preserveVb("useVirgin");
398 HttpMsg
*clone
= theVirginRep
.raw().header
->clone();
399 // check that clone() copies the pipe so that we do not have to
400 Must(!theVirginRep
.raw().header
->body_pipe
== !clone
->body_pipe
);
402 updateHistory(clone
);
403 sendAnswer(Answer::Forward(clone
));
408 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr
<libecap::Message
> &m
)
410 debugs(93,3, HERE
<< status());
413 Must(proxyingAb
== opUndecided
);
415 HttpMsg
*msg
= answer().header
;
416 if (!theAnswerRep
->body()) { // final, bodyless answer
417 proxyingAb
= opNever
;
419 sendAnswer(Answer::Forward(msg
));
420 } else { // got answer headers but need to handle body
422 Must(!msg
->body_pipe
); // only host can set body pipes
423 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
425 rep
->tieBody(this); // sets us as a producer
426 Must(msg
->body_pipe
!= NULL
); // check tieBody
429 sendAnswer(Answer::Forward(msg
));
431 debugs(93,4, HERE
<< "adapter will produce body" << status());
432 theMaster
->abMake(); // libecap will produce
437 Adaptation::Ecap::XactionRep::blockVirgin()
439 debugs(93,3, HERE
<< status());
440 Must(proxyingAb
== opUndecided
);
441 proxyingAb
= opNever
;
443 sinkVb("blockVirgin");
446 sendAnswer(Answer::Block(service().cfg().key
));
450 /// Called just before sendAnswer() to record adapter meta-information
451 /// which may affect answer processing and may be needed for logging.
453 Adaptation::Ecap::XactionRep::updateHistory(HttpMsg
*adapted
)
455 if (!theMaster
) // all updates rely on being able to query the adapter
458 const HttpRequest
*request
= dynamic_cast<const HttpRequest
*>(theCauseRep
?
459 theCauseRep
->raw().header
: theVirginRep
.raw().header
);
462 // TODO: move common ICAP/eCAP logic to Adaptation::Xaction or similar
463 // TODO: optimize Area-to-String conversion
465 // update the cross-transactional database if needed
466 if (const char *xxNameStr
= Adaptation::Config::masterx_shared_name
) {
467 Adaptation::History::Pointer ah
= request
->adaptHistory(true);
469 libecap::Name
xxName(xxNameStr
); // TODO: optimize?
470 if (const libecap::Area val
= theMaster
->option(xxName
))
471 ah
->updateXxRecord(xxNameStr
, val
.toString().c_str());
475 // update the adaptation plan if needed
476 if (service().cfg().routing
) {
478 if (const libecap::Area services
= theMaster
->option(libecap::metaNextServices
)) {
479 Adaptation::History::Pointer ah
= request
->adaptHistory(true);
481 ah
->updateNextServices(services
.toString().c_str());
483 } // TODO: else warn (occasionally!) if we got libecap::metaNextServices
485 // Store received meta headers for adapt::<last_h logformat code use.
486 // If we already have stored headers from a previous adaptation transaction
487 // related to the same master transction, they will be replaced.
488 Adaptation::History::Pointer ah
= request
->adaptLogHistory();
490 HttpHeader
meta(hoReply
);
491 OptionsExtractor
extractor(meta
);
492 theMaster
->visitEachOption(extractor
);
493 ah
->recordMeta(&meta
);
496 // Add just-created history to the adapted/cloned request that lacks it.
497 if (HttpRequest
*adaptedReq
= dynamic_cast<HttpRequest
*>(adapted
))
498 adaptedReq
->adaptHistoryImport(*request
);
502 Adaptation::Ecap::XactionRep::vbDiscard()
504 Must(makingVb
== opUndecided
);
505 // if adapter does not need vb, we do not need to send it
507 Must(makingVb
== opNever
);
511 Adaptation::Ecap::XactionRep::vbMake()
513 Must(makingVb
== opUndecided
);
514 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
516 Must(p
->setConsumerIfNotLate(this)); // to deliver vb, we must receive vb
521 Adaptation::Ecap::XactionRep::vbStopMaking()
523 Must(makingVb
== opOn
);
524 // if adapter does not need vb, we do not need to receive it
525 sinkVb("vbStopMaking");
526 Must(makingVb
== opComplete
);
530 Adaptation::Ecap::XactionRep::vbMakeMore()
532 Must(makingVb
== opOn
); // cannot make more if done proxying
533 // we cannot guarantee more vb, but we can check that there is a chance
534 const BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
535 Must(p
!= NULL
&& p
->stillConsuming(this)); // we are plugged in
536 Must(!p
->productionEnded() && p
->mayNeedMoreData()); // and may get more
540 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o
, libecap::size_type s
)
542 // We may not be makingVb yet. It should be OK, but see vbContentShift().
544 const BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
547 // TODO: make MemBuf use size_t?
548 const size_t haveSize
= static_cast<size_t>(p
->buf().contentSize());
550 // convert to Squid types; XXX: check for overflow
551 const uint64_t offset
= static_cast<uint64_t>(o
);
552 Must(offset
<= haveSize
); // equal iff at the end of content
554 // nsize means no size limit: all content starting from offset
555 const size_t size
= s
== libecap::nsize
?
556 haveSize
- offset
: static_cast<size_t>(s
);
558 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
559 return libecap::Area::FromTempBuffer(p
->buf().content() + offset
,
560 min(static_cast<size_t>(haveSize
- offset
), size
));
564 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n
)
566 // We may not be makingVb yet. It should be OK now, but if BodyPipe
567 // consume() requirements change, we would have to return empty vbContent
568 // until the adapter registers as a consumer
570 BodyPipePointer
&p
= theVirginRep
.raw().body_pipe
;
572 const size_t size
= static_cast<size_t>(n
); // XXX: check for overflow
573 const size_t haveSize
= static_cast<size_t>(p
->buf().contentSize()); // TODO: make MemBuf use size_t?
574 p
->consume(min(size
, haveSize
));
578 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd
)
580 Must(proxyingAb
== opOn
&& !abProductionFinished
);
581 abProductionFinished
= true;
582 abProductionAtEnd
= atEnd
; // store until ready to stop producing ourselves
583 debugs(93,5, HERE
<< "adapted body production ended");
588 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
590 Must(proxyingAb
== opOn
&& !abProductionFinished
);
594 #if 0 /* XXX: implement */
596 Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize
&size
)
598 Must(answer().body_pipe
!= NULL
);
600 answer().body_pipe
->setBodySize(size
.value());
601 // else the piped body size is unknown by default
606 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay
&d
)
608 debugs(93,3, HERE
<< "adapter needs time: " <<
609 d
.state
<< '/' << d
.progress
);
614 Adaptation::Ecap::XactionRep::adaptationAborted()
616 tellQueryAborted(true); // should eCAP support retries?
617 mustStop("adaptationAborted");
621 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount
<BodyPipe
> bp
)
623 Must(proxyingAb
== opOn
);
628 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount
<BodyPipe
> bp
)
630 Must(proxyingAb
== opOn
);
631 stopProducingFor(answer().body_pipe
, false);
633 theMaster
->abStopMaking();
634 proxyingAb
= opComplete
;
638 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount
<BodyPipe
> bp
)
640 Must(makingVb
== opOn
); // or we would not be registered as a consumer
642 theMaster
->noteVbContentAvailable();
646 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount
<BodyPipe
> bp
)
648 Must(makingVb
== opOn
); // or we would not be registered as a consumer
650 theMaster
->noteVbContentDone(true);
651 vbProductionFinished
= true;
655 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount
<BodyPipe
> bp
)
657 Must(makingVb
== opOn
); // or we would not be registered as a consumer
659 theMaster
->noteVbContentDone(false);
660 vbProductionFinished
= true;
664 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
666 mustStop("initiator aborted");
669 // get content from the adapter and put it into the adapted pipe
671 Adaptation::Ecap::XactionRep::moveAbContent()
673 Must(proxyingAb
== opOn
);
674 const libecap::Area c
= theMaster
->abContent(0, libecap::nsize
);
675 debugs(93,5, HERE
<< "up to " << c
.size
<< " bytes");
676 if (c
.size
== 0 && abProductionFinished
) { // no ab now and in the future
677 stopProducingFor(answer().body_pipe
, abProductionAtEnd
);
678 proxyingAb
= opComplete
;
679 debugs(93,5, HERE
<< "last adapted body data retrieved");
680 } else if (c
.size
> 0) {
681 if (const size_t used
= answer().body_pipe
->putMoreData(c
.start
, c
.size
))
682 theMaster
->abContentShift(used
);
687 Adaptation::Ecap::XactionRep::status() const
695 buf
.Printf("M%d", static_cast<int>(makingVb
));
697 const BodyPipePointer
&vp
= theVirginRep
.raw().body_pipe
;
699 buf
.append(" !V", 3);
700 else if (vp
->stillConsuming(const_cast<XactionRep
*>(this)))
701 buf
.append(" Vc", 3);
703 buf
.append(" V?", 3);
705 if (vbProductionFinished
)
708 buf
.Printf(" A%d", static_cast<int>(proxyingAb
));
710 if (proxyingAb
== opOn
) {
711 MessageRep
*rep
= dynamic_cast<MessageRep
*>(theAnswerRep
.get());
713 const BodyPipePointer
&ap
= rep
->raw().body_pipe
;
715 buf
.append(" !A", 3);
716 else if (ap
->stillProducing(const_cast<XactionRep
*>(this)))
717 buf
.append(" Ap", 3);
719 buf
.append(" A?", 3);
722 buf
.Printf(" %s%u]", id
.Prefix
, id
.value
);
726 return buf
.content();