]> git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / adaptation / ecap / XactionRep.cc
1 /*
2 * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 93 eCAP Interface */
10
11 #include "squid.h"
12 #include <libecap/common/area.h>
13 #include <libecap/common/delay.h>
14 #include <libecap/common/named_values.h>
15 #include <libecap/common/names.h>
16 #include <libecap/adapter/xaction.h>
17 #include "adaptation/Answer.h"
18 #include "adaptation/ecap/Config.h"
19 #include "adaptation/ecap/XactionRep.h"
20 #include "adaptation/Initiator.h"
21 #include "base/AsyncJobCalls.h"
22 #include "base/TextException.h"
23 #include "format/Format.h"
24 #include "HttpReply.h"
25 #include "HttpRequest.h"
26 #include "SquidTime.h"
27
28 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
29
30 /// a libecap Visitor for converting adapter transaction options to HttpHeader
31 class OptionsExtractor: public libecap::NamedValueVisitor
32 {
33 public:
34 typedef libecap::Name Name;
35 typedef libecap::Area Area;
36
37 OptionsExtractor(HttpHeader &aMeta): meta(aMeta) {}
38
39 // libecap::NamedValueVisitor API
40 virtual void visit(const Name &name, const Area &value) {
41 meta.putExt(name.image().c_str(), value.toString().c_str());
42 }
43
44 HttpHeader &meta; ///< where to put extracted options
45 };
46
47 Adaptation::Ecap::XactionRep::XactionRep(
48 HttpMsg *virginHeader, HttpRequest *virginCause, AccessLogEntry::Pointer &alp,
49 const Adaptation::ServicePointer &aService):
50 AsyncJob("Adaptation::Ecap::XactionRep"),
51 Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
52 theService(aService),
53 theVirginRep(virginHeader), theCauseRep(NULL),
54 makingVb(opUndecided), proxyingAb(opUndecided),
55 adaptHistoryId(-1),
56 vbProductionFinished(false),
57 abProductionFinished(false), abProductionAtEnd(false),
58 al(alp)
59 {
60 if (virginCause)
61 theCauseRep = new MessageRep(virginCause);
62 }
63
64 Adaptation::Ecap::XactionRep::~XactionRep()
65 {
66 assert(!theMaster);
67 delete theCauseRep;
68 theAnswerRep.reset();
69 }
70
71 void
72 Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
73 {
74 Must(!theMaster);
75 Must(x);
76 theMaster = x;
77 }
78
79 Adaptation::Service &
80 Adaptation::Ecap::XactionRep::service()
81 {
82 Must(theService != NULL);
83 return *theService;
84 }
85
86 const libecap::Area
87 Adaptation::Ecap::XactionRep::option(const libecap::Name &name) const
88 {
89 if (name == libecap::metaClientIp)
90 return clientIpValue();
91 if (name == libecap::metaUserName)
92 return usernameValue();
93 if (Adaptation::Config::masterx_shared_name && name == Adaptation::Config::masterx_shared_name)
94 return masterxSharedValue(name);
95
96 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
97
98 // If the name is unknown, metaValue returns an emtpy area
99 return metaValue(name);
100 }
101
102 void
103 Adaptation::Ecap::XactionRep::visitEachOption(libecap::NamedValueVisitor &visitor) const
104 {
105 if (const libecap::Area value = clientIpValue())
106 visitor.visit(libecap::metaClientIp, value);
107 if (const libecap::Area value = usernameValue())
108 visitor.visit(libecap::metaUserName, value);
109
110 if (Adaptation::Config::masterx_shared_name) {
111 const libecap::Name name(Adaptation::Config::masterx_shared_name);
112 if (const libecap::Area value = masterxSharedValue(name))
113 visitor.visit(name, value);
114 }
115
116 visitEachMetaHeader(visitor);
117
118 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
119 }
120
121 const libecap::Area
122 Adaptation::Ecap::XactionRep::clientIpValue() const
123 {
124 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
125 theCauseRep->raw().header : theVirginRep.raw().header);
126 Must(request);
127 // TODO: move this logic into HttpRequest::clientIp(bool) and
128 // HttpRequest::clientIpString(bool) and reuse everywhere
129 if (TheConfig.send_client_ip && request) {
130 Ip::Address client_addr;
131 #if FOLLOW_X_FORWARDED_FOR
132 if (TheConfig.use_indirect_client) {
133 client_addr = request->indirect_client_addr;
134 } else
135 #endif
136 client_addr = request->client_addr;
137 if (!client_addr.isAnyAddr() && !client_addr.isNoAddr()) {
138 char ntoabuf[MAX_IPSTRLEN] = "";
139 client_addr.toStr(ntoabuf,MAX_IPSTRLEN);
140 return libecap::Area::FromTempBuffer(ntoabuf, strlen(ntoabuf));
141 }
142 }
143 return libecap::Area();
144 }
145
146 const libecap::Area
147 Adaptation::Ecap::XactionRep::usernameValue() const
148 {
149 #if USE_AUTH
150 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
151 theCauseRep->raw().header : theVirginRep.raw().header);
152 Must(request);
153 if (request->auth_user_request != NULL) {
154 if (char const *name = request->auth_user_request->username())
155 return libecap::Area::FromTempBuffer(name, strlen(name));
156 else if (request->extacl_user.size() > 0)
157 return libecap::Area::FromTempBuffer(request->extacl_user.rawBuf(),
158 request->extacl_user.size());
159 }
160 #endif
161 return libecap::Area();
162 }
163
164 const libecap::Area
165 Adaptation::Ecap::XactionRep::masterxSharedValue(const libecap::Name &name) const
166 {
167 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
168 theCauseRep->raw().header : theVirginRep.raw().header);
169 Must(request);
170 if (name.known()) { // must check to avoid empty names matching unset cfg
171 Adaptation::History::Pointer ah = request->adaptHistory(false);
172 if (ah != NULL) {
173 String name, value;
174 if (ah->getXxRecord(name, value))
175 return libecap::Area::FromTempBuffer(value.rawBuf(), value.size());
176 }
177 }
178 return libecap::Area();
179 }
180
181 const libecap::Area
182 Adaptation::Ecap::XactionRep::metaValue(const libecap::Name &name) const
183 {
184 HttpRequest *request = dynamic_cast<HttpRequest*>(theCauseRep ?
185 theCauseRep->raw().header : theVirginRep.raw().header);
186 Must(request);
187 HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
188
189 if (name.known()) { // must check to avoid empty names matching unset cfg
190 typedef Notes::iterator ACAMLI;
191 for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
192 if (name == (*i)->key.termedBuf()) {
193 if (const char *value = (*i)->match(request, reply, al))
194 return libecap::Area::FromTempString(value);
195 else
196 return libecap::Area();
197 }
198 }
199 }
200
201 return libecap::Area();
202 }
203
204 void
205 Adaptation::Ecap::XactionRep::visitEachMetaHeader(libecap::NamedValueVisitor &visitor) const
206 {
207 HttpRequest *request = dynamic_cast<HttpRequest*>(theCauseRep ?
208 theCauseRep->raw().header : theVirginRep.raw().header);
209 Must(request);
210 HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
211
212 typedef Notes::iterator ACAMLI;
213 for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
214 const char *v = (*i)->match(request, reply, al);
215 if (v) {
216 const libecap::Name name((*i)->key.termedBuf());
217 const libecap::Area value = libecap::Area::FromTempString(v);
218 visitor.visit(name, value);
219 }
220 }
221 }
222
223 void
224 Adaptation::Ecap::XactionRep::start()
225 {
226 Must(theMaster);
227
228 if (!theVirginRep.raw().body_pipe)
229 makingVb = opNever; // there is nothing to deliver
230
231 HttpRequest *request = dynamic_cast<HttpRequest*> (theCauseRep ?
232 theCauseRep->raw().header : theVirginRep.raw().header);
233 Must(request);
234
235 HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
236
237 Adaptation::History::Pointer ah = request->adaptLogHistory();
238 if (ah != NULL) {
239 // retrying=false because ecap never retries transactions
240 adaptHistoryId = ah->recordXactStart(service().cfg().key, current_time, false);
241 typedef Notes::iterator ACAMLI;
242 for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
243 const char *v = (*i)->match(request, reply, al);
244 if (v) {
245 if (ah->metaHeaders == NULL)
246 ah->metaHeaders = new NotePairs();
247 if (!ah->metaHeaders->hasPair((*i)->key.termedBuf(), v))
248 ah->metaHeaders->add((*i)->key.termedBuf(), v);
249 }
250 }
251 }
252
253 theMaster->start();
254 }
255
256 void
257 Adaptation::Ecap::XactionRep::swanSong()
258 {
259 // clear body_pipes, if any
260 // this code does not maintain proxying* and canAccessVb states; should it?
261
262 if (theAnswerRep) {
263 BodyPipe::Pointer body_pipe = answer().body_pipe;
264 if (body_pipe != NULL) {
265 Must(body_pipe->stillProducing(this));
266 stopProducingFor(body_pipe, false);
267 }
268 }
269
270 BodyPipe::Pointer &body_pipe = theVirginRep.raw().body_pipe;
271 if (body_pipe != NULL && body_pipe->stillConsuming(this))
272 stopConsumingFrom(body_pipe);
273
274 terminateMaster();
275
276 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
277 theCauseRep->raw().header : theVirginRep.raw().header);
278 Must(request);
279 Adaptation::History::Pointer ah = request->adaptLogHistory();
280 if (ah != NULL && adaptHistoryId >= 0)
281 ah->recordXactFinish(adaptHistoryId);
282
283 Adaptation::Initiate::swanSong();
284 }
285
286 void
287 Adaptation::Ecap::XactionRep::resume()
288 {
289 // go async to gain exception protection and done()-based job destruction
290 typedef NullaryMemFunT<Adaptation::Ecap::XactionRep> Dialer;
291 AsyncCall::Pointer call = asyncCall(93, 5, "Adaptation::Ecap::XactionRep::doResume",
292 Dialer(this, &Adaptation::Ecap::XactionRep::doResume));
293 ScheduleCallHere(call);
294 }
295
296 /// the guts of libecap::host::Xaction::resume() API implementation
297 /// which just goes async in Adaptation::Ecap::XactionRep::resume().
298 void
299 Adaptation::Ecap::XactionRep::doResume()
300 {
301 Must(theMaster);
302 theMaster->resume();
303 }
304
305 libecap::Message &
306 Adaptation::Ecap::XactionRep::virgin()
307 {
308 return theVirginRep;
309 }
310
311 const libecap::Message &
312 Adaptation::Ecap::XactionRep::cause()
313 {
314 Must(theCauseRep != NULL);
315 return *theCauseRep;
316 }
317
318 libecap::Message &
319 Adaptation::Ecap::XactionRep::adapted()
320 {
321 Must(theAnswerRep);
322 return *theAnswerRep;
323 }
324
325 Adaptation::Message &
326 Adaptation::Ecap::XactionRep::answer()
327 {
328 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
329 Must(rep);
330 return rep->raw();
331 }
332
333 void
334 Adaptation::Ecap::XactionRep::terminateMaster()
335 {
336 if (theMaster) {
337 AdapterXaction x = theMaster;
338 theMaster.reset();
339 x->stop();
340 }
341 }
342
343 bool
344 Adaptation::Ecap::XactionRep::doneAll() const
345 {
346 return makingVb >= opComplete && proxyingAb >= opComplete &&
347 Adaptation::Initiate::doneAll();
348 }
349
350 // stops receiving virgin and enables auto-consumption, dropping any vb bytes
351 void
352 Adaptation::Ecap::XactionRep::sinkVb(const char *reason)
353 {
354 debugs(93,4, HERE << "sink for " << reason << "; status:" << status());
355
356 // we reset raw().body_pipe when we are done, so use this one for checking
357 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
358 if (permPipe != NULL)
359 permPipe->enableAutoConsumption();
360
361 forgetVb(reason);
362 }
363
364 // stops receiving virgin but preserves it for others to use
365 void
366 Adaptation::Ecap::XactionRep::preserveVb(const char *reason)
367 {
368 debugs(93,4, HERE << "preserve for " << reason << "; status:" << status());
369
370 // we reset raw().body_pipe when we are done, so use this one for checking
371 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
372 if (permPipe != NULL) {
373 // if libecap consumed, we cannot preserve
374 Must(!permPipe->consumedSize());
375 }
376
377 forgetVb(reason);
378 }
379
380 // disassociates us from vb; the last step of sinking or preserving vb
381 void
382 Adaptation::Ecap::XactionRep::forgetVb(const char *reason)
383 {
384 debugs(93,9, HERE << "forget vb " << reason << "; status:" << status());
385
386 BodyPipePointer &p = theVirginRep.raw().body_pipe;
387 if (p != NULL && p->stillConsuming(this))
388 stopConsumingFrom(p);
389
390 if (makingVb == opUndecided)
391 makingVb = opNever;
392 else if (makingVb == opOn)
393 makingVb = opComplete;
394 }
395
396 void
397 Adaptation::Ecap::XactionRep::useVirgin()
398 {
399 debugs(93,3, HERE << status());
400 Must(proxyingAb == opUndecided);
401 proxyingAb = opNever;
402
403 preserveVb("useVirgin");
404
405 HttpMsg *clone = theVirginRep.raw().header->clone();
406 // check that clone() copies the pipe so that we do not have to
407 Must(!theVirginRep.raw().header->body_pipe == !clone->body_pipe);
408
409 updateHistory(clone);
410 sendAnswer(Answer::Forward(clone));
411 Must(done());
412 }
413
414 void
415 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
416 {
417 debugs(93,3, HERE << status());
418 Must(m);
419 theAnswerRep = m;
420 Must(proxyingAb == opUndecided);
421
422 HttpMsg *msg = answer().header;
423 updateSources(msg);
424 if (!theAnswerRep->body()) { // final, bodyless answer
425 proxyingAb = opNever;
426 updateHistory(msg);
427 sendAnswer(Answer::Forward(msg));
428 } else { // got answer headers but need to handle body
429 proxyingAb = opOn;
430 Must(!msg->body_pipe); // only host can set body pipes
431 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
432 Must(rep);
433 rep->tieBody(this); // sets us as a producer
434 Must(msg->body_pipe != NULL); // check tieBody
435
436 updateHistory(msg);
437 sendAnswer(Answer::Forward(msg));
438
439 debugs(93,4, HERE << "adapter will produce body" << status());
440 theMaster->abMake(); // libecap will produce
441 }
442 }
443
444 void
445 Adaptation::Ecap::XactionRep::blockVirgin()
446 {
447 debugs(93,3, HERE << status());
448 Must(proxyingAb == opUndecided);
449 proxyingAb = opNever;
450
451 sinkVb("blockVirgin");
452
453 updateHistory(NULL);
454 sendAnswer(Answer::Block(service().cfg().key));
455 Must(done());
456 }
457
458 /// Called just before sendAnswer() to record adapter meta-information
459 /// which may affect answer processing and may be needed for logging.
460 void
461 Adaptation::Ecap::XactionRep::updateHistory(HttpMsg *adapted)
462 {
463 if (!theMaster) // all updates rely on being able to query the adapter
464 return;
465
466 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
467 theCauseRep->raw().header : theVirginRep.raw().header);
468 Must(request);
469
470 // TODO: move common ICAP/eCAP logic to Adaptation::Xaction or similar
471 // TODO: optimize Area-to-String conversion
472
473 // update the cross-transactional database if needed
474 if (const char *xxNameStr = Adaptation::Config::masterx_shared_name) {
475 Adaptation::History::Pointer ah = request->adaptHistory(true);
476 if (ah != NULL) {
477 libecap::Name xxName(xxNameStr); // TODO: optimize?
478 if (const libecap::Area val = theMaster->option(xxName))
479 ah->updateXxRecord(xxNameStr, val.toString().c_str());
480 }
481 }
482
483 // update the adaptation plan if needed
484 if (service().cfg().routing) {
485 String services;
486 if (const libecap::Area services = theMaster->option(libecap::metaNextServices)) {
487 Adaptation::History::Pointer ah = request->adaptHistory(true);
488 if (ah != NULL)
489 ah->updateNextServices(services.toString().c_str());
490 }
491 } // TODO: else warn (occasionally!) if we got libecap::metaNextServices
492
493 // Store received meta headers for adapt::<last_h logformat code use.
494 // If we already have stored headers from a previous adaptation transaction
495 // related to the same master transction, they will be replaced.
496 Adaptation::History::Pointer ah = request->adaptLogHistory();
497 if (ah != NULL) {
498 HttpHeader meta(hoReply);
499 OptionsExtractor extractor(meta);
500 theMaster->visitEachOption(extractor);
501 ah->recordMeta(&meta);
502 }
503
504 // Add just-created history to the adapted/cloned request that lacks it.
505 if (HttpRequest *adaptedReq = dynamic_cast<HttpRequest*>(adapted))
506 adaptedReq->adaptHistoryImport(*request);
507 }
508
509 void
510 Adaptation::Ecap::XactionRep::vbDiscard()
511 {
512 Must(makingVb == opUndecided);
513 // if adapter does not need vb, we do not need to send it
514 sinkVb("vbDiscard");
515 Must(makingVb == opNever);
516 }
517
518 void
519 Adaptation::Ecap::XactionRep::vbMake()
520 {
521 Must(makingVb == opUndecided);
522 BodyPipePointer &p = theVirginRep.raw().body_pipe;
523 Must(p != NULL);
524 Must(p->setConsumerIfNotLate(this)); // to deliver vb, we must receive vb
525 makingVb = opOn;
526 }
527
528 void
529 Adaptation::Ecap::XactionRep::vbStopMaking()
530 {
531 Must(makingVb == opOn);
532 // if adapter does not need vb, we do not need to receive it
533 sinkVb("vbStopMaking");
534 Must(makingVb == opComplete);
535 }
536
537 void
538 Adaptation::Ecap::XactionRep::vbMakeMore()
539 {
540 Must(makingVb == opOn); // cannot make more if done proxying
541 // we cannot guarantee more vb, but we can check that there is a chance
542 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
543 Must(p != NULL && p->stillConsuming(this)); // we are plugged in
544 Must(!p->productionEnded() && p->mayNeedMoreData()); // and may get more
545 }
546
547 libecap::Area
548 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
549 {
550 // We may not be makingVb yet. It should be OK, but see vbContentShift().
551
552 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
553 Must(p != NULL);
554
555 // TODO: make MemBuf use size_t?
556 const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
557
558 // convert to Squid types; XXX: check for overflow
559 const uint64_t offset = static_cast<uint64_t>(o);
560 Must(offset <= haveSize); // equal iff at the end of content
561
562 // nsize means no size limit: all content starting from offset
563 const size_t size = s == libecap::nsize ?
564 haveSize - offset : static_cast<size_t>(s);
565
566 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
567 return libecap::Area::FromTempBuffer(p->buf().content() + offset,
568 min(static_cast<size_t>(haveSize - offset), size));
569 }
570
571 void
572 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
573 {
574 // We may not be makingVb yet. It should be OK now, but if BodyPipe
575 // consume() requirements change, we would have to return empty vbContent
576 // until the adapter registers as a consumer
577
578 BodyPipePointer &p = theVirginRep.raw().body_pipe;
579 Must(p != NULL);
580 const size_t size = static_cast<size_t>(n); // XXX: check for overflow
581 const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
582 p->consume(min(size, haveSize));
583 }
584
585 void
586 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
587 {
588 Must(proxyingAb == opOn && !abProductionFinished);
589 abProductionFinished = true;
590 abProductionAtEnd = atEnd; // store until ready to stop producing ourselves
591 debugs(93,5, HERE << "adapted body production ended");
592 moveAbContent();
593 }
594
595 void
596 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
597 {
598 Must(proxyingAb == opOn && !abProductionFinished);
599 moveAbContent();
600 }
601
602 #if 0 /* XXX: implement */
603 void
604 Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
605 {
606 Must(answer().body_pipe != NULL);
607 if (size.known())
608 answer().body_pipe->setBodySize(size.value());
609 // else the piped body size is unknown by default
610 }
611 #endif
612
613 void
614 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
615 {
616 debugs(93,3, HERE << "adapter needs time: " <<
617 d.state << '/' << d.progress);
618 // XXX: set timeout?
619 }
620
621 void
622 Adaptation::Ecap::XactionRep::adaptationAborted()
623 {
624 tellQueryAborted(true); // should eCAP support retries?
625 mustStop("adaptationAborted");
626 }
627
628 void
629 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
630 {
631 Must(proxyingAb == opOn);
632 moveAbContent();
633 }
634
635 void
636 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
637 {
638 Must(proxyingAb == opOn);
639 stopProducingFor(answer().body_pipe, false);
640 Must(theMaster);
641 theMaster->abStopMaking();
642 proxyingAb = opComplete;
643 }
644
645 void
646 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
647 {
648 Must(makingVb == opOn); // or we would not be registered as a consumer
649 Must(theMaster);
650 theMaster->noteVbContentAvailable();
651 }
652
653 void
654 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
655 {
656 Must(makingVb == opOn); // or we would not be registered as a consumer
657 Must(theMaster);
658 theMaster->noteVbContentDone(true);
659 vbProductionFinished = true;
660 }
661
662 void
663 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
664 {
665 Must(makingVb == opOn); // or we would not be registered as a consumer
666 Must(theMaster);
667 theMaster->noteVbContentDone(false);
668 vbProductionFinished = true;
669 }
670
671 void
672 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
673 {
674 mustStop("initiator aborted");
675 }
676
677 // get content from the adapter and put it into the adapted pipe
678 void
679 Adaptation::Ecap::XactionRep::moveAbContent()
680 {
681 Must(proxyingAb == opOn);
682 const libecap::Area c = theMaster->abContent(0, libecap::nsize);
683 debugs(93,5, HERE << "up to " << c.size << " bytes");
684 if (c.size == 0 && abProductionFinished) { // no ab now and in the future
685 stopProducingFor(answer().body_pipe, abProductionAtEnd);
686 proxyingAb = opComplete;
687 debugs(93,5, HERE << "last adapted body data retrieved");
688 } else if (c.size > 0) {
689 if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
690 theMaster->abContentShift(used);
691 }
692 }
693
694 const char *
695 Adaptation::Ecap::XactionRep::status() const
696 {
697 static MemBuf buf;
698 buf.reset();
699
700 buf.append(" [", 2);
701
702 if (makingVb)
703 buf.appendf("M%d", static_cast<int>(makingVb));
704
705 const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
706 if (!vp)
707 buf.append(" !V", 3);
708 else if (vp->stillConsuming(const_cast<XactionRep*>(this)))
709 buf.append(" Vc", 3);
710 else
711 buf.append(" V?", 3);
712
713 if (vbProductionFinished)
714 buf.append(".", 1);
715
716 buf.appendf(" A%d", static_cast<int>(proxyingAb));
717
718 if (proxyingAb == opOn) {
719 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
720 Must(rep);
721 const BodyPipePointer &ap = rep->raw().body_pipe;
722 if (!ap)
723 buf.append(" !A", 3);
724 else if (ap->stillProducing(const_cast<XactionRep*>(this)))
725 buf.append(" Ap", 3);
726 else
727 buf.append(" A?", 3);
728 }
729
730 buf.appendf(" %s%u]", id.prefix(), id.value);
731
732 buf.terminate();
733
734 return buf.content();
735 }
736
737 void
738 Adaptation::Ecap::XactionRep::updateSources(HttpMsg *adapted)
739 {
740 adapted->sources |= service().cfg().connectionEncryption ? HttpMsg::srcEcaps : HttpMsg::srcEcap;
741 }
742