]> git.ipfire.org Git - thirdparty/squid.git/blame - src/adaptation/ecap/XactionRep.cc
Updated copyright for icons/SN.png
[thirdparty/squid.git] / src / adaptation / ecap / XactionRep.cc
CommitLineData
b510f3a1
AJ
1/*
2 * DEBUG: section 93 eCAP Interface
3 */
582c2af2 4#include "squid.h"
4d0854d4
AR
5#include <libecap/common/area.h>
6#include <libecap/common/delay.h>
22fff3bf
AR
7#include <libecap/common/named_values.h>
8#include <libecap/common/names.h>
fdc96a39 9#include <libecap/adapter/xaction.h>
fdc96a39
AR
10#include "HttpRequest.h"
11#include "HttpReply.h"
3ff65596 12#include "SquidTime.h"
1adcebc3 13#include "adaptation/Answer.h"
1f3c65fc 14#include "adaptation/ecap/XactionRep.h"
22fff3bf 15#include "adaptation/ecap/Config.h"
3af10ac0 16#include "adaptation/Initiator.h"
3d93a84d 17#include "base/TextException.h"
fdc96a39 18
// NOTE(review): presumably emits the cbdata support definitions for
// Adaptation::Ecap::XactionRep -- confirm against the macro definition.
574b508c 19CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
fdc96a39 20
5038f9d8
AR
21/// a libecap Visitor for converting adapter transaction options to HttpHeader
22class OptionsExtractor: public libecap::NamedValueVisitor
23{
24public:
25 typedef libecap::Name Name;
26 typedef libecap::Area Area;
27
28 OptionsExtractor(HttpHeader &aMeta): meta(aMeta) {}
29
30 // libecap::NamedValueVisitor API
ec4d1a1d 31 virtual void visit(const Name &name, const Area &value) {
5038f9d8
AR
32 meta.putExt(name.image().c_str(), value.toString().c_str());
33 }
34
35 HttpHeader &meta; ///< where to put extracted options
36};
37
4299f876 38Adaptation::Ecap::XactionRep::XactionRep(
4cb2536f
A
39 HttpMsg *virginHeader, HttpRequest *virginCause,
40 const Adaptation::ServicePointer &aService):
574b508c 41 AsyncJob("Adaptation::Ecap::XactionRep"),
4299f876 42 Adaptation::Initiate("Adaptation::Ecap::XactionRep"),
a22e6cd3 43 theService(aService),
26ac0430 44 theVirginRep(virginHeader), theCauseRep(NULL),
e1e90d26 45 makingVb(opUndecided), proxyingAb(opUndecided),
3ff65596 46 adaptHistoryId(-1),
e1e90d26 47 vbProductionFinished(false),
7477a343 48 abProductionFinished(false), abProductionAtEnd(false)
fdc96a39 49{
027320b4 50 if (virginCause)
4d0854d4 51 theCauseRep = new MessageRep(virginCause);
fdc96a39
AR
52}
53
574b508c 54Adaptation::Ecap::XactionRep::~XactionRep()
fdc96a39
AR
55{
56 assert(!theMaster);
027320b4 57 delete theCauseRep;
4d0854d4 58 theAnswerRep.reset();
fdc96a39
AR
59}
60
61void
574b508c 62Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
fdc96a39
AR
63{
64 Must(!theMaster);
65 Must(x != NULL);
66 theMaster = x;
67}
68
a22e6cd3
AR
69Adaptation::Service &
70Adaptation::Ecap::XactionRep::service()
71{
72 Must(theService != NULL);
73 return *theService;
74}
75
22fff3bf
AR
76const libecap::Area
77Adaptation::Ecap::XactionRep::option(const libecap::Name &name) const
78{
79 if (name == libecap::metaClientIp)
80 return clientIpValue();
81 if (name == libecap::metaUserName)
82 return usernameValue();
71be37e0 83 if (Adaptation::Config::masterx_shared_name && name == Adaptation::Config::masterx_shared_name)
5038f9d8
AR
84 return masterxSharedValue(name);
85
86 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
71be37e0
CT
87
88 // If the name is unknown, metaValue returns an emtpy area
89 return metaValue(name);
22fff3bf
AR
90}
91
92void
93Adaptation::Ecap::XactionRep::visitEachOption(libecap::NamedValueVisitor &visitor) const
94{
95 if (const libecap::Area value = clientIpValue())
ec4d1a1d 96 visitor.visit(libecap::metaClientIp, value);
22fff3bf 97 if (const libecap::Area value = usernameValue())
ec4d1a1d 98 visitor.visit(libecap::metaUserName, value);
5038f9d8
AR
99
100 if (Adaptation::Config::masterx_shared_name) {
ec4d1a1d
A
101 const libecap::Name name(Adaptation::Config::masterx_shared_name);
102 if (const libecap::Area value = masterxSharedValue(name))
103 visitor.visit(name, value);
5038f9d8 104 }
71ee0835 105
71be37e0 106 visitEachMetaHeader(visitor);
5038f9d8
AR
107
108 // TODO: metaServerIp, metaAuthenticatedUser, and metaAuthenticatedGroups
22fff3bf
AR
109}
110
111const libecap::Area
112Adaptation::Ecap::XactionRep::clientIpValue() const
113{
114 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
115 theCauseRep->raw().header : theVirginRep.raw().header);
116 Must(request);
117 // TODO: move this logic into HttpRequest::clientIp(bool) and
118 // HttpRequest::clientIpString(bool) and reuse everywhere
119 if (TheConfig.send_client_ip && request) {
120 Ip::Address client_addr;
121#if FOLLOW_X_FORWARDED_FOR
122 if (TheConfig.use_indirect_client) {
123 client_addr = request->indirect_client_addr;
ec4d1a1d 124 } else
22fff3bf
AR
125#endif
126 client_addr = request->client_addr;
127 if (!client_addr.IsAnyAddr() && !client_addr.IsNoAddr()) {
128 char ntoabuf[MAX_IPSTRLEN] = "";
129 client_addr.NtoA(ntoabuf,MAX_IPSTRLEN);
130 return libecap::Area::FromTempBuffer(ntoabuf, strlen(ntoabuf));
131 }
ec4d1a1d 132 }
22fff3bf
AR
133 return libecap::Area();
134}
135
136const libecap::Area
137Adaptation::Ecap::XactionRep::usernameValue() const
138{
321f219c 139#if USE_AUTH
22fff3bf
AR
140 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
141 theCauseRep->raw().header : theVirginRep.raw().header);
142 Must(request);
143 if (request->auth_user_request != NULL) {
144 if (char const *name = request->auth_user_request->username())
145 return libecap::Area::FromTempBuffer(name, strlen(name));
cb72cd25 146 else if (request->extacl_user.defined() && request->extacl_user.size())
8661a1d0 147 return libecap::Area::FromTempBuffer(request->extacl_user.rawBuf(),
cb72cd25 148 request->extacl_user.size());
ec4d1a1d 149 }
321f219c 150#endif
22fff3bf
AR
151 return libecap::Area();
152}
153
5038f9d8
AR
154const libecap::Area
155Adaptation::Ecap::XactionRep::masterxSharedValue(const libecap::Name &name) const
156{
157 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
158 theCauseRep->raw().header : theVirginRep.raw().header);
159 Must(request);
160 if (name.known()) { // must check to avoid empty names matching unset cfg
161 Adaptation::History::Pointer ah = request->adaptHistory(false);
162 if (ah != NULL) {
163 String name, value;
164 if (ah->getXxRecord(name, value))
165 return libecap::Area::FromTempBuffer(value.rawBuf(), value.size());
166 }
167 }
168 return libecap::Area();
169}
170
71be37e0
CT
171const libecap::Area
172Adaptation::Ecap::XactionRep::metaValue(const libecap::Name &name) const
173{
174 HttpRequest *request = dynamic_cast<HttpRequest*>(theCauseRep ?
71ee0835 175 theCauseRep->raw().header : theVirginRep.raw().header);
71be37e0
CT
176 Must(request);
177 HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
178
179 if (name.known()) { // must check to avoid empty names matching unset cfg
d7f4a0b7 180 typedef Notes::iterator ACAMLI;
71ee0835 181 for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
d7f4a0b7 182 if (name == (*i)->key.termedBuf()) {
71be37e0
CT
183 if (const char *value = (*i)->match(request, reply))
184 return libecap::Area::FromTempString(value);
185 else
186 return libecap::Area();
187 }
188 }
189 }
190
191 return libecap::Area();
192}
193
71ee0835 194void
71be37e0
CT
195Adaptation::Ecap::XactionRep::visitEachMetaHeader(libecap::NamedValueVisitor &visitor) const
196{
197 HttpRequest *request = dynamic_cast<HttpRequest*>(theCauseRep ?
71ee0835 198 theCauseRep->raw().header : theVirginRep.raw().header);
71be37e0
CT
199 Must(request);
200 HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
71ee0835 201
d7f4a0b7 202 typedef Notes::iterator ACAMLI;
71ee0835 203 for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
baa31c42
HN
204 const char *v = (*i)->match(request, reply);
205 if (v) {
d7f4a0b7 206 const libecap::Name name((*i)->key.termedBuf());
71be37e0
CT
207 const libecap::Area value = libecap::Area::FromTempString(v);
208 visitor.visit(name, value);
209 }
210 }
211}
212
fdc96a39 213void
574b508c 214Adaptation::Ecap::XactionRep::start()
fdc96a39
AR
215{
216 Must(theMaster);
4d0854d4 217
e1e90d26
AR
218 if (!theVirginRep.raw().body_pipe)
219 makingVb = opNever; // there is nothing to deliver
4d0854d4 220
d7f4a0b7 221 HttpRequest *request = dynamic_cast<HttpRequest*> (theCauseRep ?
ffb82151 222 theCauseRep->raw().header : theVirginRep.raw().header);
3ff65596 223 Must(request);
d7f4a0b7
CT
224
225 HttpReply *reply = dynamic_cast<HttpReply*>(theVirginRep.raw().header);
226
a22e6cd3 227 Adaptation::History::Pointer ah = request->adaptLogHistory();
e1381638 228 if (ah != NULL) {
3ff65596
AR
229 // retrying=false because ecap never retries transactions
230 adaptHistoryId = ah->recordXactStart(service().cfg().key, current_time, false);
d7f4a0b7
CT
231 typedef Notes::iterator ACAMLI;
232 for (ACAMLI i = Adaptation::Config::metaHeaders.begin(); i != Adaptation::Config::metaHeaders.end(); ++i) {
233 const char *v = (*i)->match(request, reply);
234 if (v && !ah->metaHeaders.hasByNameListMember((*i)->key.termedBuf(), v, ',')) {
235 ah->metaHeaders.addEntry(new HttpHeaderEntry(HDR_OTHER, (*i)->key.termedBuf(), v));
236 }
237 }
3ff65596
AR
238 }
239
fdc96a39
AR
240 theMaster->start();
241}
242
243void
574b508c 244Adaptation::Ecap::XactionRep::swanSong()
fdc96a39 245{
506a0530 246 // clear body_pipes, if any
ea76d91e 247 // this code does not maintain proxying* and canAccessVb states; should it?
506a0530
AR
248
249 if (theAnswerRep != NULL) {
f1a768b2
AR
250 BodyPipe::Pointer body_pipe = answer().body_pipe;
251 if (body_pipe != NULL) {
252 Must(body_pipe->stillProducing(this));
253 stopProducingFor(body_pipe, false);
254 }
255 }
506a0530 256
e1e90d26
AR
257 BodyPipe::Pointer &body_pipe = theVirginRep.raw().body_pipe;
258 if (body_pipe != NULL && body_pipe->stillConsuming(this))
259 stopConsumingFrom(body_pipe);
506a0530 260
fdc96a39 261 terminateMaster();
3ff65596
AR
262
263 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
e1381638 264 theCauseRep->raw().header : theVirginRep.raw().header);
3ff65596 265 Must(request);
a22e6cd3 266 Adaptation::History::Pointer ah = request->adaptLogHistory();
3ff65596
AR
267 if (ah != NULL && adaptHistoryId >= 0)
268 ah->recordXactFinish(adaptHistoryId);
269
fdc96a39
AR
270 Adaptation::Initiate::swanSong();
271}
272
fdc96a39 273libecap::Message &
574b508c 274Adaptation::Ecap::XactionRep::virgin()
fdc96a39 275{
7b67e5b6 276 return theVirginRep;
fdc96a39
AR
277}
278
4d0854d4 279const libecap::Message &
574b508c 280Adaptation::Ecap::XactionRep::cause()
fdc96a39 281{
4d0854d4
AR
282 Must(theCauseRep != NULL);
283 return *theCauseRep;
fdc96a39
AR
284}
285
4d0854d4 286libecap::Message &
574b508c 287Adaptation::Ecap::XactionRep::adapted()
fdc96a39 288{
4d0854d4
AR
289 Must(theAnswerRep != NULL);
290 return *theAnswerRep;
291}
292
293Adaptation::Message &
574b508c 294Adaptation::Ecap::XactionRep::answer()
4d0854d4 295{
f1a768b2
AR
296 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
297 Must(rep);
4d0854d4
AR
298 return rep->raw();
299}
300
ea76d91e 301void
574b508c 302Adaptation::Ecap::XactionRep::terminateMaster()
4d0854d4
AR
303{
304 if (theMaster) {
ea76d91e
AR
305 AdapterXaction x = theMaster;
306 theMaster.reset();
307 x->stop();
f1a768b2 308 }
4d0854d4
AR
309}
310
4d0854d4 311bool
574b508c 312Adaptation::Ecap::XactionRep::doneAll() const
4d0854d4 313{
e1e90d26 314 return makingVb >= opComplete && proxyingAb >= opComplete &&
26ac0430 315 Adaptation::Initiate::doneAll();
4d0854d4
AR
316}
317
e1e90d26 318// stops receiving virgin and enables auto-consumption, dropping any vb bytes
4d0854d4 319void
e1e90d26 320Adaptation::Ecap::XactionRep::sinkVb(const char *reason)
4d0854d4 321{
e1e90d26 322 debugs(93,4, HERE << "sink for " << reason << "; status:" << status());
4d0854d4 323
e1e90d26
AR
324 // we reset raw().body_pipe when we are done, so use this one for checking
325 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
326 if (permPipe != NULL)
327 permPipe->enableAutoConsumption();
3af10ac0 328
e1e90d26
AR
329 forgetVb(reason);
330}
331
332// stops receiving virgin but preserves it for others to use
333void
334Adaptation::Ecap::XactionRep::preserveVb(const char *reason)
335{
336 debugs(93,4, HERE << "preserve for " << reason << "; status:" << status());
337
338 // we reset raw().body_pipe when we are done, so use this one for checking
339 const BodyPipePointer &permPipe = theVirginRep.raw().header->body_pipe;
340 if (permPipe != NULL) {
341 // if libecap consumed, we cannot preserve
342 Must(!permPipe->consumedSize());
3af10ac0
AR
343 }
344
e1e90d26
AR
345 forgetVb(reason);
346}
347
348// disassociates us from vb; the last step of sinking or preserving vb
349void
350Adaptation::Ecap::XactionRep::forgetVb(const char *reason)
351{
352 debugs(93,9, HERE << "forget vb " << reason << "; status:" << status());
ea76d91e 353
e1e90d26
AR
354 BodyPipePointer &p = theVirginRep.raw().body_pipe;
355 if (p != NULL && p->stillConsuming(this))
356 stopConsumingFrom(p);
357
358 if (makingVb == opUndecided)
359 makingVb = opNever;
360 else if (makingVb == opOn)
361 makingVb = opComplete;
fdc96a39
AR
362}
363
26ac0430 364void
574b508c 365Adaptation::Ecap::XactionRep::useVirgin()
fdc96a39 366{
4d0854d4 367 debugs(93,3, HERE << status());
ea76d91e
AR
368 Must(proxyingAb == opUndecided);
369 proxyingAb = opNever;
4d0854d4 370
e1e90d26 371 preserveVb("useVirgin");
2874d9e3
AR
372
373 HttpMsg *clone = theVirginRep.raw().header->clone();
374 // check that clone() copies the pipe so that we do not have to
e1e90d26 375 Must(!theVirginRep.raw().header->body_pipe == !clone->body_pipe);
ea76d91e 376
aaf0559d 377 updateHistory(clone);
3af10ac0 378 sendAnswer(Answer::Forward(clone));
4d0854d4 379 Must(done());
fdc96a39
AR
380}
381
26ac0430 382void
574b508c 383Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
fdc96a39 384{
4d0854d4 385 debugs(93,3, HERE << status());
ea76d91e 386 Must(m);
4d0854d4 387 theAnswerRep = m;
ea76d91e
AR
388 Must(proxyingAb == opUndecided);
389
f1a768b2 390 HttpMsg *msg = answer().header;
ea76d91e
AR
391 if (!theAnswerRep->body()) { // final, bodyless answer
392 proxyingAb = opNever;
aaf0559d 393 updateHistory(msg);
3af10ac0 394 sendAnswer(Answer::Forward(msg));
f1a768b2 395 } else { // got answer headers but need to handle body
ea76d91e 396 proxyingAb = opOn;
f1a768b2 397 Must(!msg->body_pipe); // only host can set body pipes
ea76d91e 398 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
f1a768b2
AR
399 Must(rep);
400 rep->tieBody(this); // sets us as a producer
401 Must(msg->body_pipe != NULL); // check tieBody
ea76d91e 402
aaf0559d 403 updateHistory(msg);
3af10ac0 404 sendAnswer(Answer::Forward(msg));
ea76d91e 405
4d0854d4 406 debugs(93,4, HERE << "adapter will produce body" << status());
8679e6c2 407 theMaster->abMake(); // libecap will produce
4d0854d4 408 }
fdc96a39
AR
409}
410
3af10ac0
AR
411void
412Adaptation::Ecap::XactionRep::blockVirgin()
413{
414 debugs(93,3, HERE << status());
415 Must(proxyingAb == opUndecided);
416 proxyingAb = opNever;
417
e1e90d26 418 sinkVb("blockVirgin");
3af10ac0 419
aaf0559d 420 updateHistory(NULL);
3af10ac0
AR
421 sendAnswer(Answer::Block(service().cfg().key));
422 Must(done());
423}
424
5038f9d8
AR
425/// Called just before sendAnswer() to record adapter meta-information
426/// which may affect answer processing and may be needed for logging.
427void
aaf0559d 428Adaptation::Ecap::XactionRep::updateHistory(HttpMsg *adapted)
5038f9d8
AR
429{
430 if (!theMaster) // all updates rely on being able to query the adapter
431 return;
432
433 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
434 theCauseRep->raw().header : theVirginRep.raw().header);
435 Must(request);
436
437 // TODO: move common ICAP/eCAP logic to Adaptation::Xaction or similar
438 // TODO: optimize Area-to-String conversion
439
440 // update the cross-transactional database if needed
441 if (const char *xxNameStr = Adaptation::Config::masterx_shared_name) {
442 Adaptation::History::Pointer ah = request->adaptHistory(true);
443 if (ah != NULL) {
444 libecap::Name xxName(xxNameStr); // TODO: optimize?
445 if (const libecap::Area val = theMaster->option(xxName))
446 ah->updateXxRecord(xxNameStr, val.toString().c_str());
447 }
448 }
449
450 // update the adaptation plan if needed
451 if (service().cfg().routing) {
452 String services;
453 if (const libecap::Area services = theMaster->option(libecap::metaNextServices)) {
454 Adaptation::History::Pointer ah = request->adaptHistory(true);
455 if (ah != NULL)
456 ah->updateNextServices(services.toString().c_str());
457 }
458 } // TODO: else warn (occasionally!) if we got libecap::metaNextServices
459
460 // Store received meta headers for adapt::<last_h logformat code use.
461 // If we already have stored headers from a previous adaptation transaction
462 // related to the same master transction, they will be replaced.
463 Adaptation::History::Pointer ah = request->adaptLogHistory();
464 if (ah != NULL) {
465 HttpHeader meta(hoReply);
466 OptionsExtractor extractor(meta);
467 theMaster->visitEachOption(extractor);
468 ah->recordMeta(&meta);
469 }
aaf0559d
AR
470
471 // Add just-created history to the adapted/cloned request that lacks it.
472 if (HttpRequest *adaptedReq = dynamic_cast<HttpRequest*>(adapted))
473 adaptedReq->adaptHistoryImport(*request);
5038f9d8
AR
474}
475
4d0854d4 476void
574b508c 477Adaptation::Ecap::XactionRep::vbDiscard()
fdc96a39 478{
e1e90d26 479 Must(makingVb == opUndecided);
8679e6c2 480 // if adapter does not need vb, we do not need to send it
e1e90d26
AR
481 sinkVb("vbDiscard");
482 Must(makingVb == opNever);
fdc96a39
AR
483}
484
4d0854d4 485void
574b508c 486Adaptation::Ecap::XactionRep::vbMake()
fdc96a39 487{
e1e90d26 488 Must(makingVb == opUndecided);
ea76d91e
AR
489 BodyPipePointer &p = theVirginRep.raw().body_pipe;
490 Must(p != NULL);
e1e90d26
AR
491 Must(p->setConsumerIfNotLate(this)); // to deliver vb, we must receive vb
492 makingVb = opOn;
fdc96a39
AR
493}
494
4d0854d4 495void
574b508c 496Adaptation::Ecap::XactionRep::vbStopMaking()
fdc96a39 497{
e1e90d26 498 Must(makingVb == opOn);
ea76d91e 499 // if adapter does not need vb, we do not need to receive it
e1e90d26
AR
500 sinkVb("vbStopMaking");
501 Must(makingVb == opComplete);
4d0854d4
AR
502}
503
504void
574b508c 505Adaptation::Ecap::XactionRep::vbMakeMore()
4d0854d4 506{
e1e90d26 507 Must(makingVb == opOn); // cannot make more if done proxying
ea76d91e 508 // we cannot guarantee more vb, but we can check that there is a chance
e1e90d26
AR
509 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
510 Must(p != NULL && p->stillConsuming(this)); // we are plugged in
511 Must(!p->productionEnded() && p->mayNeedMoreData()); // and may get more
4d0854d4
AR
512}
513
8679e6c2 514libecap::Area
574b508c 515Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
4d0854d4 516{
e1e90d26 517 // We may not be makingVb yet. It should be OK, but see vbContentShift().
ea76d91e 518
8679e6c2 519 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
ea76d91e
AR
520 Must(p != NULL);
521
522 // TODO: make MemBuf use size_t?
523 const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
4d0854d4 524
8679e6c2
AR
525 // convert to Squid types; XXX: check for overflow
526 const uint64_t offset = static_cast<uint64_t>(o);
527 Must(offset <= haveSize); // equal iff at the end of content
528
529 // nsize means no size limit: all content starting from offset
530 const size_t size = s == libecap::nsize ?
26ac0430 531 haveSize - offset : static_cast<size_t>(s);
8679e6c2 532
8679e6c2
AR
533 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
534 return libecap::Area::FromTempBuffer(p->buf().content() + offset,
26ac0430 535 min(static_cast<size_t>(haveSize - offset), size));
4d0854d4
AR
536}
537
538void
574b508c 539Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
4d0854d4 540{
e1e90d26 541 // We may not be makingVb yet. It should be OK now, but if BodyPipe
ea76d91e
AR
542 // consume() requirements change, we would have to return empty vbContent
543 // until the adapter registers as a consumer
544
8679e6c2
AR
545 BodyPipePointer &p = theVirginRep.raw().body_pipe;
546 Must(p != NULL);
547 const size_t size = static_cast<size_t>(n); // XXX: check for overflow
548 const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
549 p->consume(min(size, haveSize));
4d0854d4
AR
550}
551
552void
574b508c 553Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
4d0854d4 554{
7477a343
AR
555 Must(proxyingAb == opOn && !abProductionFinished);
556 abProductionFinished = true;
557 abProductionAtEnd = atEnd; // store until ready to stop producing ourselves
558 debugs(93,5, HERE << "adapted body production ended");
559 moveAbContent();
4d0854d4
AR
560}
561
8679e6c2 562void
574b508c 563Adaptation::Ecap::XactionRep::noteAbContentAvailable()
4d0854d4 564{
7477a343 565 Must(proxyingAb == opOn && !abProductionFinished);
8679e6c2 566 moveAbContent();
4d0854d4
AR
567}
568
#if 0 /* XXX: implement */
/// Disabled code: would set the adapted body pipe size when it is known.
void
Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
{
    Must(answer().body_pipe != NULL);

    if (!size.known())
        return; // the piped body size is unknown by default

    answer().body_pipe->setBodySize(size.value());
}
#endif
4d0854d4
AR
579
580void
574b508c 581Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
4d0854d4
AR
582{
583 debugs(93,3, HERE << "adapter needs time: " <<
26ac0430 584 d.state << '/' << d.progress);
4d0854d4 585 // XXX: set timeout?
fdc96a39
AR
586}
587
26ac0430 588void
574b508c 589Adaptation::Ecap::XactionRep::adaptationAborted()
fdc96a39 590{
fdc96a39 591 tellQueryAborted(true); // should eCAP support retries?
ea76d91e 592 mustStop("adaptationAborted");
fdc96a39
AR
593}
594
8679e6c2 595bool
574b508c 596Adaptation::Ecap::XactionRep::callable() const
8679e6c2
AR
597{
598 return !done();
599}
600
26ac0430 601void
574b508c 602Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
fdc96a39 603{
ea76d91e
AR
604 Must(proxyingAb == opOn);
605 moveAbContent();
fdc96a39
AR
606}
607
26ac0430 608void
574b508c 609Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
fdc96a39 610{
ea76d91e
AR
611 Must(proxyingAb == opOn);
612 stopProducingFor(answer().body_pipe, false);
613 Must(theMaster);
614 theMaster->abStopMaking();
615 proxyingAb = opComplete;
fdc96a39
AR
616}
617
618void
574b508c 619Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
fdc96a39 620{
e1e90d26 621 Must(makingVb == opOn); // or we would not be registered as a consumer
fdc96a39 622 Must(theMaster);
8679e6c2 623 theMaster->noteVbContentAvailable();
fdc96a39
AR
624}
625
626void
574b508c 627Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
fdc96a39 628{
e1e90d26 629 Must(makingVb == opOn); // or we would not be registered as a consumer
fdc96a39 630 Must(theMaster);
8679e6c2 631 theMaster->noteVbContentDone(true);
e1e90d26 632 vbProductionFinished = true;
fdc96a39
AR
633}
634
635void
574b508c 636Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
fdc96a39 637{
e1e90d26 638 Must(makingVb == opOn); // or we would not be registered as a consumer
8679e6c2
AR
639 Must(theMaster);
640 theMaster->noteVbContentDone(false);
e1e90d26 641 vbProductionFinished = true;
fdc96a39
AR
642}
643
644void
574b508c 645Adaptation::Ecap::XactionRep::noteInitiatorAborted()
fdc96a39
AR
646{
647 mustStop("initiator aborted");
648}
649
8679e6c2
AR
650// get content from the adapter and put it into the adapted pipe
651void
574b508c 652Adaptation::Ecap::XactionRep::moveAbContent()
8679e6c2 653{
ea76d91e 654 Must(proxyingAb == opOn);
8679e6c2 655 const libecap::Area c = theMaster->abContent(0, libecap::nsize);
7477a343
AR
656 debugs(93,5, HERE << "up to " << c.size << " bytes");
657 if (c.size == 0 && abProductionFinished) { // no ab now and in the future
658 stopProducingFor(answer().body_pipe, abProductionAtEnd);
659 proxyingAb = opComplete;
660 debugs(93,5, HERE << "last adapted body data retrieved");
e1381638 661 } else if (c.size > 0) {
7477a343
AR
662 if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
663 theMaster->abContentShift(used);
664 }
8679e6c2
AR
665}
666
667const char *
574b508c 668Adaptation::Ecap::XactionRep::status() const
fdc96a39 669{
4d0854d4
AR
670 static MemBuf buf;
671 buf.reset();
672
673 buf.append(" [", 2);
674
e1e90d26
AR
675 if (makingVb)
676 buf.Printf("M%d", static_cast<int>(makingVb));
677
678 const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
679 if (!vp)
680 buf.append(" !V", 3);
ec4d1a1d 681 else if (vp->stillConsuming(const_cast<XactionRep*>(this)))
e1e90d26
AR
682 buf.append(" Vc", 3);
683 else
684 buf.append(" V?", 3);
685
686 if (vbProductionFinished)
687 buf.append(".", 1);
688
e1e90d26 689 buf.Printf(" A%d", static_cast<int>(proxyingAb));
506a0530 690
ea76d91e
AR
691 if (proxyingAb == opOn) {
692 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
693 Must(rep);
f1a768b2 694 const BodyPipePointer &ap = rep->raw().body_pipe;
e1e90d26
AR
695 if (!ap)
696 buf.append(" !A", 3);
697 else if (ap->stillProducing(const_cast<XactionRep*>(this)))
698 buf.append(" Ap", 3);
699 else
700 buf.append(" A?", 3);
f1a768b2 701 }
4d0854d4 702
52ed047a 703 buf.Printf(" %s%u]", id.Prefix, id.value);
4d0854d4
AR
704
705 buf.terminate();
706
707 return buf.content();
fdc96a39 708}