]> git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
Merged from trunk 13172.
[thirdparty/squid.git] / src / adaptation / ecap / XactionRep.cc
1 /*
2 * DEBUG: section 93 eCAP Interface
3 */
4 #include "squid.h"
5 #include <libecap/common/area.h>
6 #include <libecap/common/delay.h>
7 #include <libecap/common/named_values.h>
8 #include <libecap/common/names.h>
9 #include <libecap/adapter/xaction.h>
10 #include "adaptation/Answer.h"
11 #include "adaptation/ecap/Config.h"
12 #include "adaptation/ecap/XactionRep.h"
13 #include "adaptation/Initiator.h"
14 #include "base/AsyncJobCalls.h"
15 #include "base/TextException.h"
16 #include "format/Format.h"
17 #include "HttpReply.h"
18 #include "HttpRequest.h"
19 #include "SquidTime.h"
20
21 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
22
23 /// a libecap Visitor for converting adapter transaction options to HttpHeader
24 class OptionsExtractor: public libecap::NamedValueVisitor
25 {
26 public:
27 typedef libecap::Name Name;
28 typedef libecap::Area Area;
29
30 OptionsExtractor(HttpHeader &aMeta): meta(aMeta) {}
31
32 // libecap::NamedValueVisitor API
33 virtual void visit(const Name &name, const Area &value) {
34 meta.putExt(name.image().c_str(), value.toString().c_str());
35 }
36
37 HttpHeader &meta; ///< where to put extracted options
38 };
39
40 Adaptation::Ecap::XactionRep::XactionRep(
41 HttpMsg *virginHeader, HttpRequest *virginCause, AccessLogEntry::Pointer &alp,
42 const Adaptation::ServicePointer &aService):
43 AsyncJob("Adaptation::Ecap::XactionRep"),
44 Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
45 theService(aService),
46 theVirginRep(virginHeader), theCauseRep(NULL),
47 makingVb(opUndecided), proxyingAb(opUndecided),
48 adaptHistoryId(-1),
49 vbProductionFinished(false),
50 abProductionFinished(false), abProductionAtEnd(false),
51 al(alp)
52 {
53 if (virginCause)
54 theCauseRep = new MessageRep(virginCause);
55 }
56
57 Adaptation::Ecap::XactionRep::~XactionRep()
58 {
59 assert(!theMaster);
60 delete theCauseRep;
61 theAnswerRep.reset();
62 }
63
64 void
65 Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
66 {
67 Must(!theMaster);
68 Must(x != NULL);
69 theMaster = x;
70 }
71
72 Adaptation::Service &
73 Adaptation::Ecap::XactionRep::service()
74 {
75 Must(theService != NULL);
76 return *theService;
77 }
78
79 const libecap::Area
80 Adaptation::Ecap::XactionRep::option(const libecap::Name &name) const
81 {
82 if (name == libecap::metaClientIp)
83 return clientIpValue();
84 if (name == libecap::metaUserName)
85 return usernameValue();
86 if (Adaptation::Config::masterx_shared_name && name == Adaptation::Config::masterx_shared_name)
87 return masterxSharedValue(name);
88
89 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
90
91 // If the name is unknown, metaValue returns an emtpy area
92 return metaValue(name);
93 }
94
95 void
96 Adaptation::Ecap::XactionRep::visitEachOption(libecap::NamedValueVisitor &visitor) const
97 {
98 if (const libecap::Area value = clientIpValue())
99 visitor.visit(libecap::metaClientIp, value);
100 if (const libecap::Area value = usernameValue())
101 visitor.visit(libecap::metaUserName, value);
102
103 if (Adaptation::Config::masterx_shared_name) {
104 const libecap::Name name(Adaptation::Config::masterx_shared_name);
105 if (const libecap::Area value = masterxSharedValue(name))
106 visitor.visit(name, value);
107 }
108
109 visitEachMetaHeader(visitor);
110
111 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
112 }
113
114 const libecap::Area
115 Adaptation::Ecap::XactionRep::clientIpValue() const
116 {
117 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
118 theCauseRep->raw().header : theVirginRep.raw().header);
119 Must(request);
120 // TODO: move this logic into HttpRequest::clientIp(bool) and
121 // HttpRequest::clientIpString(bool) and reuse everywhere
122 if (TheConfig.send_client_ip && request) {
123 Ip::Address client_addr;
124 #if FOLLOW_X_FORWARDED_FOR
125 if (TheConfig.use_indirect_client) {
126 client_addr = request->indirect_client_addr;
127 } else
128 #endif
129 client_addr = request->client_addr;
130 if (!client_addr.isAnyAddr() && !client_addr.isNoAddr()) {
131 char ntoabuf[MAX_IPSTRLEN] = "";
132 client_addr.toStr(ntoabuf,MAX_IPSTRLEN);
133 return libecap::Area::FromTempBuffer(ntoabuf, strlen(ntoabuf));
134 }
135 }
136 return libecap::Area();
137 }
138
139 const libecap::Area
140 Adaptation::Ecap::XactionRep::usernameValue() const
141 {
142 #if USE_AUTH
143 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
144 theCauseRep->raw().header : theVirginRep.raw().header);
145 Must(request);
146 if (request->auth_user_request != NULL) {
147 if (char const *name = request->auth_user_request->username())
148 return libecap::Area::FromTempBuffer(name, strlen(name));
149 else if (request->extacl_user.size() > 0)
150 return libecap::Area::FromTempBuffer(request->extacl_user.rawBuf(),
151 request->extacl_user.size());
152 }
153 #endif
154 return libecap::Area();
155 }
156
157 const libecap::Area
158 Adaptation::Ecap::XactionRep::masterxSharedValue(const libecap::Name &name) const
159 {
160 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
161 theCauseRep->raw().header : theVirginRep.raw().header);
162 Must(request);
163 if (name.known()) { // must check to avoid empty names matching unset cfg
164 Adaptation::History::Pointer ah = request->adaptHistory(false);
165 if (ah != NULL) {
166 String name, value;
167 if (ah->getXxRecord(name, value))
168 return libecap::Area::FromTempBuffer(value.rawBuf(), value.size());
169 }
170 }
171 return libecap::Area();
172 }
173
174 const libecap::Area
175 Adaptation::Ecap::XactionRep::metaValue(const libecap::Name &name) const
176 {
177 HttpRequest *request = dynamic_cast<HttpRequest*>(theCauseRep ?
178 theCauseRep->raw().header : theVirginRep.raw().header);
179 Must(request);
180 HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
181
182 if (name.known()) { // must check to avoid empty names matching unset cfg
183 typedef Notes::iterator ACAMLI;
184 for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
185 if (name == (*i)->key.termedBuf()) {
186 if (const char *value = (*i)->match(request, reply, al))
187 return libecap::Area::FromTempString(value);
188 else
189 return libecap::Area();
190 }
191 }
192 }
193
194 return libecap::Area();
195 }
196
197 void
198 Adaptation::Ecap::XactionRep::visitEachMetaHeader(libecap::NamedValueVisitor &visitor) const
199 {
200 HttpRequest *request = dynamic_cast<HttpRequest*>(theCauseRep ?
201 theCauseRep->raw().header : theVirginRep.raw().header);
202 Must(request);
203 HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
204
205 typedef Notes::iterator ACAMLI;
206 for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
207 const char *v = (*i)->match(request, reply, al);
208 if (v) {
209 const libecap::Name name((*i)->key.termedBuf());
210 const libecap::Area value = libecap::Area::FromTempString(v);
211 visitor.visit(name, value);
212 }
213 }
214 }
215
216 void
217 Adaptation::Ecap::XactionRep::start()
218 {
219 Must(theMaster);
220
221 if (!theVirginRep.raw().body_pipe)
222 makingVb = opNever; // there is nothing to deliver
223
224 HttpRequest *request = dynamic_cast<HttpRequest*> (theCauseRep ?
225 theCauseRep->raw().header : theVirginRep.raw().header);
226 Must(request);
227
228 HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
229
230 Adaptation::History::Pointer ah = request->adaptLogHistory();
231 if (ah != NULL) {
232 // retrying=false because ecap never retries transactions
233 adaptHistoryId = ah->recordXactStart(service().cfg().key, current_time, false);
234 typedef Notes::iterator ACAMLI;
235 for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
236 const char *v = (*i)->match(request, reply, al);
237 if (v) {
238 if (ah->metaHeaders == NULL)
239 ah->metaHeaders = new NotePairs();
240 if (!ah->metaHeaders->hasPair((*i)->key.termedBuf(), v))
241 ah->metaHeaders->add((*i)->key.termedBuf(), v);
242 }
243 }
244 }
245
246 theMaster->start();
247 }
248
249 void
250 Adaptation::Ecap::XactionRep::swanSong()
251 {
252 // clear body_pipes, if any
253 // this code does not maintain proxying* and canAccessVb states; should it?
254
255 if (theAnswerRep != NULL) {
256 BodyPipe::Pointer body_pipe = answer().body_pipe;
257 if (body_pipe != NULL) {
258 Must(body_pipe->stillProducing(this));
259 stopProducingFor(body_pipe, false);
260 }
261 }
262
263 BodyPipe::Pointer &body_pipe = theVirginRep.raw().body_pipe;
264 if (body_pipe != NULL && body_pipe->stillConsuming(this))
265 stopConsumingFrom(body_pipe);
266
267 terminateMaster();
268
269 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
270 theCauseRep->raw().header : theVirginRep.raw().header);
271 Must(request);
272 Adaptation::History::Pointer ah = request->adaptLogHistory();
273 if (ah != NULL && adaptHistoryId >= 0)
274 ah->recordXactFinish(adaptHistoryId);
275
276 Adaptation::Initiate::swanSong();
277 }
278
279 void
280 Adaptation::Ecap::XactionRep::resume()
281 {
282 // go async to gain exception protection and done()-based job destruction
283 typedef NullaryMemFunT<Adaptation::Ecap::XactionRep> Dialer;
284 AsyncCall::Pointer call = asyncCall(93, 5, "Adaptation::Ecap::XactionRep::doResume",
285 Dialer(this, &Adaptation::Ecap::XactionRep::doResume));
286 ScheduleCallHere(call);
287 }
288
289 /// the guts of libecap::host::Xaction::resume() API implementation
290 /// which just goes async in Adaptation::Ecap::XactionRep::resume().
291 void
292 Adaptation::Ecap::XactionRep::doResume()
293 {
294 Must(theMaster);
295 theMaster->resume();
296 }
297
298 libecap::Message &
299 Adaptation::Ecap::XactionRep::virgin()
300 {
301 return theVirginRep;
302 }
303
304 const libecap::Message &
305 Adaptation::Ecap::XactionRep::cause()
306 {
307 Must(theCauseRep != NULL);
308 return *theCauseRep;
309 }
310
311 libecap::Message &
312 Adaptation::Ecap::XactionRep::adapted()
313 {
314 Must(theAnswerRep != NULL);
315 return *theAnswerRep;
316 }
317
318 Adaptation::Message &
319 Adaptation::Ecap::XactionRep::answer()
320 {
321 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
322 Must(rep);
323 return rep->raw();
324 }
325
326 void
327 Adaptation::Ecap::XactionRep::terminateMaster()
328 {
329 if (theMaster) {
330 AdapterXaction x = theMaster;
331 theMaster.reset();
332 x->stop();
333 }
334 }
335
336 bool
337 Adaptation::Ecap::XactionRep::doneAll() const
338 {
339 return makingVb >= opComplete && proxyingAb >= opComplete &&
340 Adaptation::Initiate::doneAll();
341 }
342
343 // stops receiving virgin and enables auto-consumption, dropping any vb bytes
344 void
345 Adaptation::Ecap::XactionRep::sinkVb(const char *reason)
346 {
347 debugs(93,4, HERE << "sink for " << reason << "; status:" << status());
348
349 // we reset raw().body_pipe when we are done, so use this one for checking
350 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
351 if (permPipe != NULL)
352 permPipe->enableAutoConsumption();
353
354 forgetVb(reason);
355 }
356
357 // stops receiving virgin but preserves it for others to use
358 void
359 Adaptation::Ecap::XactionRep::preserveVb(const char *reason)
360 {
361 debugs(93,4, HERE << "preserve for " << reason << "; status:" << status());
362
363 // we reset raw().body_pipe when we are done, so use this one for checking
364 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
365 if (permPipe != NULL) {
366 // if libecap consumed, we cannot preserve
367 Must(!permPipe->consumedSize());
368 }
369
370 forgetVb(reason);
371 }
372
373 // disassociates us from vb; the last step of sinking or preserving vb
374 void
375 Adaptation::Ecap::XactionRep::forgetVb(const char *reason)
376 {
377 debugs(93,9, HERE << "forget vb " << reason << "; status:" << status());
378
379 BodyPipePointer &p = theVirginRep.raw().body_pipe;
380 if (p != NULL && p->stillConsuming(this))
381 stopConsumingFrom(p);
382
383 if (makingVb == opUndecided)
384 makingVb = opNever;
385 else if (makingVb == opOn)
386 makingVb = opComplete;
387 }
388
389 void
390 Adaptation::Ecap::XactionRep::useVirgin()
391 {
392 debugs(93,3, HERE << status());
393 Must(proxyingAb == opUndecided);
394 proxyingAb = opNever;
395
396 preserveVb("useVirgin");
397
398 HttpMsg *clone = theVirginRep.raw().header->clone();
399 // check that clone() copies the pipe so that we do not have to
400 Must(!theVirginRep.raw().header->body_pipe == !clone->body_pipe);
401
402 updateHistory(clone);
403 sendAnswer(Answer::Forward(clone));
404 Must(done());
405 }
406
407 void
408 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
409 {
410 debugs(93,3, HERE << status());
411 Must(m);
412 theAnswerRep = m;
413 Must(proxyingAb == opUndecided);
414
415 HttpMsg *msg = answer().header;
416 if (!theAnswerRep->body()) { // final, bodyless answer
417 proxyingAb = opNever;
418 updateHistory(msg);
419 sendAnswer(Answer::Forward(msg));
420 } else { // got answer headers but need to handle body
421 proxyingAb = opOn;
422 Must(!msg->body_pipe); // only host can set body pipes
423 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
424 Must(rep);
425 rep->tieBody(this); // sets us as a producer
426 Must(msg->body_pipe != NULL); // check tieBody
427
428 updateHistory(msg);
429 sendAnswer(Answer::Forward(msg));
430
431 debugs(93,4, HERE << "adapter will produce body" << status());
432 theMaster->abMake(); // libecap will produce
433 }
434 }
435
436 void
437 Adaptation::Ecap::XactionRep::blockVirgin()
438 {
439 debugs(93,3, HERE << status());
440 Must(proxyingAb == opUndecided);
441 proxyingAb = opNever;
442
443 sinkVb("blockVirgin");
444
445 updateHistory(NULL);
446 sendAnswer(Answer::Block(service().cfg().key));
447 Must(done());
448 }
449
450 /// Called just before sendAnswer() to record adapter meta-information
451 /// which may affect answer processing and may be needed for logging.
452 void
453 Adaptation::Ecap::XactionRep::updateHistory(HttpMsg *adapted)
454 {
455 if (!theMaster) // all updates rely on being able to query the adapter
456 return;
457
458 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
459 theCauseRep->raw().header : theVirginRep.raw().header);
460 Must(request);
461
462 // TODO: move common ICAP/eCAP logic to Adaptation::Xaction or similar
463 // TODO: optimize Area-to-String conversion
464
465 // update the cross-transactional database if needed
466 if (const char *xxNameStr = Adaptation::Config::masterx_shared_name) {
467 Adaptation::History::Pointer ah = request->adaptHistory(true);
468 if (ah != NULL) {
469 libecap::Name xxName(xxNameStr); // TODO: optimize?
470 if (const libecap::Area val = theMaster->option(xxName))
471 ah->updateXxRecord(xxNameStr, val.toString().c_str());
472 }
473 }
474
475 // update the adaptation plan if needed
476 if (service().cfg().routing) {
477 String services;
478 if (const libecap::Area services = theMaster->option(libecap::metaNextServices)) {
479 Adaptation::History::Pointer ah = request->adaptHistory(true);
480 if (ah != NULL)
481 ah->updateNextServices(services.toString().c_str());
482 }
483 } // TODO: else warn (occasionally!) if we got libecap::metaNextServices
484
485 // Store received meta headers for adapt::<last_h logformat code use.
486 // If we already have stored headers from a previous adaptation transaction
487 // related to the same master transction, they will be replaced.
488 Adaptation::History::Pointer ah = request->adaptLogHistory();
489 if (ah != NULL) {
490 HttpHeader meta(hoReply);
491 OptionsExtractor extractor(meta);
492 theMaster->visitEachOption(extractor);
493 ah->recordMeta(&meta);
494 }
495
496 // Add just-created history to the adapted/cloned request that lacks it.
497 if (HttpRequest *adaptedReq = dynamic_cast<HttpRequest*>(adapted))
498 adaptedReq->adaptHistoryImport(*request);
499 }
500
501 void
502 Adaptation::Ecap::XactionRep::vbDiscard()
503 {
504 Must(makingVb == opUndecided);
505 // if adapter does not need vb, we do not need to send it
506 sinkVb("vbDiscard");
507 Must(makingVb == opNever);
508 }
509
510 void
511 Adaptation::Ecap::XactionRep::vbMake()
512 {
513 Must(makingVb == opUndecided);
514 BodyPipePointer &p = theVirginRep.raw().body_pipe;
515 Must(p != NULL);
516 Must(p->setConsumerIfNotLate(this)); // to deliver vb, we must receive vb
517 makingVb = opOn;
518 }
519
520 void
521 Adaptation::Ecap::XactionRep::vbStopMaking()
522 {
523 Must(makingVb == opOn);
524 // if adapter does not need vb, we do not need to receive it
525 sinkVb("vbStopMaking");
526 Must(makingVb == opComplete);
527 }
528
529 void
530 Adaptation::Ecap::XactionRep::vbMakeMore()
531 {
532 Must(makingVb == opOn); // cannot make more if done proxying
533 // we cannot guarantee more vb, but we can check that there is a chance
534 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
535 Must(p != NULL && p->stillConsuming(this)); // we are plugged in
536 Must(!p->productionEnded() && p->mayNeedMoreData()); // and may get more
537 }
538
539 libecap::Area
540 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
541 {
542 // We may not be makingVb yet. It should be OK, but see vbContentShift().
543
544 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
545 Must(p != NULL);
546
547 // TODO: make MemBuf use size_t?
548 const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
549
550 // convert to Squid types; XXX: check for overflow
551 const uint64_t offset = static_cast<uint64_t>(o);
552 Must(offset <= haveSize); // equal iff at the end of content
553
554 // nsize means no size limit: all content starting from offset
555 const size_t size = s == libecap::nsize ?
556 haveSize - offset : static_cast<size_t>(s);
557
558 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
559 return libecap::Area::FromTempBuffer(p->buf().content() + offset,
560 min(static_cast<size_t>(haveSize - offset), size));
561 }
562
563 void
564 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
565 {
566 // We may not be makingVb yet. It should be OK now, but if BodyPipe
567 // consume() requirements change, we would have to return empty vbContent
568 // until the adapter registers as a consumer
569
570 BodyPipePointer &p = theVirginRep.raw().body_pipe;
571 Must(p != NULL);
572 const size_t size = static_cast<size_t>(n); // XXX: check for overflow
573 const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
574 p->consume(min(size, haveSize));
575 }
576
577 void
578 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
579 {
580 Must(proxyingAb == opOn && !abProductionFinished);
581 abProductionFinished = true;
582 abProductionAtEnd = atEnd; // store until ready to stop producing ourselves
583 debugs(93,5, HERE << "adapted body production ended");
584 moveAbContent();
585 }
586
587 void
588 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
589 {
590 Must(proxyingAb == opOn && !abProductionFinished);
591 moveAbContent();
592 }
593
#if 0 /* XXX: implement */
/// Disabled code: would pass a known adapted body size to the answer's
/// body pipe.
void
Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
{
    Must(answer().body_pipe != NULL);

    // when the size is not known, the piped body size is unknown by default
    if (size.known())
        answer().body_pipe->setBodySize(size.value());
}
#endif
604
605 void
606 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
607 {
608 debugs(93,3, HERE << "adapter needs time: " <<
609 d.state << '/' << d.progress);
610 // XXX: set timeout?
611 }
612
613 void
614 Adaptation::Ecap::XactionRep::adaptationAborted()
615 {
616 tellQueryAborted(true); // should eCAP support retries?
617 mustStop("adaptationAborted");
618 }
619
620 void
621 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
622 {
623 Must(proxyingAb == opOn);
624 moveAbContent();
625 }
626
627 void
628 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
629 {
630 Must(proxyingAb == opOn);
631 stopProducingFor(answer().body_pipe, false);
632 Must(theMaster);
633 theMaster->abStopMaking();
634 proxyingAb = opComplete;
635 }
636
637 void
638 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
639 {
640 Must(makingVb == opOn); // or we would not be registered as a consumer
641 Must(theMaster);
642 theMaster->noteVbContentAvailable();
643 }
644
645 void
646 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
647 {
648 Must(makingVb == opOn); // or we would not be registered as a consumer
649 Must(theMaster);
650 theMaster->noteVbContentDone(true);
651 vbProductionFinished = true;
652 }
653
654 void
655 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
656 {
657 Must(makingVb == opOn); // or we would not be registered as a consumer
658 Must(theMaster);
659 theMaster->noteVbContentDone(false);
660 vbProductionFinished = true;
661 }
662
663 void
664 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
665 {
666 mustStop("initiator aborted");
667 }
668
669 // get content from the adapter and put it into the adapted pipe
670 void
671 Adaptation::Ecap::XactionRep::moveAbContent()
672 {
673 Must(proxyingAb == opOn);
674 const libecap::Area c = theMaster->abContent(0, libecap::nsize);
675 debugs(93,5, HERE << "up to " << c.size << " bytes");
676 if (c.size == 0 && abProductionFinished) { // no ab now and in the future
677 stopProducingFor(answer().body_pipe, abProductionAtEnd);
678 proxyingAb = opComplete;
679 debugs(93,5, HERE << "last adapted body data retrieved");
680 } else if (c.size > 0) {
681 if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
682 theMaster->abContentShift(used);
683 }
684 }
685
686 const char *
687 Adaptation::Ecap::XactionRep::status() const
688 {
689 static MemBuf buf;
690 buf.reset();
691
692 buf.append(" [", 2);
693
694 if (makingVb)
695 buf.Printf("M%d", static_cast<int>(makingVb));
696
697 const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
698 if (!vp)
699 buf.append(" !V", 3);
700 else if (vp->stillConsuming(const_cast<XactionRep*>(this)))
701 buf.append(" Vc", 3);
702 else
703 buf.append(" V?", 3);
704
705 if (vbProductionFinished)
706 buf.append(".", 1);
707
708 buf.Printf(" A%d", static_cast<int>(proxyingAb));
709
710 if (proxyingAb == opOn) {
711 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
712 Must(rep);
713 const BodyPipePointer &ap = rep->raw().body_pipe;
714 if (!ap)
715 buf.append(" !A", 3);
716 else if (ap->stillProducing(const_cast<XactionRep*>(this)))
717 buf.append(" Ap", 3);
718 else
719 buf.append(" A?", 3);
720 }
721
722 buf.Printf(" %s%u]", id.Prefix, id.value);
723
724 buf.terminate();
725
726 return buf.content();
727 }