]> git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
Merged from parent (trunk 11270, circa 3.2.0.5+)
[thirdparty/squid.git] / src / adaptation / ecap / XactionRep.cc
1 /*
2 * DEBUG: section 93 eCAP Interface
3 */
4 #include "squid.h"
5 #include <libecap/common/area.h>
6 #include <libecap/common/delay.h>
7 #include <libecap/common/named_values.h>
8 #include <libecap/common/names.h>
9 #include <libecap/adapter/xaction.h>
10 #include "HttpRequest.h"
11 #include "HttpReply.h"
12 #include "SquidTime.h"
13 #include "adaptation/ecap/XactionRep.h"
14 #include "adaptation/ecap/Config.h"
15 #include "adaptation/Initiator.h"
16 #include "base/TextException.h"
17
18 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
19
20
21 /// a libecap Visitor for converting adapter transaction options to HttpHeader
22 class OptionsExtractor: public libecap::NamedValueVisitor
23 {
24 public:
25 typedef libecap::Name Name;
26 typedef libecap::Area Area;
27
28 OptionsExtractor(HttpHeader &aMeta): meta(aMeta) {}
29
30 // libecap::NamedValueVisitor API
31 virtual void visit(const Name &name, const Area &value)
32 {
33 meta.putExt(name.image().c_str(), value.toString().c_str());
34 }
35
36 HttpHeader &meta; ///< where to put extracted options
37 };
38
39 Adaptation::Ecap::XactionRep::XactionRep(
40 HttpMsg *virginHeader, HttpRequest *virginCause,
41 const Adaptation::ServicePointer &aService):
42 AsyncJob("Adaptation::Ecap::XactionRep"),
43 Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
44 theService(aService),
45 theVirginRep(virginHeader), theCauseRep(NULL),
46 makingVb(opUndecided), proxyingAb(opUndecided),
47 adaptHistoryId(-1),
48 vbProductionFinished(false),
49 abProductionFinished(false), abProductionAtEnd(false)
50 {
51 if (virginCause)
52 theCauseRep = new MessageRep(virginCause);
53 }
54
55 Adaptation::Ecap::XactionRep::~XactionRep()
56 {
57 assert(!theMaster);
58 delete theCauseRep;
59 theAnswerRep.reset();
60 }
61
62 void
63 Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
64 {
65 Must(!theMaster);
66 Must(x != NULL);
67 theMaster = x;
68 }
69
70 Adaptation::Service &
71 Adaptation::Ecap::XactionRep::service()
72 {
73 Must(theService != NULL);
74 return *theService;
75 }
76
77 const libecap::Area
78 Adaptation::Ecap::XactionRep::option(const libecap::Name &name) const
79 {
80 if (name == libecap::metaClientIp)
81 return clientIpValue();
82 if (name == libecap::metaUserName)
83 return usernameValue();
84 if (name == Adaptation::Config::masterx_shared_name)
85 return masterxSharedValue(name);
86
87 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
88 return libecap::Area();
89 }
90
91 void
92 Adaptation::Ecap::XactionRep::visitEachOption(libecap::NamedValueVisitor &visitor) const
93 {
94 if (const libecap::Area value = clientIpValue())
95 visitor.visit(libecap::metaClientIp, value);
96 if (const libecap::Area value = usernameValue())
97 visitor.visit(libecap::metaUserName, value);
98
99 if (Adaptation::Config::masterx_shared_name) {
100 const libecap::Name name(Adaptation::Config::masterx_shared_name);
101 if (const libecap::Area value = masterxSharedValue(name))
102 visitor.visit(name, value);
103 }
104
105 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
106 }
107
108 const libecap::Area
109 Adaptation::Ecap::XactionRep::clientIpValue() const
110 {
111 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
112 theCauseRep->raw().header : theVirginRep.raw().header);
113 Must(request);
114 // TODO: move this logic into HttpRequest::clientIp(bool) and
115 // HttpRequest::clientIpString(bool) and reuse everywhere
116 if (TheConfig.send_client_ip && request) {
117 Ip::Address client_addr;
118 #if FOLLOW_X_FORWARDED_FOR
119 if (TheConfig.use_indirect_client) {
120 client_addr = request->indirect_client_addr;
121 } else
122 #endif
123 client_addr = request->client_addr;
124 if (!client_addr.IsAnyAddr() && !client_addr.IsNoAddr()) {
125 char ntoabuf[MAX_IPSTRLEN] = "";
126 client_addr.NtoA(ntoabuf,MAX_IPSTRLEN);
127 return libecap::Area::FromTempBuffer(ntoabuf, strlen(ntoabuf));
128 }
129 }
130 return libecap::Area();
131 }
132
133 const libecap::Area
134 Adaptation::Ecap::XactionRep::usernameValue() const
135 {
136 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
137 theCauseRep->raw().header : theVirginRep.raw().header);
138 Must(request);
139 if (request->auth_user_request != NULL) {
140 if (char const *name = request->auth_user_request->username())
141 return libecap::Area::FromTempBuffer(name, strlen(name));
142 }
143 return libecap::Area();
144 }
145
146 const libecap::Area
147 Adaptation::Ecap::XactionRep::masterxSharedValue(const libecap::Name &name) const
148 {
149 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
150 theCauseRep->raw().header : theVirginRep.raw().header);
151 Must(request);
152 if (name.known()) { // must check to avoid empty names matching unset cfg
153 Adaptation::History::Pointer ah = request->adaptHistory(false);
154 if (ah != NULL) {
155 String name, value;
156 if (ah->getXxRecord(name, value))
157 return libecap::Area::FromTempBuffer(value.rawBuf(), value.size());
158 }
159 }
160 return libecap::Area();
161 }
162
163 void
164 Adaptation::Ecap::XactionRep::start()
165 {
166 Must(theMaster);
167
168 if (!theVirginRep.raw().body_pipe)
169 makingVb = opNever; // there is nothing to deliver
170
171 const HttpRequest *request = dynamic_cast<const HttpRequest*> (theCauseRep ?
172 theCauseRep->raw().header : theVirginRep.raw().header);
173 Must(request);
174 Adaptation::History::Pointer ah = request->adaptLogHistory();
175 if (ah != NULL) {
176 // retrying=false because ecap never retries transactions
177 adaptHistoryId = ah->recordXactStart(service().cfg().key, current_time, false);
178 }
179
180 theMaster->start();
181 }
182
183 void
184 Adaptation::Ecap::XactionRep::swanSong()
185 {
186 // clear body_pipes, if any
187 // this code does not maintain proxying* and canAccessVb states; should it?
188
189 if (theAnswerRep != NULL) {
190 BodyPipe::Pointer body_pipe = answer().body_pipe;
191 if (body_pipe != NULL) {
192 Must(body_pipe->stillProducing(this));
193 stopProducingFor(body_pipe, false);
194 }
195 }
196
197 BodyPipe::Pointer &body_pipe = theVirginRep.raw().body_pipe;
198 if (body_pipe != NULL && body_pipe->stillConsuming(this))
199 stopConsumingFrom(body_pipe);
200
201 terminateMaster();
202
203 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
204 theCauseRep->raw().header : theVirginRep.raw().header);
205 Must(request);
206 Adaptation::History::Pointer ah = request->adaptLogHistory();
207 if (ah != NULL && adaptHistoryId >= 0)
208 ah->recordXactFinish(adaptHistoryId);
209
210 Adaptation::Initiate::swanSong();
211 }
212
213 libecap::Message &
214 Adaptation::Ecap::XactionRep::virgin()
215 {
216 return theVirginRep;
217 }
218
219 const libecap::Message &
220 Adaptation::Ecap::XactionRep::cause()
221 {
222 Must(theCauseRep != NULL);
223 return *theCauseRep;
224 }
225
226 libecap::Message &
227 Adaptation::Ecap::XactionRep::adapted()
228 {
229 Must(theAnswerRep != NULL);
230 return *theAnswerRep;
231 }
232
233 Adaptation::Message &
234 Adaptation::Ecap::XactionRep::answer()
235 {
236 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
237 Must(rep);
238 return rep->raw();
239 }
240
241 void
242 Adaptation::Ecap::XactionRep::terminateMaster()
243 {
244 if (theMaster) {
245 AdapterXaction x = theMaster;
246 theMaster.reset();
247 x->stop();
248 }
249 }
250
251 bool
252 Adaptation::Ecap::XactionRep::doneAll() const
253 {
254 return makingVb >= opComplete && proxyingAb >= opComplete &&
255 Adaptation::Initiate::doneAll();
256 }
257
258 // stops receiving virgin and enables auto-consumption, dropping any vb bytes
259 void
260 Adaptation::Ecap::XactionRep::sinkVb(const char *reason)
261 {
262 debugs(93,4, HERE << "sink for " << reason << "; status:" << status());
263
264 // we reset raw().body_pipe when we are done, so use this one for checking
265 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
266 if (permPipe != NULL)
267 permPipe->enableAutoConsumption();
268
269 forgetVb(reason);
270 }
271
272 // stops receiving virgin but preserves it for others to use
273 void
274 Adaptation::Ecap::XactionRep::preserveVb(const char *reason)
275 {
276 debugs(93,4, HERE << "preserve for " << reason << "; status:" << status());
277
278 // we reset raw().body_pipe when we are done, so use this one for checking
279 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
280 if (permPipe != NULL) {
281 // if libecap consumed, we cannot preserve
282 Must(!permPipe->consumedSize());
283 }
284
285 forgetVb(reason);
286 }
287
288 // disassociates us from vb; the last step of sinking or preserving vb
289 void
290 Adaptation::Ecap::XactionRep::forgetVb(const char *reason)
291 {
292 debugs(93,9, HERE << "forget vb " << reason << "; status:" << status());
293
294 BodyPipePointer &p = theVirginRep.raw().body_pipe;
295 if (p != NULL && p->stillConsuming(this))
296 stopConsumingFrom(p);
297
298 if (makingVb == opUndecided)
299 makingVb = opNever;
300 else if (makingVb == opOn)
301 makingVb = opComplete;
302 }
303
304 void
305 Adaptation::Ecap::XactionRep::useVirgin()
306 {
307 debugs(93,3, HERE << status());
308 Must(proxyingAb == opUndecided);
309 proxyingAb = opNever;
310
311 preserveVb("useVirgin");
312
313 HttpMsg *clone = theVirginRep.raw().header->clone();
314 // check that clone() copies the pipe so that we do not have to
315 Must(!theVirginRep.raw().header->body_pipe == !clone->body_pipe);
316
317 updateHistory();
318 sendAnswer(Answer::Forward(clone));
319 Must(done());
320 }
321
322 void
323 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
324 {
325 debugs(93,3, HERE << status());
326 Must(m);
327 theAnswerRep = m;
328 Must(proxyingAb == opUndecided);
329
330 HttpMsg *msg = answer().header;
331 if (!theAnswerRep->body()) { // final, bodyless answer
332 proxyingAb = opNever;
333 updateHistory();
334 sendAnswer(Answer::Forward(msg));
335 } else { // got answer headers but need to handle body
336 proxyingAb = opOn;
337 Must(!msg->body_pipe); // only host can set body pipes
338 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
339 Must(rep);
340 rep->tieBody(this); // sets us as a producer
341 Must(msg->body_pipe != NULL); // check tieBody
342
343 updateHistory();
344 sendAnswer(Answer::Forward(msg));
345
346 debugs(93,4, HERE << "adapter will produce body" << status());
347 theMaster->abMake(); // libecap will produce
348 }
349 }
350
351 void
352 Adaptation::Ecap::XactionRep::blockVirgin()
353 {
354 debugs(93,3, HERE << status());
355 Must(proxyingAb == opUndecided);
356 proxyingAb = opNever;
357
358 sinkVb("blockVirgin");
359
360 updateHistory();
361 sendAnswer(Answer::Block(service().cfg().key));
362 Must(done());
363 }
364
365 /// Called just before sendAnswer() to record adapter meta-information
366 /// which may affect answer processing and may be needed for logging.
367 void
368 Adaptation::Ecap::XactionRep::updateHistory()
369 {
370 if (!theMaster) // all updates rely on being able to query the adapter
371 return;
372
373 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
374 theCauseRep->raw().header : theVirginRep.raw().header);
375 Must(request);
376
377 // TODO: move common ICAP/eCAP logic to Adaptation::Xaction or similar
378 // TODO: optimize Area-to-String conversion
379
380 // update the cross-transactional database if needed
381 if (const char *xxNameStr = Adaptation::Config::masterx_shared_name) {
382 Adaptation::History::Pointer ah = request->adaptHistory(true);
383 if (ah != NULL) {
384 libecap::Name xxName(xxNameStr); // TODO: optimize?
385 if (const libecap::Area val = theMaster->option(xxName))
386 ah->updateXxRecord(xxNameStr, val.toString().c_str());
387 }
388 }
389
390 // update the adaptation plan if needed
391 if (service().cfg().routing) {
392 String services;
393 if (const libecap::Area services = theMaster->option(libecap::metaNextServices)) {
394 Adaptation::History::Pointer ah = request->adaptHistory(true);
395 if (ah != NULL)
396 ah->updateNextServices(services.toString().c_str());
397 }
398 } // TODO: else warn (occasionally!) if we got libecap::metaNextServices
399
400 // Store received meta headers for adapt::<last_h logformat code use.
401 // If we already have stored headers from a previous adaptation transaction
402 // related to the same master transction, they will be replaced.
403 Adaptation::History::Pointer ah = request->adaptLogHistory();
404 if (ah != NULL) {
405 HttpHeader meta(hoReply);
406 OptionsExtractor extractor(meta);
407 theMaster->visitEachOption(extractor);
408 ah->recordMeta(&meta);
409 }
410 }
411
412
413 void
414 Adaptation::Ecap::XactionRep::vbDiscard()
415 {
416 Must(makingVb == opUndecided);
417 // if adapter does not need vb, we do not need to send it
418 sinkVb("vbDiscard");
419 Must(makingVb == opNever);
420 }
421
422 void
423 Adaptation::Ecap::XactionRep::vbMake()
424 {
425 Must(makingVb == opUndecided);
426 BodyPipePointer &p = theVirginRep.raw().body_pipe;
427 Must(p != NULL);
428 Must(p->setConsumerIfNotLate(this)); // to deliver vb, we must receive vb
429 makingVb = opOn;
430 }
431
432 void
433 Adaptation::Ecap::XactionRep::vbStopMaking()
434 {
435 Must(makingVb == opOn);
436 // if adapter does not need vb, we do not need to receive it
437 sinkVb("vbStopMaking");
438 Must(makingVb == opComplete);
439 }
440
441 void
442 Adaptation::Ecap::XactionRep::vbMakeMore()
443 {
444 Must(makingVb == opOn); // cannot make more if done proxying
445 // we cannot guarantee more vb, but we can check that there is a chance
446 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
447 Must(p != NULL && p->stillConsuming(this)); // we are plugged in
448 Must(!p->productionEnded() && p->mayNeedMoreData()); // and may get more
449 }
450
451 libecap::Area
452 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
453 {
454 // We may not be makingVb yet. It should be OK, but see vbContentShift().
455
456 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
457 Must(p != NULL);
458
459 // TODO: make MemBuf use size_t?
460 const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
461
462 // convert to Squid types; XXX: check for overflow
463 const uint64_t offset = static_cast<uint64_t>(o);
464 Must(offset <= haveSize); // equal iff at the end of content
465
466 // nsize means no size limit: all content starting from offset
467 const size_t size = s == libecap::nsize ?
468 haveSize - offset : static_cast<size_t>(s);
469
470 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
471 return libecap::Area::FromTempBuffer(p->buf().content() + offset,
472 min(static_cast<size_t>(haveSize - offset), size));
473 }
474
475 void
476 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
477 {
478 // We may not be makingVb yet. It should be OK now, but if BodyPipe
479 // consume() requirements change, we would have to return empty vbContent
480 // until the adapter registers as a consumer
481
482 BodyPipePointer &p = theVirginRep.raw().body_pipe;
483 Must(p != NULL);
484 const size_t size = static_cast<size_t>(n); // XXX: check for overflow
485 const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
486 p->consume(min(size, haveSize));
487 }
488
489 void
490 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
491 {
492 Must(proxyingAb == opOn && !abProductionFinished);
493 abProductionFinished = true;
494 abProductionAtEnd = atEnd; // store until ready to stop producing ourselves
495 debugs(93,5, HERE << "adapted body production ended");
496 moveAbContent();
497 }
498
499 void
500 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
501 {
502 Must(proxyingAb == opOn && !abProductionFinished);
503 moveAbContent();
504 }
505
506 #if 0 /* XXX: implement */
507 void
508 Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
509 {
510 Must(answer().body_pipe != NULL);
511 if (size.known())
512 answer().body_pipe->setBodySize(size.value());
513 // else the piped body size is unknown by default
514 }
515 #endif
516
517 void
518 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
519 {
520 debugs(93,3, HERE << "adapter needs time: " <<
521 d.state << '/' << d.progress);
522 // XXX: set timeout?
523 }
524
525 void
526 Adaptation::Ecap::XactionRep::adaptationAborted()
527 {
528 tellQueryAborted(true); // should eCAP support retries?
529 mustStop("adaptationAborted");
530 }
531
532 bool
533 Adaptation::Ecap::XactionRep::callable() const
534 {
535 return !done();
536 }
537
538 void
539 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
540 {
541 Must(proxyingAb == opOn);
542 moveAbContent();
543 }
544
545 void
546 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
547 {
548 Must(proxyingAb == opOn);
549 stopProducingFor(answer().body_pipe, false);
550 Must(theMaster);
551 theMaster->abStopMaking();
552 proxyingAb = opComplete;
553 }
554
555 void
556 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
557 {
558 Must(makingVb == opOn); // or we would not be registered as a consumer
559 Must(theMaster);
560 theMaster->noteVbContentAvailable();
561 }
562
563 void
564 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
565 {
566 Must(makingVb == opOn); // or we would not be registered as a consumer
567 Must(theMaster);
568 theMaster->noteVbContentDone(true);
569 vbProductionFinished = true;
570 }
571
572 void
573 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
574 {
575 Must(makingVb == opOn); // or we would not be registered as a consumer
576 Must(theMaster);
577 theMaster->noteVbContentDone(false);
578 vbProductionFinished = true;
579 }
580
581 void
582 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
583 {
584 mustStop("initiator aborted");
585 }
586
587 // get content from the adapter and put it into the adapted pipe
588 void
589 Adaptation::Ecap::XactionRep::moveAbContent()
590 {
591 Must(proxyingAb == opOn);
592 const libecap::Area c = theMaster->abContent(0, libecap::nsize);
593 debugs(93,5, HERE << "up to " << c.size << " bytes");
594 if (c.size == 0 && abProductionFinished) { // no ab now and in the future
595 stopProducingFor(answer().body_pipe, abProductionAtEnd);
596 proxyingAb = opComplete;
597 debugs(93,5, HERE << "last adapted body data retrieved");
598 } else if (c.size > 0) {
599 if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
600 theMaster->abContentShift(used);
601 }
602 }
603
604 const char *
605 Adaptation::Ecap::XactionRep::status() const
606 {
607 static MemBuf buf;
608 buf.reset();
609
610 buf.append(" [", 2);
611
612 if (makingVb)
613 buf.Printf("M%d", static_cast<int>(makingVb));
614
615 const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
616 if (!vp)
617 buf.append(" !V", 3);
618 else
619 if (vp->stillConsuming(const_cast<XactionRep*>(this)))
620 buf.append(" Vc", 3);
621 else
622 buf.append(" V?", 3);
623
624 if (vbProductionFinished)
625 buf.append(".", 1);
626
627
628 buf.Printf(" A%d", static_cast<int>(proxyingAb));
629
630 if (proxyingAb == opOn) {
631 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
632 Must(rep);
633 const BodyPipePointer &ap = rep->raw().body_pipe;
634 if (!ap)
635 buf.append(" !A", 3);
636 else if (ap->stillProducing(const_cast<XactionRep*>(this)))
637 buf.append(" Ap", 3);
638 else
639 buf.append(" A?", 3);
640 }
641
642 buf.Printf(" %s%u]", id.Prefix, id.value);
643
644 buf.terminate();
645
646 return buf.content();
647 }