]> git.ipfire.org Git - thirdparty/squid.git/blob - src/adaptation/ecap/XactionRep.cc
219c2889f2232b79b6e3dff571763d96fba8be90
[thirdparty/squid.git] / src / adaptation / ecap / XactionRep.cc
1 #include "squid.h"
2 #include <libecap/common/area.h>
3 #include <libecap/common/delay.h>
4 #include <libecap/adapter/xaction.h>
5 #include "HttpRequest.h"
6 #include "HttpReply.h"
7 #include "SquidTime.h"
8 #include "adaptation/ecap/XactionRep.h"
9 #include "base/TextException.h"
10
11 CBDATA_NAMESPACED_CLASS_INIT(Adaptation::Ecap::XactionRep, XactionRep);
12
13
14 Adaptation::Ecap::XactionRep::XactionRep(Adaptation::Initiator *anInitiator,
15 HttpMsg *virginHeader, HttpRequest *virginCause,
16 const Adaptation::ServicePointer &aService):
17 AsyncJob("Adaptation::Ecap::XactionRep"),
18 Adaptation::Initiate("Adaptation::Ecap::XactionRep", anInitiator),
19 theService(aService),
20 theVirginRep(virginHeader), theCauseRep(NULL),
21 proxyingVb(opUndecided), proxyingAb(opUndecided),
22 adaptHistoryId(-1),
23 canAccessVb(false),
24 abProductionFinished(false), abProductionAtEnd(false)
25 {
26 if (virginCause)
27 theCauseRep = new MessageRep(virginCause);
28 }
29
30 Adaptation::Ecap::XactionRep::~XactionRep()
31 {
32 assert(!theMaster);
33 delete theCauseRep;
34 theAnswerRep.reset();
35 }
36
37 void
38 Adaptation::Ecap::XactionRep::master(const AdapterXaction &x)
39 {
40 Must(!theMaster);
41 Must(x != NULL);
42 theMaster = x;
43 }
44
45 Adaptation::Service &
46 Adaptation::Ecap::XactionRep::service()
47 {
48 Must(theService != NULL);
49 return *theService;
50 }
51
52 void
53 Adaptation::Ecap::XactionRep::start()
54 {
55 Must(theMaster);
56
57 if (theVirginRep.raw().body_pipe != NULL)
58 canAccessVb = true; /// assumes nobody is consuming; \todo check
59 else
60 proxyingVb = opNever;
61
62 const HttpRequest *request = dynamic_cast<const HttpRequest*> (theCauseRep ?
63 theCauseRep->raw().header : theVirginRep.raw().header);
64 Must(request);
65 Adaptation::History::Pointer ah = request->adaptLogHistory();
66 if (ah != NULL) {
67 // retrying=false because ecap never retries transactions
68 adaptHistoryId = ah->recordXactStart(service().cfg().key, current_time, false);
69 }
70
71 theMaster->start();
72 }
73
74 void
75 Adaptation::Ecap::XactionRep::swanSong()
76 {
77 // clear body_pipes, if any
78 // this code does not maintain proxying* and canAccessVb states; should it?
79
80 if (theAnswerRep != NULL) {
81 BodyPipe::Pointer body_pipe = answer().body_pipe;
82 if (body_pipe != NULL) {
83 Must(body_pipe->stillProducing(this));
84 stopProducingFor(body_pipe, false);
85 }
86 }
87
88 {
89 BodyPipe::Pointer body_pipe = theVirginRep.raw().body_pipe;
90 if (body_pipe != NULL) {
91 Must(body_pipe->stillConsuming(this));
92 stopConsumingFrom(body_pipe);
93 }
94 }
95
96 terminateMaster();
97
98 const HttpRequest *request = dynamic_cast<const HttpRequest*>(theCauseRep ?
99 theCauseRep->raw().header : theVirginRep.raw().header);
100 Must(request);
101 Adaptation::History::Pointer ah = request->adaptLogHistory();
102 if (ah != NULL && adaptHistoryId >= 0)
103 ah->recordXactFinish(adaptHistoryId);
104
105 Adaptation::Initiate::swanSong();
106 }
107
108 libecap::Message &
109 Adaptation::Ecap::XactionRep::virgin()
110 {
111 return theVirginRep;
112 }
113
114 const libecap::Message &
115 Adaptation::Ecap::XactionRep::cause()
116 {
117 Must(theCauseRep != NULL);
118 return *theCauseRep;
119 }
120
121 libecap::Message &
122 Adaptation::Ecap::XactionRep::adapted()
123 {
124 Must(theAnswerRep != NULL);
125 return *theAnswerRep;
126 }
127
128 Adaptation::Message &
129 Adaptation::Ecap::XactionRep::answer()
130 {
131 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
132 Must(rep);
133 return rep->raw();
134 }
135
136 void
137 Adaptation::Ecap::XactionRep::terminateMaster()
138 {
139 if (theMaster) {
140 AdapterXaction x = theMaster;
141 theMaster.reset();
142 x->stop();
143 }
144 }
145
146 bool
147 Adaptation::Ecap::XactionRep::doneAll() const
148 {
149 return proxyingVb >= opComplete && proxyingAb >= opComplete &&
150 Adaptation::Initiate::doneAll();
151 }
152
153 // stops receiving virgin and enables auto-consumption
154 void
155 Adaptation::Ecap::XactionRep::dropVirgin(const char *reason)
156 {
157 debugs(93,4, HERE << "because " << reason << "; status:" << status());
158 Must(proxyingVb = opOn);
159
160 BodyPipePointer &p = theVirginRep.raw().body_pipe;
161 Must(p != NULL);
162 Must(p->stillConsuming(this));
163 stopConsumingFrom(p);
164 p->enableAutoConsumption();
165 proxyingVb = opComplete;
166 canAccessVb = false;
167
168 // called from adapter handler so does not inform adapter
169 }
170
171 void
172 Adaptation::Ecap::XactionRep::useVirgin()
173 {
174 debugs(93,3, HERE << status());
175 Must(proxyingAb == opUndecided);
176 proxyingAb = opNever;
177
178 BodyPipePointer &vbody_pipe = theVirginRep.raw().body_pipe;
179
180 HttpMsg *clone = theVirginRep.raw().header->clone();
181 // check that clone() copies the pipe so that we do not have to
182 Must(!vbody_pipe == !clone->body_pipe);
183
184 if (proxyingVb == opOn) {
185 Must(vbody_pipe->stillConsuming(this));
186 // if libecap consumed, we cannot shortcircuit
187 Must(!vbody_pipe->consumedSize());
188 stopConsumingFrom(vbody_pipe);
189 canAccessVb = false;
190 proxyingVb = opComplete;
191 } else if (proxyingVb == opUndecided) {
192 vbody_pipe = NULL; // it is not our pipe anymore
193 proxyingVb = opNever;
194 }
195
196 sendAnswer(clone);
197 Must(done());
198 }
199
200 void
201 Adaptation::Ecap::XactionRep::useAdapted(const libecap::shared_ptr<libecap::Message> &m)
202 {
203 debugs(93,3, HERE << status());
204 Must(m);
205 theAnswerRep = m;
206 Must(proxyingAb == opUndecided);
207
208 HttpMsg *msg = answer().header;
209 if (!theAnswerRep->body()) { // final, bodyless answer
210 proxyingAb = opNever;
211 sendAnswer(msg);
212 } else { // got answer headers but need to handle body
213 proxyingAb = opOn;
214 Must(!msg->body_pipe); // only host can set body pipes
215 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
216 Must(rep);
217 rep->tieBody(this); // sets us as a producer
218 Must(msg->body_pipe != NULL); // check tieBody
219
220 sendAnswer(msg);
221
222 debugs(93,4, HERE << "adapter will produce body" << status());
223 theMaster->abMake(); // libecap will produce
224 }
225 }
226
227 void
228 Adaptation::Ecap::XactionRep::vbDiscard()
229 {
230 Must(proxyingVb == opUndecided);
231 // if adapter does not need vb, we do not need to send it
232 dropVirgin("vbDiscard");
233 Must(proxyingVb == opNever);
234 }
235
236 void
237 Adaptation::Ecap::XactionRep::vbMake()
238 {
239 Must(proxyingVb == opUndecided);
240 BodyPipePointer &p = theVirginRep.raw().body_pipe;
241 Must(p != NULL);
242 Must(p->setConsumerIfNotLate(this)); // to make vb, we must receive vb
243 proxyingVb = opOn;
244 }
245
246 void
247 Adaptation::Ecap::XactionRep::vbStopMaking()
248 {
249 // if adapter does not need vb, we do not need to receive it
250 if (proxyingVb == opOn)
251 dropVirgin("vbStopMaking");
252 Must(proxyingVb == opComplete);
253 }
254
255 void
256 Adaptation::Ecap::XactionRep::vbMakeMore()
257 {
258 Must(proxyingVb == opOn); // cannot make more if done proxying
259 // we cannot guarantee more vb, but we can check that there is a chance
260 Must(!theVirginRep.raw().body_pipe->exhausted());
261 }
262
263 libecap::Area
264 Adaptation::Ecap::XactionRep::vbContent(libecap::size_type o, libecap::size_type s)
265 {
266 Must(canAccessVb);
267 // We may not be proxyingVb yet. It should be OK, but see vbContentShift().
268
269 const BodyPipePointer &p = theVirginRep.raw().body_pipe;
270 Must(p != NULL);
271
272 // TODO: make MemBuf use size_t?
273 const size_t haveSize = static_cast<size_t>(p->buf().contentSize());
274
275 // convert to Squid types; XXX: check for overflow
276 const uint64_t offset = static_cast<uint64_t>(o);
277 Must(offset <= haveSize); // equal iff at the end of content
278
279 // nsize means no size limit: all content starting from offset
280 const size_t size = s == libecap::nsize ?
281 haveSize - offset : static_cast<size_t>(s);
282
283 // XXX: optimize by making theBody a shared_ptr (see Area::FromTemp*() src)
284 return libecap::Area::FromTempBuffer(p->buf().content() + offset,
285 min(static_cast<size_t>(haveSize - offset), size));
286 }
287
288 void
289 Adaptation::Ecap::XactionRep::vbContentShift(libecap::size_type n)
290 {
291 Must(canAccessVb);
292 // We may not be proxyingVb yet. It should be OK now, but if BodyPipe
293 // consume() requirements change, we would have to return empty vbContent
294 // until the adapter registers as a consumer
295
296 BodyPipePointer &p = theVirginRep.raw().body_pipe;
297 Must(p != NULL);
298 const size_t size = static_cast<size_t>(n); // XXX: check for overflow
299 const size_t haveSize = static_cast<size_t>(p->buf().contentSize()); // TODO: make MemBuf use size_t?
300 p->consume(min(size, haveSize));
301 }
302
303 void
304 Adaptation::Ecap::XactionRep::noteAbContentDone(bool atEnd)
305 {
306 Must(proxyingAb == opOn && !abProductionFinished);
307 abProductionFinished = true;
308 abProductionAtEnd = atEnd; // store until ready to stop producing ourselves
309 debugs(93,5, HERE << "adapted body production ended");
310 moveAbContent();
311 }
312
313 void
314 Adaptation::Ecap::XactionRep::noteAbContentAvailable()
315 {
316 Must(proxyingAb == opOn && !abProductionFinished);
317 moveAbContent();
318 }
319
#if 0 /* XXX: implement */
/// Disabled code: would pass a known adapted body size on to the adapted
/// body pipe.
void
Adaptation::Ecap::XactionRep::setAdaptedBodySize(const libecap::BodySize &size)
{
    Must(answer().body_pipe != NULL);

    // the piped body size is unknown by default
    if (!size.known())
        return;

    answer().body_pipe->setBodySize(size.value());
}
#endif
330
331 void
332 Adaptation::Ecap::XactionRep::adaptationDelayed(const libecap::Delay &d)
333 {
334 debugs(93,3, HERE << "adapter needs time: " <<
335 d.state << '/' << d.progress);
336 // XXX: set timeout?
337 }
338
339 void
340 Adaptation::Ecap::XactionRep::adaptationAborted()
341 {
342 tellQueryAborted(true); // should eCAP support retries?
343 mustStop("adaptationAborted");
344 }
345
346 bool
347 Adaptation::Ecap::XactionRep::callable() const
348 {
349 return !done();
350 }
351
352 void
353 Adaptation::Ecap::XactionRep::noteMoreBodySpaceAvailable(RefCount<BodyPipe> bp)
354 {
355 Must(proxyingAb == opOn);
356 moveAbContent();
357 }
358
359 void
360 Adaptation::Ecap::XactionRep::noteBodyConsumerAborted(RefCount<BodyPipe> bp)
361 {
362 Must(proxyingAb == opOn);
363 stopProducingFor(answer().body_pipe, false);
364 Must(theMaster);
365 theMaster->abStopMaking();
366 proxyingAb = opComplete;
367 }
368
369 void
370 Adaptation::Ecap::XactionRep::noteMoreBodyDataAvailable(RefCount<BodyPipe> bp)
371 {
372 Must(proxyingVb == opOn);
373 Must(theMaster);
374 theMaster->noteVbContentAvailable();
375 }
376
377 void
378 Adaptation::Ecap::XactionRep::noteBodyProductionEnded(RefCount<BodyPipe> bp)
379 {
380 Must(proxyingVb == opOn);
381 Must(theMaster);
382 theMaster->noteVbContentDone(true);
383 proxyingVb = opComplete;
384 }
385
386 void
387 Adaptation::Ecap::XactionRep::noteBodyProducerAborted(RefCount<BodyPipe> bp)
388 {
389 Must(proxyingVb == opOn);
390 Must(theMaster);
391 theMaster->noteVbContentDone(false);
392 proxyingVb = opComplete;
393 }
394
395 void
396 Adaptation::Ecap::XactionRep::noteInitiatorAborted()
397 {
398 mustStop("initiator aborted");
399 }
400
401 // get content from the adapter and put it into the adapted pipe
402 void
403 Adaptation::Ecap::XactionRep::moveAbContent()
404 {
405 Must(proxyingAb == opOn);
406 const libecap::Area c = theMaster->abContent(0, libecap::nsize);
407 debugs(93,5, HERE << "up to " << c.size << " bytes");
408 if (c.size == 0 && abProductionFinished) { // no ab now and in the future
409 stopProducingFor(answer().body_pipe, abProductionAtEnd);
410 proxyingAb = opComplete;
411 debugs(93,5, HERE << "last adapted body data retrieved");
412 } else if (c.size > 0) {
413 if (const size_t used = answer().body_pipe->putMoreData(c.start, c.size))
414 theMaster->abContentShift(used);
415 }
416 }
417
418 const char *
419 Adaptation::Ecap::XactionRep::status() const
420 {
421 static MemBuf buf;
422 buf.reset();
423
424 buf.append(" [", 2);
425
426 if (proxyingVb == opOn) {
427 const BodyPipePointer &vp = theVirginRep.raw().body_pipe;
428 if (!canAccessVb)
429 buf.append("x", 1);
430 if (vp != NULL && vp->stillConsuming(this)) {
431 buf.append("Vb", 2);
432 buf.append(vp->status(), strlen(vp->status())); // XXX
433 } else
434 buf.append("V.", 2);
435 }
436
437 if (proxyingAb == opOn) {
438 MessageRep *rep = dynamic_cast<MessageRep*>(theAnswerRep.get());
439 Must(rep);
440 const BodyPipePointer &ap = rep->raw().body_pipe;
441 if (ap != NULL && ap->stillProducing(this)) {
442 buf.append(" Ab", 3);
443 buf.append(ap->status(), strlen(ap->status())); // XXX
444 } else
445 buf.append(" A.", 3);
446 }
447
448 buf.Printf(" ecapx%d]", id);
449
450 buf.terminate();
451
452 return buf.content();
453 }