]> git.ipfire.org Git - thirdparty/squid.git/blame - src/adaptation/ecap/XactionRep.cc
SourceFormat: enforcement
[thirdparty/squid.git] / src / adaptation / ecap / XactionRep.cc
CommitLineData
fdc96a39 1#include "squid.h"
4d0854d4
AR
2#include <libecap/common/area.h>
3#include <libecap/common/delay.h>
fdc96a39
AR
4#include <libecap/adapter/xaction.h>
5#include "TextException.h"
fdc96a39
AR
6#include "HttpRequest.h"
7#include "HttpReply.h"
3ff65596 8#include "SquidTime.h"
1f3c65fc 9#include "adaptation/ecap/XactionRep.h"
fdc96a39 10
574b508c 11CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
fdc96a39
AR
12
13
574b508c 14Adaptation::Ecap::XactionRep::XactionRep(Adaptation::Initiator *anInitiator,
af6a12ee
AJ
15 HttpMsg *virginHeader, HttpRequest *virginCause,
16 const Adaptation::ServicePointer &aService):
574b508c 17 AsyncJob("Adaptation::Ecap::XactionRep"),
a22e6cd3
AR
18 Adaptation::Initiate("Adaptation::Ecap::XactionRep", anInitiator),
19 theService(aService),
26ac0430 20 theVirginRep(virginHeader), theCauseRep(NULL),
3ff65596
AR
21 proxyingVb(opUndecided), proxyingAb(opUndecided),
22 adaptHistoryId(-1),
e1381638 23 canAccessVb(false),
7477a343 24 abProductionFinished(false), abProductionAtEnd(false)
fdc96a39 25{
027320b4 26 if (virginCause)
4d0854d4 27 theCauseRep = new MessageRep(virginCause);
fdc96a39
AR
28}
29
574b508c 30Adaptation::Ecap::XactionRep::~XactionRep()
fdc96a39
AR
31{
32 assert(!theMaster);
027320b4 33 delete theCauseRep;
4d0854d4 34 theAnswerRep.reset();
fdc96a39
AR
35}
36
37void
574b508c 38Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
fdc96a39
AR
39{
40 Must(!theMaster);
41 Must(x != NULL);
42 theMaster = x;
43}
44
a22e6cd3
AR
45Adaptation::Service &
46Adaptation::Ecap::XactionRep::service()
47{
48 Must(theService != NULL);
49 return *theService;
50}
51
fdc96a39 52void
574b508c 53Adaptation::Ecap::XactionRep::start()
fdc96a39
AR
54{
55 Must(theMaster);
4d0854d4 56
ea76d91e
AR
57 if (theVirginRep.raw().body_pipe != NULL)
58 canAccessVb = true; /// assumes nobody is consuming; \todo check
59 else
60 proxyingVb = opNever;
4d0854d4 61
3ff65596 62 const HttpRequest *request = dynamic_cast<const HttpRequest*> (theCauseRep ?
e1381638 63 theCauseRep->raw().header : theVirginRep.raw().header);
3ff65596 64 Must(request);
a22e6cd3 65 Adaptation::History::Pointer ah = request->adaptLogHistory();
e1381638 66 if (ah != NULL) {
3ff65596
AR
67 // retrying=false because ecap never retries transactions
68 adaptHistoryId = ah->recordXactStart(service().cfg().key, current_time, false);
69 }
70
fdc96a39
AR
71 theMaster->start();
72}
73
74void
574b508c 75Adaptation::Ecap::XactionRep::swanSong()
fdc96a39 76{
506a0530 77 // clear body_pipes, if any
ea76d91e 78 // this code does not maintain proxying* and canAccessVb states; should it?
506a0530
AR
79
80 if (theAnswerRep != NULL) {
f1a768b2
AR
81 BodyPipe::Pointer body_pipe = answer().body_pipe;
82 if (body_pipe != NULL) {
83 Must(body_pipe->stillProducing(this));
84 stopProducingFor(body_pipe, false);
85 }
86 }
506a0530
AR
87
88 {
f1a768b2
AR
89 BodyPipe::Pointer body_pipe = theVirginRep.raw().body_pipe;
90 if (body_pipe != NULL) {
91 Must(body_pipe->stillConsuming(this));
92 stopConsumingFrom(body_pipe);
93 }
94 }
506a0530 95
fdc96a39 96 terminateMaster();
3ff65596
AR
97
98 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
e1381638 99 theCauseRep->raw().header : theVirginRep.raw().header);
3ff65596 100 Must(request);
a22e6cd3 101 Adaptation::History::Pointer ah = request->adaptLogHistory();
3ff65596
AR
102 if (ah != NULL && adaptHistoryId >= 0)
103 ah->recordXactFinish(adaptHistoryId);
104
fdc96a39
AR
105 Adaptation::Initiate::swanSong();
106}
107
fdc96a39 108libecap::Message &
574b508c 109Adaptation::Ecap::XactionRep::virgin()
fdc96a39 110{
7b67e5b6 111 return theVirginRep;
fdc96a39
AR
112}
113
4d0854d4 114const libecap::Message &
574b508c 115Adaptation::Ecap::XactionRep::cause()
fdc96a39 116{
4d0854d4
AR
117 Must(theCauseRep != NULL);
118 return *theCauseRep;
fdc96a39
AR
119}
120
4d0854d4 121libecap::Message &
574b508c 122Adaptation::Ecap::XactionRep::adapted()
fdc96a39 123{
4d0854d4
AR
124 Must(theAnswerRep != NULL);
125 return *theAnswerRep;
126}
127
128Adaptation::Message &
574b508c 129Adaptation::Ecap::XactionRep::answer()
4d0854d4 130{
f1a768b2
AR
131 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
132 Must(rep);
4d0854d4
AR
133 return rep->raw();
134}
135
ea76d91e 136void
574b508c 137Adaptation::Ecap::XactionRep::terminateMaster()
4d0854d4
AR
138{
139 if (theMaster) {
ea76d91e
AR
140 AdapterXaction x = theMaster;
141 theMaster.reset();
142 x->stop();
f1a768b2 143 }
4d0854d4
AR
144}
145
4d0854d4 146bool
574b508c 147Adaptation::Ecap::XactionRep::doneAll() const
4d0854d4 148{
ea76d91e 149 return proxyingVb >= opComplete && proxyingAb >= opComplete &&
26ac0430 150 Adaptation::Initiate::doneAll();
4d0854d4
AR
151}
152
ea76d91e 153// stops receiving virgin and enables auto-consumption
4d0854d4 154void
574b508c 155Adaptation::Ecap::XactionRep::dropVirgin(const char *reason)
4d0854d4 156{
ea76d91e
AR
157 debugs(93,4, HERE << "because " << reason << "; status:" << status());
158 Must(proxyingVb = opOn);
4d0854d4
AR
159
160 BodyPipePointer &p = theVirginRep.raw().body_pipe;
161 Must(p != NULL);
162 Must(p->stillConsuming(this));
163 stopConsumingFrom(p);
164 p->enableAutoConsumption();
ea76d91e
AR
165 proxyingVb = opComplete;
166 canAccessVb = false;
167
168 // called from adapter handler so does not inform adapter
fdc96a39
AR
169}
170
26ac0430 171void
574b508c 172Adaptation::Ecap::XactionRep::useVirgin()
fdc96a39 173{
4d0854d4 174 debugs(93,3, HERE << status());
ea76d91e
AR
175 Must(proxyingAb == opUndecided);
176 proxyingAb = opNever;
4d0854d4 177
ea76d91e 178 BodyPipePointer &vbody_pipe = theVirginRep.raw().body_pipe;
2874d9e3
AR
179
180 HttpMsg *clone = theVirginRep.raw().header->clone();
181 // check that clone() copies the pipe so that we do not have to
182 Must(!vbody_pipe == !clone->body_pipe);
183
ea76d91e
AR
184 if (proxyingVb == opOn) {
185 Must(vbody_pipe->stillConsuming(this));
4d0854d4 186 // if libecap consumed, we cannot shortcircuit
ea76d91e
AR
187 Must(!vbody_pipe->consumedSize());
188 stopConsumingFrom(vbody_pipe);
189 canAccessVb = false;
190 proxyingVb = opComplete;
e1381638
AJ
191 } else if (proxyingVb == opUndecided) {
192 vbody_pipe = NULL; // it is not our pipe anymore
193 proxyingVb = opNever;
194 }
ea76d91e 195
ea76d91e 196 sendAnswer(clone);
4d0854d4 197 Must(done());
fdc96a39
AR
198}
199
26ac0430 200void
574b508c 201Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
fdc96a39 202{
4d0854d4 203 debugs(93,3, HERE << status());
ea76d91e 204 Must(m);
4d0854d4 205 theAnswerRep = m;
ea76d91e
AR
206 Must(proxyingAb == opUndecided);
207
f1a768b2 208 HttpMsg *msg = answer().header;
ea76d91e
AR
209 if (!theAnswerRep->body()) { // final, bodyless answer
210 proxyingAb = opNever;
211 sendAnswer(msg);
f1a768b2 212 } else { // got answer headers but need to handle body
ea76d91e 213 proxyingAb = opOn;
f1a768b2 214 Must(!msg->body_pipe); // only host can set body pipes
ea76d91e 215 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
f1a768b2
AR
216 Must(rep);
217 rep->tieBody(this); // sets us as a producer
218 Must(msg->body_pipe != NULL); // check tieBody
ea76d91e
AR
219
220 sendAnswer(msg);
221
4d0854d4 222 debugs(93,4, HERE << "adapter will produce body" << status());
8679e6c2 223 theMaster->abMake(); // libecap will produce
4d0854d4 224 }
fdc96a39
AR
225}
226
4d0854d4 227void
574b508c 228Adaptation::Ecap::XactionRep::vbDiscard()
fdc96a39 229{
ea76d91e 230 Must(proxyingVb == opUndecided);
8679e6c2 231 // if adapter does not need vb, we do not need to send it
a96661a8 232 dropVirgin("vbDiscard");
ea76d91e 233 Must(proxyingVb == opNever);
fdc96a39
AR
234}
235
4d0854d4 236void
574b508c 237Adaptation::Ecap::XactionRep::vbMake()
fdc96a39 238{
ea76d91e
AR
239 Must(proxyingVb == opUndecided);
240 BodyPipePointer &p = theVirginRep.raw().body_pipe;
241 Must(p != NULL);
242 Must(p->setConsumerIfNotLate(this)); // to make vb, we must receive vb
243 proxyingVb = opOn;
fdc96a39
AR
244}
245
4d0854d4 246void
574b508c 247Adaptation::Ecap::XactionRep::vbStopMaking()
fdc96a39 248{
ea76d91e
AR
249 // if adapter does not need vb, we do not need to receive it
250 if (proxyingVb == opOn)
251 dropVirgin("vbStopMaking");
252 Must(proxyingVb == opComplete);
4d0854d4
AR
253}
254
255void
574b508c 256Adaptation::Ecap::XactionRep::vbMakeMore()
4d0854d4 257{
ea76d91e
AR
258 Must(proxyingVb == opOn); // cannot make more if done proxying
259 // we cannot guarantee more vb, but we can check that there is a chance
260 Must(!theVirginRep.raw().body_pipe->exhausted());
4d0854d4
AR
261}
262
8679e6c2 263libecap::Area
574b508c 264Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
4d0854d4 265{
ea76d91e
AR
266 Must(canAccessVb);
267 // We may not be proxyingVb yet. It should be OK, but see vbContentShift().
268
8679e6c2 269 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
ea76d91e
AR
270 Must(p != NULL);
271
272 // TODO: make MemBuf use size_t?
273 const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
4d0854d4 274
8679e6c2
AR
275 // convert to Squid types; XXX: check for overflow
276 const uint64_t offset = static_cast<uint64_t>(o);
277 Must(offset <= haveSize); // equal iff at the end of content
278
279 // nsize means no size limit: all content starting from offset
280 const size_t size = s == libecap::nsize ?
26ac0430 281 haveSize - offset : static_cast<size_t>(s);
8679e6c2 282
8679e6c2
AR
283 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
284 return libecap::Area::FromTempBuffer(p->buf().content() + offset,
26ac0430 285 min(static_cast<size_t>(haveSize - offset), size));
4d0854d4
AR
286}
287
288void
574b508c 289Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
4d0854d4 290{
ea76d91e
AR
291 Must(canAccessVb);
292 // We may not be proxyingVb yet. It should be OK now, but if BodyPipe
293 // consume() requirements change, we would have to return empty vbContent
294 // until the adapter registers as a consumer
295
8679e6c2
AR
296 BodyPipePointer &p = theVirginRep.raw().body_pipe;
297 Must(p != NULL);
298 const size_t size = static_cast<size_t>(n); // XXX: check for overflow
299 const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
300 p->consume(min(size, haveSize));
4d0854d4
AR
301}
302
303void
574b508c 304Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
4d0854d4 305{
7477a343
AR
306 Must(proxyingAb == opOn && !abProductionFinished);
307 abProductionFinished = true;
308 abProductionAtEnd = atEnd; // store until ready to stop producing ourselves
309 debugs(93,5, HERE << "adapted body production ended");
310 moveAbContent();
4d0854d4
AR
311}
312
8679e6c2 313void
574b508c 314Adaptation::Ecap::XactionRep::noteAbContentAvailable()
4d0854d4 315{
7477a343 316 Must(proxyingAb == opOn && !abProductionFinished);
8679e6c2 317 moveAbContent();
4d0854d4
AR
318}
319
ea76d91e 320#if 0 /* XXX: implement */
4d0854d4 321void
574b508c 322Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
4d0854d4
AR
323{
324 Must(answer().body_pipe != NULL);
8679e6c2
AR
325 if (size.known())
326 answer().body_pipe->setBodySize(size.value());
327 // else the piped body size is unknown by default
4d0854d4 328}
8679e6c2 329#endif
4d0854d4
AR
330
331void
574b508c 332Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
4d0854d4
AR
333{
334 debugs(93,3, HERE << "adapter needs time: " <<
26ac0430 335 d.state << '/' << d.progress);
4d0854d4 336 // XXX: set timeout?
fdc96a39
AR
337}
338
26ac0430 339void
574b508c 340Adaptation::Ecap::XactionRep::adaptationAborted()
fdc96a39 341{
fdc96a39 342 tellQueryAborted(true); // should eCAP support retries?
ea76d91e 343 mustStop("adaptationAborted");
fdc96a39
AR
344}
345
8679e6c2 346bool
574b508c 347Adaptation::Ecap::XactionRep::callable() const
8679e6c2
AR
348{
349 return !done();
350}
351
26ac0430 352void
574b508c 353Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
fdc96a39 354{
ea76d91e
AR
355 Must(proxyingAb == opOn);
356 moveAbContent();
fdc96a39
AR
357}
358
26ac0430 359void
574b508c 360Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
fdc96a39 361{
ea76d91e
AR
362 Must(proxyingAb == opOn);
363 stopProducingFor(answer().body_pipe, false);
364 Must(theMaster);
365 theMaster->abStopMaking();
366 proxyingAb = opComplete;
fdc96a39
AR
367}
368
369void
574b508c 370Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
fdc96a39 371{
ea76d91e 372 Must(proxyingVb == opOn);
fdc96a39 373 Must(theMaster);
8679e6c2 374 theMaster->noteVbContentAvailable();
fdc96a39
AR
375}
376
377void
574b508c 378Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
fdc96a39 379{
ea76d91e 380 Must(proxyingVb == opOn);
fdc96a39 381 Must(theMaster);
8679e6c2 382 theMaster->noteVbContentDone(true);
ea76d91e 383 proxyingVb = opComplete;
fdc96a39
AR
384}
385
386void
574b508c 387Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
fdc96a39 388{
ea76d91e 389 Must(proxyingVb == opOn);
8679e6c2
AR
390 Must(theMaster);
391 theMaster->noteVbContentDone(false);
ea76d91e 392 proxyingVb = opComplete;
fdc96a39
AR
393}
394
395void
574b508c 396Adaptation::Ecap::XactionRep::noteInitiatorAborted()
fdc96a39
AR
397{
398 mustStop("initiator aborted");
399}
400
8679e6c2
AR
401// get content from the adapter and put it into the adapted pipe
402void
574b508c 403Adaptation::Ecap::XactionRep::moveAbContent()
8679e6c2 404{
ea76d91e 405 Must(proxyingAb == opOn);
8679e6c2 406 const libecap::Area c = theMaster->abContent(0, libecap::nsize);
7477a343
AR
407 debugs(93,5, HERE << "up to " << c.size << " bytes");
408 if (c.size == 0 && abProductionFinished) { // no ab now and in the future
409 stopProducingFor(answer().body_pipe, abProductionAtEnd);
410 proxyingAb = opComplete;
411 debugs(93,5, HERE << "last adapted body data retrieved");
e1381638 412 } else if (c.size > 0) {
7477a343
AR
413 if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
414 theMaster->abContentShift(used);
415 }
8679e6c2
AR
416}
417
418const char *
574b508c 419Adaptation::Ecap::XactionRep::status() const
fdc96a39 420{
4d0854d4
AR
421 static MemBuf buf;
422 buf.reset();
423
424 buf.append(" [", 2);
425
ea76d91e
AR
426 if (proxyingVb == opOn) {
427 const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
428 if (!canAccessVb)
429 buf.append("x", 1);
430 if (vp != NULL && vp->stillConsuming(this)) {
f1a768b2 431 buf.append("Vb", 2);
ea76d91e 432 buf.append(vp->status(), strlen(vp->status())); // XXX
f1a768b2 433 } else
ea76d91e 434 buf.append("V.", 2);
f1a768b2 435 }
506a0530 436
ea76d91e
AR
437 if (proxyingAb == opOn) {
438 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
439 Must(rep);
f1a768b2
AR
440 const BodyPipePointer &ap = rep->raw().body_pipe;
441 if (ap != NULL && ap->stillProducing(this)) {
442 buf.append(" Ab", 3);
443 buf.append(ap->status(), strlen(ap->status())); // XXX
444 } else
445 buf.append(" A.", 3);
446 }
4d0854d4 447
4d0854d4
AR
448 buf.Printf(" ecapx%d]", id);
449
450 buf.terminate();
451
452 return buf.content();
fdc96a39 453}