]> git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
c167cd6acf73d5a43b4acffa48f4741a5686f60c
[thirdparty/squid.git] / src / adaptation / ecap / XactionRep.cc
1 /*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 93 eCAP Interface */
10
11 #include "squid.h"
12 #include "adaptation/Answer.h"
13 #include "adaptation/ecap/Config.h"
14 #include "adaptation/ecap/XactionRep.h"
15 #include "adaptation/Initiator.h"
16 #include "base/AsyncJobCalls.h"
17 #include "base/TextException.h"
18 #include "format/Format.h"
19 #include "HttpReply.h"
20 #include "MasterXaction.h"
21
22 #if HAVE_LIBECAP_COMMON_AREA_H
23 #include <libecap/common/area.h>
24 #endif
25 #if HAVE_LIBECAP_COMMON_DELAY_H
26 #include <libecap/common/delay.h>
27 #endif
28 #if HAVE_LIBECAP_COMMON_NAMED_VALUES_H
29 #include <libecap/common/named_values.h>
30 #endif
31 #if HAVE_LIBECAP_COMMON_NAMES_H
32 #include <libecap/common/names.h>
33 #endif
34
35 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
36
37 /// a libecap Visitor for converting adapter transaction options to HttpHeader
38 class OptionsExtractor: public libecap::NamedValueVisitor
39 {
40 public:
41 typedef libecap::Name Name;
42 typedef libecap::Area Area;
43
44 OptionsExtractor(HttpHeader &aMeta): meta(aMeta) {}
45
46 // libecap::NamedValueVisitor API
47 void visit(const Name &name, const Area &value) override {
48 meta.putExt(name.image().c_str(), value.toString().c_str());
49 }
50
51 HttpHeader &meta; ///< where to put extracted options
52 };
53
54 Adaptation::Ecap::XactionRep::XactionRep(
55 Http::Message *virginHeader, HttpRequest *virginCause, AccessLogEntry::Pointer &alp,
56 const Adaptation::ServicePointer &aService):
57 AsyncJob("Adaptation::Ecap::XactionRep"),
58 Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
59 theService(aService),
60 theVirginRep(virginHeader), theCauseRep(nullptr),
61 makingVb(opUndecided), proxyingAb(opUndecided),
62 adaptHistoryId(-1),
63 vbProductionFinished(false),
64 abProductionFinished(false), abProductionAtEnd(false),
65 al(alp)
66 {
67 if (virginCause)
68 theCauseRep = new MessageRep(virginCause);
69 }
70
71 Adaptation::Ecap::XactionRep::~XactionRep()
72 {
73 assert(!theMaster);
74 delete theCauseRep;
75 theAnswerRep.reset();
76 }
77
78 void
79 Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
80 {
81 Must(!theMaster);
82 Must(x);
83 theMaster = x;
84 }
85
86 Adaptation::Service &
87 Adaptation::Ecap::XactionRep::service()
88 {
89 Must(theService != nullptr);
90 return *theService;
91 }
92
93 const libecap::Area
94 Adaptation::Ecap::XactionRep::option(const libecap::Name &name) const
95 {
96 if (name == libecap::metaClientIp)
97 return clientIpValue();
98 if (name == libecap::metaUserName)
99 return usernameValue();
100 if (Adaptation::Config::masterx_shared_name && name == Adaptation::Config::masterx_shared_name)
101 return masterxSharedValue(name);
102
103 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
104
105 // If the name is unknown, metaValue returns an empty area
106 return metaValue(name);
107 }
108
109 void
110 Adaptation::Ecap::XactionRep::visitEachOption(libecap::NamedValueVisitor &visitor) const
111 {
112 if (const libecap::Area value = clientIpValue())
113 visitor.visit(libecap::metaClientIp, value);
114 if (const libecap::Area value = usernameValue())
115 visitor.visit(libecap::metaUserName, value);
116
117 if (Adaptation::Config::masterx_shared_name) {
118 const libecap::Name name(Adaptation::Config::masterx_shared_name);
119 if (const libecap::Area value = masterxSharedValue(name))
120 visitor.visit(name, value);
121 }
122
123 visitEachMetaHeader(visitor);
124
125 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
126 }
127
128 const libecap::Area
129 Adaptation::Ecap::XactionRep::clientIpValue() const
130 {
131 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
132 theCauseRep->raw().header : theVirginRep.raw().header);
133 Must(request);
134 // TODO: move this logic into HttpRequest::clientIp(bool) and
135 // HttpRequest::clientIpString(bool) and reuse everywhere
136 if (TheConfig.send_client_ip && request) {
137 Ip::Address client_addr;
138 #if FOLLOW_X_FORWARDED_FOR
139 if (TheConfig.use_indirect_client) {
140 client_addr = request->indirect_client_addr;
141 } else
142 #endif
143 client_addr = request->client_addr;
144 if (!client_addr.isAnyAddr() && !client_addr.isNoAddr()) {
145 char ntoabuf[MAX_IPSTRLEN] = "";
146 client_addr.toStr(ntoabuf,MAX_IPSTRLEN);
147 return libecap::Area::FromTempBuffer(ntoabuf, strlen(ntoabuf));
148 }
149 }
150 return libecap::Area();
151 }
152
153 const libecap::Area
154 Adaptation::Ecap::XactionRep::usernameValue() const
155 {
156 #if USE_AUTH
157 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
158 theCauseRep->raw().header : theVirginRep.raw().header);
159 Must(request);
160 if (request->auth_user_request != nullptr) {
161 if (char const *name = request->auth_user_request->username())
162 return libecap::Area::FromTempBuffer(name, strlen(name));
163 else if (request->extacl_user.size() > 0)
164 return libecap::Area::FromTempBuffer(request->extacl_user.rawBuf(),
165 request->extacl_user.size());
166 }
167 #endif
168 return libecap::Area();
169 }
170
171 const libecap::Area
172 Adaptation::Ecap::XactionRep::masterxSharedValue(const libecap::Name &sharedName) const
173 {
174 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
175 theCauseRep->raw().header : theVirginRep.raw().header);
176 Must(request);
177 if (sharedName.known()) { // must check to avoid empty names matching unset cfg
178 Adaptation::History::Pointer ah = request->adaptHistory(false);
179 if (ah != nullptr) {
180 String name, value;
181 if (ah->getXxRecord(name, value))
182 return libecap::Area::FromTempBuffer(value.rawBuf(), value.size());
183 }
184 }
185 return libecap::Area();
186 }
187
188 const libecap::Area
189 Adaptation::Ecap::XactionRep::metaValue(const libecap::Name &name) const
190 {
191 HttpRequest *request = dynamic_cast<HttpRequest*>(theCauseRep ?
192 theCauseRep->raw().header : theVirginRep.raw().header);
193 Must(request);
194 HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
195
196 if (name.known()) { // must check to avoid empty names matching unset cfg
197 for (auto h: Adaptation::Config::metaHeaders) {
198 if (name == h->key().toStdString()) {
199 SBuf matched;
200 if (h->match(request, reply, al, matched))
201 return libecap::Area::FromTempString(matched.toStdString());
202 else
203 return libecap::Area();
204 }
205 }
206 }
207
208 return libecap::Area();
209 }
210
211 void
212 Adaptation::Ecap::XactionRep::visitEachMetaHeader(libecap::NamedValueVisitor &visitor) const
213 {
214 HttpRequest *request = dynamic_cast<HttpRequest*>(theCauseRep ?
215 theCauseRep->raw().header : theVirginRep.raw().header);
216 Must(request);
217 HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
218
219 for (auto h: Adaptation::Config::metaHeaders) {
220 SBuf matched;
221 if (h->match(request, reply, al, matched)) {
222 const libecap::Name name(h->key().toStdString());
223 const libecap::Area value = libecap::Area::FromTempString(matched.toStdString());
224 visitor.visit(name, value);
225 }
226 }
227 }
228
229 void
230 Adaptation::Ecap::XactionRep::start()
231 {
232 Must(theMaster);
233
234 if (!theVirginRep.raw().body_pipe)
235 makingVb = opNever; // there is nothing to deliver
236
237 HttpRequest *request = dynamic_cast<HttpRequest*> (theCauseRep ?
238 theCauseRep->raw().header : theVirginRep.raw().header);
239 Must(request);
240
241 HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
242
243 Adaptation::History::Pointer ah = request->adaptLogHistory();
244 if (ah != nullptr) {
245 // retrying=false because ecap never retries transactions
246 adaptHistoryId = ah->recordXactStart(service().cfg().key, current_time, false);
247 SBuf matched;
248 for (auto h: Adaptation::Config::metaHeaders) {
249 if (h->match(request, reply, al, matched)) {
250 if (ah->metaHeaders == nullptr)
251 ah->metaHeaders = new NotePairs();
252 if (!ah->metaHeaders->hasPair(h->key(), matched))
253 ah->metaHeaders->add(h->key(), matched);
254 }
255 }
256 }
257
258 theMaster->start();
259 }
260
261 void
262 Adaptation::Ecap::XactionRep::swanSong()
263 {
264 // clear body_pipes, if any
265 // this code does not maintain proxying* and canAccessVb states; should it?
266
267 if (theAnswerRep) {
268 BodyPipe::Pointer body_pipe = answer().body_pipe;
269 if (body_pipe != nullptr) {
270 Must(body_pipe->stillProducing(this));
271 stopProducingFor(body_pipe, false);
272 }
273 }
274
275 BodyPipe::Pointer &body_pipe = theVirginRep.raw().body_pipe;
276 if (body_pipe != nullptr && body_pipe->stillConsuming(this))
277 stopConsumingFrom(body_pipe);
278
279 terminateMaster();
280
281 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
282 theCauseRep->raw().header : theVirginRep.raw().header);
283 Must(request);
284 Adaptation::History::Pointer ah = request->adaptLogHistory();
285 if (ah != nullptr && adaptHistoryId >= 0)
286 ah->recordXactFinish(adaptHistoryId);
287
288 Adaptation::Initiate::swanSong();
289 }
290
291 void
292 Adaptation::Ecap::XactionRep::resume()
293 {
294 // go async to gain exception protection and done()-based job destruction
295 typedef NullaryMemFunT<Adaptation::Ecap::XactionRep> Dialer;
296 AsyncCall::Pointer call = asyncCall(93, 5, "Adaptation::Ecap::XactionRep::doResume",
297 Dialer(this, &Adaptation::Ecap::XactionRep::doResume));
298 ScheduleCallHere(call);
299 }
300
301 /// the guts of libecap::host::Xaction::resume() API implementation
302 /// which just goes async in Adaptation::Ecap::XactionRep::resume().
303 void
304 Adaptation::Ecap::XactionRep::doResume()
305 {
306 Must(theMaster);
307 theMaster->resume();
308 }
309
310 libecap::Message &
311 Adaptation::Ecap::XactionRep::virgin()
312 {
313 return theVirginRep;
314 }
315
316 const libecap::Message &
317 Adaptation::Ecap::XactionRep::cause()
318 {
319 Must(theCauseRep != nullptr);
320 return *theCauseRep;
321 }
322
323 libecap::Message &
324 Adaptation::Ecap::XactionRep::adapted()
325 {
326 Must(theAnswerRep);
327 return *theAnswerRep;
328 }
329
330 Adaptation::Message &
331 Adaptation::Ecap::XactionRep::answer()
332 {
333 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
334 Must(rep);
335 return rep->raw();
336 }
337
338 void
339 Adaptation::Ecap::XactionRep::terminateMaster()
340 {
341 if (theMaster) {
342 AdapterXaction x = theMaster;
343 theMaster.reset();
344 x->stop();
345 }
346 }
347
348 bool
349 Adaptation::Ecap::XactionRep::doneAll() const
350 {
351 return makingVb >= opComplete && proxyingAb >= opComplete &&
352 Adaptation::Initiate::doneAll();
353 }
354
355 // stops receiving virgin and enables auto-consumption, dropping any vb bytes
356 void
357 Adaptation::Ecap::XactionRep::sinkVb(const char *reason)
358 {
359 debugs(93,4, "sink for " << reason << "; status:" << status());
360
361 // we reset raw().body_pipe when we are done, so use this one for checking
362 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
363 if (permPipe != nullptr)
364 permPipe->enableAutoConsumption();
365
366 forgetVb(reason);
367 }
368
369 // stops receiving virgin but preserves it for others to use
370 void
371 Adaptation::Ecap::XactionRep::preserveVb(const char *reason)
372 {
373 debugs(93,4, "preserve for " << reason << "; status:" << status());
374
375 // we reset raw().body_pipe when we are done, so use this one for checking
376 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
377 if (permPipe != nullptr) {
378 // if libecap consumed, we cannot preserve
379 Must(!permPipe->consumedSize());
380 }
381
382 forgetVb(reason);
383 }
384
385 // disassociates us from vb; the last step of sinking or preserving vb
386 void
387 Adaptation::Ecap::XactionRep::forgetVb(const char *reason)
388 {
389 debugs(93,9, "forget vb " << reason << "; status:" << status());
390
391 BodyPipePointer &p = theVirginRep.raw().body_pipe;
392 if (p != nullptr && p->stillConsuming(this))
393 stopConsumingFrom(p);
394
395 if (makingVb == opUndecided)
396 makingVb = opNever;
397 else if (makingVb == opOn)
398 makingVb = opComplete;
399 }
400
401 void
402 Adaptation::Ecap::XactionRep::useVirgin()
403 {
404 debugs(93,3, status());
405 Must(proxyingAb == opUndecided);
406 proxyingAb = opNever;
407
408 preserveVb("useVirgin");
409
410 Http::Message *clone = theVirginRep.raw().header->clone();
411 // check that clone() copies the pipe so that we do not have to
412 Must(!theVirginRep.raw().header->body_pipe == !clone->body_pipe);
413
414 updateHistory(clone);
415 sendAnswer(Answer::Forward(clone));
416 Must(done());
417 }
418
419 void
420 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
421 {
422 debugs(93,3, status());
423 Must(m);
424 theAnswerRep = m;
425 Must(proxyingAb == opUndecided);
426
427 Http::Message *msg = answer().header;
428 updateSources(msg);
429 if (!theAnswerRep->body()) { // final, bodyless answer
430 proxyingAb = opNever;
431 updateHistory(msg);
432 sendAnswer(Answer::Forward(msg));
433 } else { // got answer headers but need to handle body
434 proxyingAb = opOn;
435 Must(!msg->body_pipe); // only host can set body pipes
436 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
437 Must(rep);
438 rep->tieBody(this); // sets us as a producer
439 Must(msg->body_pipe != nullptr); // check tieBody
440
441 updateHistory(msg);
442 sendAnswer(Answer::Forward(msg));
443
444 debugs(93,4, "adapter will produce body" << status());
445 theMaster->abMake(); // libecap will produce
446 }
447 }
448
449 void
450 Adaptation::Ecap::XactionRep::blockVirgin()
451 {
452 debugs(93,3, status());
453 Must(proxyingAb == opUndecided);
454 proxyingAb = opNever;
455
456 sinkVb("blockVirgin");
457
458 updateHistory(nullptr);
459 sendAnswer(Answer::Block(service().cfg().key));
460 Must(done());
461 }
462
463 /// Called just before sendAnswer() to record adapter meta-information
464 /// which may affect answer processing and may be needed for logging.
465 void
466 Adaptation::Ecap::XactionRep::updateHistory(Http::Message *adapted)
467 {
468 if (!theMaster) // all updates rely on being able to query the adapter
469 return;
470
471 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
472 theCauseRep->raw().header : theVirginRep.raw().header);
473 Must(request);
474
475 // TODO: move common ICAP/eCAP logic to Adaptation::Xaction or similar
476 // TODO: optimize Area-to-String conversion
477
478 // update the cross-transactional database if needed
479 if (const char *xxNameStr = Adaptation::Config::masterx_shared_name) {
480 Adaptation::History::Pointer ah = request->adaptHistory(true);
481 if (ah != nullptr) {
482 libecap::Name xxName(xxNameStr); // TODO: optimize?
483 if (const libecap::Area val = theMaster->option(xxName))
484 ah->updateXxRecord(xxNameStr, val.toString().c_str());
485 }
486 }
487
488 // update the adaptation plan if needed
489 if (service().cfg().routing) {
490 if (const libecap::Area services = theMaster->option(libecap::metaNextServices)) {
491 Adaptation::History::Pointer ah = request->adaptHistory(true);
492 if (ah != nullptr)
493 ah->updateNextServices(services.toString().c_str());
494 }
495 } // TODO: else warn (occasionally!) if we got libecap::metaNextServices
496
497 // Store received meta headers for adapt::<last_h logformat code use.
498 // If we already have stored headers from a previous adaptation transaction
499 // related to the same master transction, they will be replaced.
500 Adaptation::History::Pointer ah = request->adaptLogHistory();
501 if (ah != nullptr) {
502 HttpHeader meta(hoReply);
503 OptionsExtractor extractor(meta);
504 theMaster->visitEachOption(extractor);
505 ah->recordMeta(&meta);
506 }
507
508 // Add just-created history to the adapted/cloned request that lacks it.
509 if (HttpRequest *adaptedReq = dynamic_cast<HttpRequest*>(adapted))
510 adaptedReq->adaptHistoryImport(*request);
511 }
512
513 void
514 Adaptation::Ecap::XactionRep::vbDiscard()
515 {
516 Must(makingVb == opUndecided);
517 // if adapter does not need vb, we do not need to send it
518 sinkVb("vbDiscard");
519 Must(makingVb == opNever);
520 }
521
522 void
523 Adaptation::Ecap::XactionRep::vbMake()
524 {
525 Must(makingVb == opUndecided);
526 BodyPipePointer &p = theVirginRep.raw().body_pipe;
527 Must(p != nullptr);
528 Must(p->setConsumerIfNotLate(this)); // to deliver vb, we must receive vb
529 makingVb = opOn;
530 }
531
532 void
533 Adaptation::Ecap::XactionRep::vbStopMaking()
534 {
535 Must(makingVb == opOn);
536 // if adapter does not need vb, we do not need to receive it
537 sinkVb("vbStopMaking");
538 Must(makingVb == opComplete);
539 }
540
541 void
542 Adaptation::Ecap::XactionRep::vbMakeMore()
543 {
544 Must(makingVb == opOn); // cannot make more if done proxying
545 // we cannot guarantee more vb, but we can check that there is a chance
546 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
547 Must(p != nullptr && p->stillConsuming(this)); // we are plugged in
548 Must(!p->productionEnded() && p->mayNeedMoreData()); // and may get more
549 }
550
551 libecap::Area
552 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
553 {
554 // We may not be makingVb yet. It should be OK, but see vbContentShift().
555
556 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
557 Must(p != nullptr);
558
559 // TODO: make MemBuf use size_t?
560 const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
561
562 // convert to Squid types; XXX: check for overflow
563 const uint64_t offset = static_cast<uint64_t>(o);
564 Must(offset <= haveSize); // equal iff at the end of content
565
566 // nsize means no size limit: all content starting from offset
567 const size_t size = s == libecap::nsize ?
568 haveSize - offset : static_cast<size_t>(s);
569
570 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
571 return libecap::Area::FromTempBuffer(p->buf().content() + offset,
572 min(static_cast<size_t>(haveSize - offset), size));
573 }
574
575 void
576 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
577 {
578 // We may not be makingVb yet. It should be OK now, but if BodyPipe
579 // consume() requirements change, we would have to return empty vbContent
580 // until the adapter registers as a consumer
581
582 BodyPipePointer &p = theVirginRep.raw().body_pipe;
583 Must(p != nullptr);
584 const size_t size = static_cast<size_t>(n); // XXX: check for overflow
585 const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
586 p->consume(min(size, haveSize));
587 }
588
589 void
590 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
591 {
592 Must(proxyingAb == opOn && !abProductionFinished);
593 abProductionFinished = true;
594 abProductionAtEnd = atEnd; // store until ready to stop producing ourselves
595 debugs(93,5, "adapted body production ended");
596 moveAbContent();
597 }
598
599 void
600 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
601 {
602 Must(proxyingAb == opOn && !abProductionFinished);
603 moveAbContent();
604 }
605
606 #if 0 /* XXX: implement */
607 void
608 Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
609 {
610 Must(answer().body_pipe != NULL);
611 if (size.known())
612 answer().body_pipe->setBodySize(size.value());
613 // else the piped body size is unknown by default
614 }
615 #endif
616
617 void
618 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
619 {
620 debugs(93,3, "adapter needs time: " <<
621 d.state << '/' << d.progress);
622 // XXX: set timeout?
623 }
624
625 void
626 Adaptation::Ecap::XactionRep::adaptationAborted()
627 {
628 tellQueryAborted(true); // should eCAP support retries?
629 mustStop("adaptationAborted");
630 }
631
632 void
633 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe>)
634 {
635 Must(proxyingAb == opOn);
636 moveAbContent();
637 }
638
639 void
640 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe>)
641 {
642 Must(proxyingAb == opOn);
643 stopProducingFor(answer().body_pipe, false);
644 Must(theMaster);
645 theMaster->abStopMaking();
646 proxyingAb = opComplete;
647 }
648
649 void
650 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe>)
651 {
652 Must(makingVb == opOn); // or we would not be registered as a consumer
653 Must(theMaster);
654 theMaster->noteVbContentAvailable();
655 }
656
657 void
658 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe>)
659 {
660 Must(makingVb == opOn); // or we would not be registered as a consumer
661 Must(theMaster);
662 theMaster->noteVbContentDone(true);
663 vbProductionFinished = true;
664 }
665
666 void
667 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe>)
668 {
669 Must(makingVb == opOn); // or we would not be registered as a consumer
670 Must(theMaster);
671 theMaster->noteVbContentDone(false);
672 vbProductionFinished = true;
673 }
674
675 void
676 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
677 {
678 mustStop("initiator aborted");
679 }
680
681 // get content from the adapter and put it into the adapted pipe
682 void
683 Adaptation::Ecap::XactionRep::moveAbContent()
684 {
685 Must(proxyingAb == opOn);
686 const libecap::Area c = theMaster->abContent(0, libecap::nsize);
687 debugs(93,5, "up to " << c.size << " bytes");
688 if (c.size == 0 && abProductionFinished) { // no ab now and in the future
689 stopProducingFor(answer().body_pipe, abProductionAtEnd);
690 proxyingAb = opComplete;
691 debugs(93,5, "last adapted body data retrieved");
692 } else if (c.size > 0) {
693 if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
694 theMaster->abContentShift(used);
695 }
696 }
697
698 const char *
699 Adaptation::Ecap::XactionRep::status() const
700 {
701 static MemBuf buf;
702 buf.reset();
703
704 buf.append(" [", 2);
705
706 if (makingVb)
707 buf.appendf("M%d", static_cast<int>(makingVb));
708
709 const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
710 if (!vp)
711 buf.append(" !V", 3);
712 else if (vp->stillConsuming(const_cast<XactionRep*>(this)))
713 buf.append(" Vc", 3);
714 else
715 buf.append(" V?", 3);
716
717 if (vbProductionFinished)
718 buf.append(".", 1);
719
720 buf.appendf(" A%d", static_cast<int>(proxyingAb));
721
722 if (proxyingAb == opOn) {
723 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
724 Must(rep);
725 const BodyPipePointer &ap = rep->raw().body_pipe;
726 if (!ap)
727 buf.append(" !A", 3);
728 else if (ap->stillProducing(const_cast<XactionRep*>(this)))
729 buf.append(" Ap", 3);
730 else
731 buf.append(" A?", 3);
732 }
733
734 buf.appendf(" %s%u]", id.prefix(), id.value);
735
736 buf.terminate();
737
738 return buf.content();
739 }
740
741 void
742 Adaptation::Ecap::XactionRep::updateSources(Http::Message *adapted)
743 {
744 adapted->sources |= service().cfg().connectionEncryption ? Http::Message::srcEcaps : Http::Message::srcEcap;
745
746 // Update masterXaction object for adapted HTTP requests.
747 if (HttpRequest *adaptedReq = dynamic_cast<HttpRequest*>(adapted)) {
748 HttpRequest *request = dynamic_cast<HttpRequest*> (theCauseRep ?
749 theCauseRep->raw().header : theVirginRep.raw().header);
750 Must(request);
751 adaptedReq->masterXaction = request->masterXaction;
752 }
753 }
754