]> git.ipfire.org Git - thirdparty/squid.git/blame - src/adaptation/ecap/XactionRep.cc
Polished SGML formatting.
[thirdparty/squid.git] / src / adaptation / ecap / XactionRep.cc
CommitLineData
b510f3a1
AJ
1/*
2 * DEBUG: section 93 eCAP Interface
3 */
fdc96a39 4#include "squid.h"
4d0854d4
AR
5#include <libecap/common/area.h>
6#include <libecap/common/delay.h>
22fff3bf
AR
7#include <libecap/common/named_values.h>
8#include <libecap/common/names.h>
fdc96a39 9#include <libecap/adapter/xaction.h>
fdc96a39
AR
10#include "HttpRequest.h"
11#include "HttpReply.h"
3ff65596 12#include "SquidTime.h"
1f3c65fc 13#include "adaptation/ecap/XactionRep.h"
22fff3bf 14#include "adaptation/ecap/Config.h"
3af10ac0 15#include "adaptation/Initiator.h"
3d93a84d 16#include "base/TextException.h"
fdc96a39 17
574b508c 18CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
fdc96a39
AR
19
20
5038f9d8
AR
21/// a libecap Visitor for converting adapter transaction options to HttpHeader
22class OptionsExtractor: public libecap::NamedValueVisitor
23{
24public:
25 typedef libecap::Name Name;
26 typedef libecap::Area Area;
27
28 OptionsExtractor(HttpHeader &aMeta): meta(aMeta) {}
29
30 // libecap::NamedValueVisitor API
ec4d1a1d 31 virtual void visit(const Name &name, const Area &value) {
5038f9d8
AR
32 meta.putExt(name.image().c_str(), value.toString().c_str());
33 }
34
35 HttpHeader &meta; ///< where to put extracted options
36};
37
4299f876 38Adaptation::Ecap::XactionRep::XactionRep(
4cb2536f
A
39 HttpMsg *virginHeader, HttpRequest *virginCause,
40 const Adaptation::ServicePointer &aService):
574b508c 41 AsyncJob("Adaptation::Ecap::XactionRep"),
4299f876 42 Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
a22e6cd3 43 theService(aService),
26ac0430 44 theVirginRep(virginHeader), theCauseRep(NULL),
e1e90d26 45 makingVb(opUndecided), proxyingAb(opUndecided),
3ff65596 46 adaptHistoryId(-1),
e1e90d26 47 vbProductionFinished(false),
7477a343 48 abProductionFinished(false), abProductionAtEnd(false)
fdc96a39 49{
027320b4 50 if (virginCause)
4d0854d4 51 theCauseRep = new MessageRep(virginCause);
fdc96a39
AR
52}
53
574b508c 54Adaptation::Ecap::XactionRep::~XactionRep()
fdc96a39
AR
55{
56 assert(!theMaster);
027320b4 57 delete theCauseRep;
4d0854d4 58 theAnswerRep.reset();
fdc96a39
AR
59}
60
61void
574b508c 62Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
fdc96a39
AR
63{
64 Must(!theMaster);
65 Must(x != NULL);
66 theMaster = x;
67}
68
a22e6cd3
AR
69Adaptation::Service &
70Adaptation::Ecap::XactionRep::service()
71{
72 Must(theService != NULL);
73 return *theService;
74}
75
22fff3bf
AR
76const libecap::Area
77Adaptation::Ecap::XactionRep::option(const libecap::Name &name) const
78{
79 if (name == libecap::metaClientIp)
80 return clientIpValue();
81 if (name == libecap::metaUserName)
82 return usernameValue();
5038f9d8
AR
83 if (name == Adaptation::Config::masterx_shared_name)
84 return masterxSharedValue(name);
85
86 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
22fff3bf
AR
87 return libecap::Area();
88}
89
90void
91Adaptation::Ecap::XactionRep::visitEachOption(libecap::NamedValueVisitor &visitor) const
92{
93 if (const libecap::Area value = clientIpValue())
ec4d1a1d 94 visitor.visit(libecap::metaClientIp, value);
22fff3bf 95 if (const libecap::Area value = usernameValue())
ec4d1a1d 96 visitor.visit(libecap::metaUserName, value);
5038f9d8
AR
97
98 if (Adaptation::Config::masterx_shared_name) {
ec4d1a1d
A
99 const libecap::Name name(Adaptation::Config::masterx_shared_name);
100 if (const libecap::Area value = masterxSharedValue(name))
101 visitor.visit(name, value);
5038f9d8
AR
102 }
103
104 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
22fff3bf
AR
105}
106
107const libecap::Area
108Adaptation::Ecap::XactionRep::clientIpValue() const
109{
110 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
111 theCauseRep->raw().header : theVirginRep.raw().header);
112 Must(request);
113 // TODO: move this logic into HttpRequest::clientIp(bool) and
114 // HttpRequest::clientIpString(bool) and reuse everywhere
115 if (TheConfig.send_client_ip && request) {
116 Ip::Address client_addr;
117#if FOLLOW_X_FORWARDED_FOR
118 if (TheConfig.use_indirect_client) {
119 client_addr = request->indirect_client_addr;
ec4d1a1d 120 } else
22fff3bf
AR
121#endif
122 client_addr = request->client_addr;
123 if (!client_addr.IsAnyAddr() && !client_addr.IsNoAddr()) {
124 char ntoabuf[MAX_IPSTRLEN] = "";
125 client_addr.NtoA(ntoabuf,MAX_IPSTRLEN);
126 return libecap::Area::FromTempBuffer(ntoabuf, strlen(ntoabuf));
127 }
ec4d1a1d 128 }
22fff3bf
AR
129 return libecap::Area();
130}
131
132const libecap::Area
133Adaptation::Ecap::XactionRep::usernameValue() const
134{
135 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
136 theCauseRep->raw().header : theVirginRep.raw().header);
137 Must(request);
138 if (request->auth_user_request != NULL) {
139 if (char const *name = request->auth_user_request->username())
140 return libecap::Area::FromTempBuffer(name, strlen(name));
ec4d1a1d 141 }
22fff3bf
AR
142 return libecap::Area();
143}
144
5038f9d8
AR
145const libecap::Area
146Adaptation::Ecap::XactionRep::masterxSharedValue(const libecap::Name &name) const
147{
148 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
149 theCauseRep->raw().header : theVirginRep.raw().header);
150 Must(request);
151 if (name.known()) { // must check to avoid empty names matching unset cfg
152 Adaptation::History::Pointer ah = request->adaptHistory(false);
153 if (ah != NULL) {
154 String name, value;
155 if (ah->getXxRecord(name, value))
156 return libecap::Area::FromTempBuffer(value.rawBuf(), value.size());
157 }
158 }
159 return libecap::Area();
160}
161
fdc96a39 162void
574b508c 163Adaptation::Ecap::XactionRep::start()
fdc96a39
AR
164{
165 Must(theMaster);
4d0854d4 166
e1e90d26
AR
167 if (!theVirginRep.raw().body_pipe)
168 makingVb = opNever; // there is nothing to deliver
4d0854d4 169
3ff65596 170 const HttpRequest *request = dynamic_cast<const HttpRequest*> (theCauseRep ?
e1381638 171 theCauseRep->raw().header : theVirginRep.raw().header);
3ff65596 172 Must(request);
a22e6cd3 173 Adaptation::History::Pointer ah = request->adaptLogHistory();
e1381638 174 if (ah != NULL) {
3ff65596
AR
175 // retrying=false because ecap never retries transactions
176 adaptHistoryId = ah->recordXactStart(service().cfg().key, current_time, false);
177 }
178
fdc96a39
AR
179 theMaster->start();
180}
181
182void
574b508c 183Adaptation::Ecap::XactionRep::swanSong()
fdc96a39 184{
506a0530 185 // clear body_pipes, if any
ea76d91e 186 // this code does not maintain proxying* and canAccessVb states; should it?
506a0530
AR
187
188 if (theAnswerRep != NULL) {
f1a768b2
AR
189 BodyPipe::Pointer body_pipe = answer().body_pipe;
190 if (body_pipe != NULL) {
191 Must(body_pipe->stillProducing(this));
192 stopProducingFor(body_pipe, false);
193 }
194 }
506a0530 195
e1e90d26
AR
196 BodyPipe::Pointer &body_pipe = theVirginRep.raw().body_pipe;
197 if (body_pipe != NULL && body_pipe->stillConsuming(this))
198 stopConsumingFrom(body_pipe);
506a0530 199
fdc96a39 200 terminateMaster();
3ff65596
AR
201
202 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
e1381638 203 theCauseRep->raw().header : theVirginRep.raw().header);
3ff65596 204 Must(request);
a22e6cd3 205 Adaptation::History::Pointer ah = request->adaptLogHistory();
3ff65596
AR
206 if (ah != NULL && adaptHistoryId >= 0)
207 ah->recordXactFinish(adaptHistoryId);
208
fdc96a39
AR
209 Adaptation::Initiate::swanSong();
210}
211
fdc96a39 212libecap::Message &
574b508c 213Adaptation::Ecap::XactionRep::virgin()
fdc96a39 214{
7b67e5b6 215 return theVirginRep;
fdc96a39
AR
216}
217
4d0854d4 218const libecap::Message &
574b508c 219Adaptation::Ecap::XactionRep::cause()
fdc96a39 220{
4d0854d4
AR
221 Must(theCauseRep != NULL);
222 return *theCauseRep;
fdc96a39
AR
223}
224
4d0854d4 225libecap::Message &
574b508c 226Adaptation::Ecap::XactionRep::adapted()
fdc96a39 227{
4d0854d4
AR
228 Must(theAnswerRep != NULL);
229 return *theAnswerRep;
230}
231
232Adaptation::Message &
574b508c 233Adaptation::Ecap::XactionRep::answer()
4d0854d4 234{
f1a768b2
AR
235 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
236 Must(rep);
4d0854d4
AR
237 return rep->raw();
238}
239
ea76d91e 240void
574b508c 241Adaptation::Ecap::XactionRep::terminateMaster()
4d0854d4
AR
242{
243 if (theMaster) {
ea76d91e
AR
244 AdapterXaction x = theMaster;
245 theMaster.reset();
246 x->stop();
f1a768b2 247 }
4d0854d4
AR
248}
249
4d0854d4 250bool
574b508c 251Adaptation::Ecap::XactionRep::doneAll() const
4d0854d4 252{
e1e90d26 253 return makingVb >= opComplete && proxyingAb >= opComplete &&
26ac0430 254 Adaptation::Initiate::doneAll();
4d0854d4
AR
255}
256
e1e90d26 257// stops receiving virgin and enables auto-consumption, dropping any vb bytes
4d0854d4 258void
e1e90d26 259Adaptation::Ecap::XactionRep::sinkVb(const char *reason)
4d0854d4 260{
e1e90d26 261 debugs(93,4, HERE << "sink for " << reason << "; status:" << status());
4d0854d4 262
e1e90d26
AR
263 // we reset raw().body_pipe when we are done, so use this one for checking
264 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
265 if (permPipe != NULL)
266 permPipe->enableAutoConsumption();
3af10ac0 267
e1e90d26
AR
268 forgetVb(reason);
269}
270
271// stops receiving virgin but preserves it for others to use
272void
273Adaptation::Ecap::XactionRep::preserveVb(const char *reason)
274{
275 debugs(93,4, HERE << "preserve for " << reason << "; status:" << status());
276
277 // we reset raw().body_pipe when we are done, so use this one for checking
278 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
279 if (permPipe != NULL) {
280 // if libecap consumed, we cannot preserve
281 Must(!permPipe->consumedSize());
3af10ac0
AR
282 }
283
e1e90d26
AR
284 forgetVb(reason);
285}
286
287// disassociates us from vb; the last step of sinking or preserving vb
288void
289Adaptation::Ecap::XactionRep::forgetVb(const char *reason)
290{
291 debugs(93,9, HERE << "forget vb " << reason << "; status:" << status());
ea76d91e 292
e1e90d26
AR
293 BodyPipePointer &p = theVirginRep.raw().body_pipe;
294 if (p != NULL && p->stillConsuming(this))
295 stopConsumingFrom(p);
296
297 if (makingVb == opUndecided)
298 makingVb = opNever;
299 else if (makingVb == opOn)
300 makingVb = opComplete;
fdc96a39
AR
301}
302
26ac0430 303void
574b508c 304Adaptation::Ecap::XactionRep::useVirgin()
fdc96a39 305{
4d0854d4 306 debugs(93,3, HERE << status());
ea76d91e
AR
307 Must(proxyingAb == opUndecided);
308 proxyingAb = opNever;
4d0854d4 309
e1e90d26 310 preserveVb("useVirgin");
2874d9e3
AR
311
312 HttpMsg *clone = theVirginRep.raw().header->clone();
313 // check that clone() copies the pipe so that we do not have to
e1e90d26 314 Must(!theVirginRep.raw().header->body_pipe == !clone->body_pipe);
ea76d91e 315
5038f9d8 316 updateHistory();
3af10ac0 317 sendAnswer(Answer::Forward(clone));
4d0854d4 318 Must(done());
fdc96a39
AR
319}
320
26ac0430 321void
574b508c 322Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
fdc96a39 323{
4d0854d4 324 debugs(93,3, HERE << status());
ea76d91e 325 Must(m);
4d0854d4 326 theAnswerRep = m;
ea76d91e
AR
327 Must(proxyingAb == opUndecided);
328
f1a768b2 329 HttpMsg *msg = answer().header;
ea76d91e
AR
330 if (!theAnswerRep->body()) { // final, bodyless answer
331 proxyingAb = opNever;
5038f9d8 332 updateHistory();
3af10ac0 333 sendAnswer(Answer::Forward(msg));
f1a768b2 334 } else { // got answer headers but need to handle body
ea76d91e 335 proxyingAb = opOn;
f1a768b2 336 Must(!msg->body_pipe); // only host can set body pipes
ea76d91e 337 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
f1a768b2
AR
338 Must(rep);
339 rep->tieBody(this); // sets us as a producer
340 Must(msg->body_pipe != NULL); // check tieBody
ea76d91e 341
5038f9d8 342 updateHistory();
3af10ac0 343 sendAnswer(Answer::Forward(msg));
ea76d91e 344
4d0854d4 345 debugs(93,4, HERE << "adapter will produce body" << status());
8679e6c2 346 theMaster->abMake(); // libecap will produce
4d0854d4 347 }
fdc96a39
AR
348}
349
3af10ac0
AR
350void
351Adaptation::Ecap::XactionRep::blockVirgin()
352{
353 debugs(93,3, HERE << status());
354 Must(proxyingAb == opUndecided);
355 proxyingAb = opNever;
356
e1e90d26 357 sinkVb("blockVirgin");
3af10ac0 358
5038f9d8 359 updateHistory();
3af10ac0
AR
360 sendAnswer(Answer::Block(service().cfg().key));
361 Must(done());
362}
363
5038f9d8
AR
364/// Called just before sendAnswer() to record adapter meta-information
365/// which may affect answer processing and may be needed for logging.
366void
367Adaptation::Ecap::XactionRep::updateHistory()
368{
369 if (!theMaster) // all updates rely on being able to query the adapter
370 return;
371
372 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
373 theCauseRep->raw().header : theVirginRep.raw().header);
374 Must(request);
375
376 // TODO: move common ICAP/eCAP logic to Adaptation::Xaction or similar
377 // TODO: optimize Area-to-String conversion
378
379 // update the cross-transactional database if needed
380 if (const char *xxNameStr = Adaptation::Config::masterx_shared_name) {
381 Adaptation::History::Pointer ah = request->adaptHistory(true);
382 if (ah != NULL) {
383 libecap::Name xxName(xxNameStr); // TODO: optimize?
384 if (const libecap::Area val = theMaster->option(xxName))
385 ah->updateXxRecord(xxNameStr, val.toString().c_str());
386 }
387 }
388
389 // update the adaptation plan if needed
390 if (service().cfg().routing) {
391 String services;
392 if (const libecap::Area services = theMaster->option(libecap::metaNextServices)) {
393 Adaptation::History::Pointer ah = request->adaptHistory(true);
394 if (ah != NULL)
395 ah->updateNextServices(services.toString().c_str());
396 }
397 } // TODO: else warn (occasionally!) if we got libecap::metaNextServices
398
399 // Store received meta headers for adapt::<last_h logformat code use.
400 // If we already have stored headers from a previous adaptation transaction
401 // related to the same master transction, they will be replaced.
402 Adaptation::History::Pointer ah = request->adaptLogHistory();
403 if (ah != NULL) {
404 HttpHeader meta(hoReply);
405 OptionsExtractor extractor(meta);
406 theMaster->visitEachOption(extractor);
407 ah->recordMeta(&meta);
408 }
409}
410
411
4d0854d4 412void
574b508c 413Adaptation::Ecap::XactionRep::vbDiscard()
fdc96a39 414{
e1e90d26 415 Must(makingVb == opUndecided);
8679e6c2 416 // if adapter does not need vb, we do not need to send it
e1e90d26
AR
417 sinkVb("vbDiscard");
418 Must(makingVb == opNever);
fdc96a39
AR
419}
420
4d0854d4 421void
574b508c 422Adaptation::Ecap::XactionRep::vbMake()
fdc96a39 423{
e1e90d26 424 Must(makingVb == opUndecided);
ea76d91e
AR
425 BodyPipePointer &p = theVirginRep.raw().body_pipe;
426 Must(p != NULL);
e1e90d26
AR
427 Must(p->setConsumerIfNotLate(this)); // to deliver vb, we must receive vb
428 makingVb = opOn;
fdc96a39
AR
429}
430
4d0854d4 431void
574b508c 432Adaptation::Ecap::XactionRep::vbStopMaking()
fdc96a39 433{
e1e90d26 434 Must(makingVb == opOn);
ea76d91e 435 // if adapter does not need vb, we do not need to receive it
e1e90d26
AR
436 sinkVb("vbStopMaking");
437 Must(makingVb == opComplete);
4d0854d4
AR
438}
439
440void
574b508c 441Adaptation::Ecap::XactionRep::vbMakeMore()
4d0854d4 442{
e1e90d26 443 Must(makingVb == opOn); // cannot make more if done proxying
ea76d91e 444 // we cannot guarantee more vb, but we can check that there is a chance
e1e90d26
AR
445 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
446 Must(p != NULL && p->stillConsuming(this)); // we are plugged in
447 Must(!p->productionEnded() && p->mayNeedMoreData()); // and may get more
4d0854d4
AR
448}
449
8679e6c2 450libecap::Area
574b508c 451Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
4d0854d4 452{
e1e90d26 453 // We may not be makingVb yet. It should be OK, but see vbContentShift().
ea76d91e 454
8679e6c2 455 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
ea76d91e
AR
456 Must(p != NULL);
457
458 // TODO: make MemBuf use size_t?
459 const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
4d0854d4 460
8679e6c2
AR
461 // convert to Squid types; XXX: check for overflow
462 const uint64_t offset = static_cast<uint64_t>(o);
463 Must(offset <= haveSize); // equal iff at the end of content
464
465 // nsize means no size limit: all content starting from offset
466 const size_t size = s == libecap::nsize ?
26ac0430 467 haveSize - offset : static_cast<size_t>(s);
8679e6c2 468
8679e6c2
AR
469 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
470 return libecap::Area::FromTempBuffer(p->buf().content() + offset,
26ac0430 471 min(static_cast<size_t>(haveSize - offset), size));
4d0854d4
AR
472}
473
474void
574b508c 475Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
4d0854d4 476{
e1e90d26 477 // We may not be makingVb yet. It should be OK now, but if BodyPipe
ea76d91e
AR
478 // consume() requirements change, we would have to return empty vbContent
479 // until the adapter registers as a consumer
480
8679e6c2
AR
481 BodyPipePointer &p = theVirginRep.raw().body_pipe;
482 Must(p != NULL);
483 const size_t size = static_cast<size_t>(n); // XXX: check for overflow
484 const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
485 p->consume(min(size, haveSize));
4d0854d4
AR
486}
487
488void
574b508c 489Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
4d0854d4 490{
7477a343
AR
491 Must(proxyingAb == opOn && !abProductionFinished);
492 abProductionFinished = true;
493 abProductionAtEnd = atEnd; // store until ready to stop producing ourselves
494 debugs(93,5, HERE << "adapted body production ended");
495 moveAbContent();
4d0854d4
AR
496}
497
8679e6c2 498void
574b508c 499Adaptation::Ecap::XactionRep::noteAbContentAvailable()
4d0854d4 500{
7477a343 501 Must(proxyingAb == opOn && !abProductionFinished);
8679e6c2 502 moveAbContent();
4d0854d4
AR
503}
504
ea76d91e 505#if 0 /* XXX: implement */
4d0854d4 506void
574b508c 507Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
4d0854d4
AR
508{
509 Must(answer().body_pipe != NULL);
8679e6c2
AR
510 if (size.known())
511 answer().body_pipe->setBodySize(size.value());
512 // else the piped body size is unknown by default
4d0854d4 513}
8679e6c2 514#endif
4d0854d4
AR
515
516void
574b508c 517Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
4d0854d4
AR
518{
519 debugs(93,3, HERE << "adapter needs time: " <<
26ac0430 520 d.state << '/' << d.progress);
4d0854d4 521 // XXX: set timeout?
fdc96a39
AR
522}
523
26ac0430 524void
574b508c 525Adaptation::Ecap::XactionRep::adaptationAborted()
fdc96a39 526{
fdc96a39 527 tellQueryAborted(true); // should eCAP support retries?
ea76d91e 528 mustStop("adaptationAborted");
fdc96a39
AR
529}
530
8679e6c2 531bool
574b508c 532Adaptation::Ecap::XactionRep::callable() const
8679e6c2
AR
533{
534 return !done();
535}
536
26ac0430 537void
574b508c 538Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
fdc96a39 539{
ea76d91e
AR
540 Must(proxyingAb == opOn);
541 moveAbContent();
fdc96a39
AR
542}
543
26ac0430 544void
574b508c 545Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
fdc96a39 546{
ea76d91e
AR
547 Must(proxyingAb == opOn);
548 stopProducingFor(answer().body_pipe, false);
549 Must(theMaster);
550 theMaster->abStopMaking();
551 proxyingAb = opComplete;
fdc96a39
AR
552}
553
554void
574b508c 555Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
fdc96a39 556{
e1e90d26 557 Must(makingVb == opOn); // or we would not be registered as a consumer
fdc96a39 558 Must(theMaster);
8679e6c2 559 theMaster->noteVbContentAvailable();
fdc96a39
AR
560}
561
562void
574b508c 563Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
fdc96a39 564{
e1e90d26 565 Must(makingVb == opOn); // or we would not be registered as a consumer
fdc96a39 566 Must(theMaster);
8679e6c2 567 theMaster->noteVbContentDone(true);
e1e90d26 568 vbProductionFinished = true;
fdc96a39
AR
569}
570
571void
574b508c 572Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
fdc96a39 573{
e1e90d26 574 Must(makingVb == opOn); // or we would not be registered as a consumer
8679e6c2
AR
575 Must(theMaster);
576 theMaster->noteVbContentDone(false);
e1e90d26 577 vbProductionFinished = true;
fdc96a39
AR
578}
579
580void
574b508c 581Adaptation::Ecap::XactionRep::noteInitiatorAborted()
fdc96a39
AR
582{
583 mustStop("initiator aborted");
584}
585
8679e6c2
AR
586// get content from the adapter and put it into the adapted pipe
587void
574b508c 588Adaptation::Ecap::XactionRep::moveAbContent()
8679e6c2 589{
ea76d91e 590 Must(proxyingAb == opOn);
8679e6c2 591 const libecap::Area c = theMaster->abContent(0, libecap::nsize);
7477a343
AR
592 debugs(93,5, HERE << "up to " << c.size << " bytes");
593 if (c.size == 0 && abProductionFinished) { // no ab now and in the future
594 stopProducingFor(answer().body_pipe, abProductionAtEnd);
595 proxyingAb = opComplete;
596 debugs(93,5, HERE << "last adapted body data retrieved");
e1381638 597 } else if (c.size > 0) {
7477a343
AR
598 if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
599 theMaster->abContentShift(used);
600 }
8679e6c2
AR
601}
602
603const char *
574b508c 604Adaptation::Ecap::XactionRep::status() const
fdc96a39 605{
4d0854d4
AR
606 static MemBuf buf;
607 buf.reset();
608
609 buf.append(" [", 2);
610
e1e90d26
AR
611 if (makingVb)
612 buf.Printf("M%d", static_cast<int>(makingVb));
613
614 const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
615 if (!vp)
616 buf.append(" !V", 3);
ec4d1a1d 617 else if (vp->stillConsuming(const_cast<XactionRep*>(this)))
e1e90d26
AR
618 buf.append(" Vc", 3);
619 else
620 buf.append(" V?", 3);
621
622 if (vbProductionFinished)
623 buf.append(".", 1);
624
625
626 buf.Printf(" A%d", static_cast<int>(proxyingAb));
506a0530 627
ea76d91e
AR
628 if (proxyingAb == opOn) {
629 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
630 Must(rep);
f1a768b2 631 const BodyPipePointer &ap = rep->raw().body_pipe;
e1e90d26
AR
632 if (!ap)
633 buf.append(" !A", 3);
634 else if (ap->stillProducing(const_cast<XactionRep*>(this)))
635 buf.append(" Ap", 3);
636 else
637 buf.append(" A?", 3);
f1a768b2 638 }
4d0854d4 639
52ed047a 640 buf.Printf(" %s%u]", id.Prefix, id.value);
4d0854d4
AR
641
642 buf.terminate();
643
644 return buf.content();
fdc96a39 645}