]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ICAP/ICAPModXact.cc
Add comment explaining why there is 'break' in the loop that looks for
[thirdparty/squid.git] / src / ICAP / ICAPModXact.cc
CommitLineData
774c051c 1/*
2 * DEBUG: section 93 ICAP (RFC 3507) Client
3 */
4
5#include "squid.h"
6#include "comm.h"
7#include "MsgPipe.h"
8#include "MsgPipeData.h"
9#include "HttpRequest.h"
10#include "HttpReply.h"
11#include "ICAPServiceRep.h"
12#include "ICAPModXact.h"
13#include "ICAPClient.h"
14#include "ChunkedCodingParser.h"
15#include "TextException.h"
16
17// flow and terminology:
18// HTTP| --> receive --> encode --> write --> |network
19// end | <-- send <-- parse <-- read <-- |end
20
// TODO: doneSending()/doneReceiving() data members should probably be in sync
// with this->adapted/virgin pointers. Make adapted/virgin methods?
23
24// TODO: replace gotEncapsulated() with something faster; we call it often
25
26CBDATA_CLASS_INIT(ICAPModXact);
27
28static const size_t TheBackupLimit = ICAP::MsgPipeBufSizeMax;
29
30
31ICAPModXact::State::State()
32{
33 memset(this, sizeof(*this), 0);
34}
35
36ICAPModXact::ICAPModXact(): ICAPXaction("ICAPModXact"),
37 self(NULL), virgin(NULL), adapted(NULL),
38 icapReply(NULL), virginConsumed(0),
39 bodyParser(NULL)
40{}
41
42void ICAPModXact::init(ICAPServiceRep::Pointer &aService, MsgPipe::Pointer &aVirgin, MsgPipe::Pointer &anAdapted, Pointer &aSelf)
43{
44 assert(!self.getRaw() && !virgin.getRaw() && !adapted.getRaw());
45 assert(aSelf.getRaw() && aVirgin.getRaw() && anAdapted.getRaw());
46
47 self = aSelf;
48 service(aService);
49
50 virgin = aVirgin;
51 adapted = anAdapted;
52
53 // receiving end
54 virgin->sink = this; // should be 'self' and refcounted
55 // virgin pipe data is initiated by the source
56
57 // sending end
58 adapted->source = this; // should be 'self' and refcounted
59 adapted->data = new MsgPipeData;
60
61 adapted->data->body = new MemBuf; // XXX: make body a non-pointer?
62 adapted->data->body->init(ICAP::MsgPipeBufSizeMin, ICAP::MsgPipeBufSizeMax);
63 // headers are initialized when we parse them
64
65 // writing and reading ends are handled by ICAPXaction
66
67 // encoding
68 // nothing to do because we are using temporary buffers
69
70 // parsing
71 icapReply = new HttpReply;
72 icapReply->protoPrefix = "ICAP/"; // TODO: make an IcapReply class?
73
74 // XXX: make sure stop() cleans all buffers
75}
76
77// HTTP side starts sending virgin data
78void ICAPModXact::noteSourceStart(MsgPipe *p)
79{
80 ICAPXaction_Enter(noteSourceStart);
81
82 // make sure TheBackupLimit is in-sync with the buffer size
83 Must(TheBackupLimit <= static_cast<size_t>(virgin->data->body->max_capacity));
84
85 estimateVirginBody(); // before virgin disappears!
86
87 // it is an ICAP violation to send request to a service w/o known OPTIONS
88
89 if (service().up())
90 startWriting();
91 else
92 waitForService();
93
94 // XXX: but this has to be here to catch other errors. Thus, if
95 // commConnectStart in startWriting fails, we may get here
96 //_after_ the object got destroyed. Somebody please fix commConnectStart!
97 ICAPXaction_Exit();
98}
99
100static
101void ICAPModXact_noteServiceReady(void *data, ICAPServiceRep::Pointer &)
102{
103 ICAPModXact *x = static_cast<ICAPModXact*>(data);
104 assert(x);
105 x->noteServiceReady();
106}
107
108void ICAPModXact::waitForService()
109{
110 Must(!state.serviceWaiting);
111 debugs(93, 7, "ICAPModXact will wait for the ICAP service " << status());
112 state.serviceWaiting = true;
113 service().callWhenReady(&ICAPModXact_noteServiceReady, this);
114}
115
116void ICAPModXact::noteServiceReady()
117{
118 ICAPXaction_Enter(noteServiceReady);
119
120 Must(state.serviceWaiting);
121 state.serviceWaiting = false;
122 startWriting(); // will throw if service is not up
123
124 ICAPXaction_Exit();
125}
126
127void ICAPModXact::startWriting()
128{
129 Must(service().up());
130
131 state.writing = State::writingConnect;
132 openConnection();
133 // put nothing here as openConnection calls commConnectStart
134 // and that may call us back without waiting for the next select loop
135}
136
137// connection with the ICAP service established
138void ICAPModXact::handleCommConnected()
139{
140 Must(state.writing == State::writingConnect);
141
142 startReading(); // wait for early errors from the ICAP server
143
144 MemBuf requestBuf;
145 requestBuf.init();
146
147 makeRequestHeaders(requestBuf);
148 debugs(93, 9, "ICAPModXact ICAP request prefix " << status() << ":\n" <<
149 (requestBuf.terminate(), requestBuf.content()));
150
151 // write headers
152 state.writing = State::writingHeaders;
153 scheduleWrite(requestBuf);
154}
155
156void ICAPModXact::handleCommWrote(size_t)
157{
158 if (state.writing == State::writingHeaders)
159 handleCommWroteHeaders();
160 else
161 handleCommWroteBody();
162}
163
164void ICAPModXact::handleCommWroteHeaders()
165{
166 Must(state.writing == State::writingHeaders);
167
168 if (virginBody.expected()) {
169 state.writing = preview.enabled() ?
170 State::writingPreview : State::writingPrime;
171 virginWriteClaim.protectAll();
172 writeMore();
173 } else {
174 stopWriting();
175 }
176}
177
178void ICAPModXact::writeMore()
179{
180 if (writer) // already writing something
181 return;
182
183 switch (state.writing) {
184
185 case State::writingInit: // waiting for service OPTIONS
186 Must(state.serviceWaiting);
187
188 case State::writingConnect: // waiting for the connection to establish
189
190 case State::writingHeaders: // waiting for the headers to be written
191
192 case State::writingPaused: // waiting for the ICAP server response
193
194 case State::writingDone: // nothing more to write
195 return;
196
197 case State::writingPreview:
198 writePriviewBody();
199 return;
200
201 case State::writingPrime:
202 writePrimeBody();
203 return;
204
205 default:
206 throw TexcHere("ICAPModXact in bad writing state");
207 }
208}
209
210void ICAPModXact::writePriviewBody()
211{
212 debugs(93, 8, "ICAPModXact will write Preview body " << status());
213 Must(state.writing == State::writingPreview);
214
215 MsgPipeData::Body *body = virgin->data->body;
216 const size_t size = XMIN(preview.debt(), (size_t)body->contentSize());
217 writeSomeBody("preview body", size);
218
219 // change state once preview is written
220
221 if (preview.done()) {
222 debugs(93, 7, "ICAPModXact wrote entire Preview body " << status());
223
224 if (preview.ieof())
225 stopWriting();
226 else
227 state.writing = State::writingPaused;
228 }
229}
230
231void ICAPModXact::writePrimeBody()
232{
233 Must(state.writing == State::writingPrime);
234 Must(virginWriteClaim.active());
235
236 MsgPipeData::Body *body = virgin->data->body;
237 const size_t size = body->contentSize();
238 writeSomeBody("prime virgin body", size);
239
240 if (state.doneReceiving)
241 stopWriting();
242}
243
244void ICAPModXact::writeSomeBody(const char *label, size_t size)
245{
246 Must(!writer && !state.doneWriting());
247 debugs(93, 8, "ICAPModXact will write up to " << size << " bytes of " <<
248 label);
249
250 MemBuf writeBuf; // TODO: suggest a min size based on size and lastChunk
251
252 writeBuf.init(); // note: we assume that last-chunk will fit
253
254 const size_t writeableSize = claimSize(virginWriteClaim);
255 const size_t chunkSize = XMIN(writeableSize, size);
256
257 if (chunkSize) {
258 debugs(93, 7, "ICAPModXact will write " << chunkSize <<
259 "-byte chunk of " << label);
260 } else {
261 debugs(93, 7, "ICAPModXact has no writeable " << label << " content");
262 }
263
264 moveRequestChunk(writeBuf, chunkSize);
265
266 const bool lastChunk =
267 (state.writing == State::writingPreview && preview.done()) ||
268 (state.doneReceiving && claimSize(virginWriteClaim) <= 0);
269
270 if (lastChunk && virginBody.expected()) {
271 debugs(93, 8, "ICAPModXact will write last-chunk of " << label);
272 addLastRequestChunk(writeBuf);
273 }
274
275 debugs(93, 7, "ICAPModXact will write " << writeBuf.contentSize()
276 << " raw bytes of " << label);
277
278 if (writeBuf.hasContent()) {
279 scheduleWrite(writeBuf); // comm will free the chunk
280 } else {
281 writeBuf.clean();
282 }
283}
284
285void ICAPModXact::moveRequestChunk(MemBuf &buf, size_t chunkSize)
286{
287 if (chunkSize > 0) {
288 openChunk(buf, chunkSize);
289 buf.append(claimContent(virginWriteClaim), chunkSize);
290 closeChunk(buf, false);
291
292 virginWriteClaim.release(chunkSize);
293 virginConsume();
294 }
295
296 if (state.writing == State::writingPreview)
297 preview.wrote(chunkSize, state.doneReceiving); // even if wrote nothing
298}
299
300void ICAPModXact::addLastRequestChunk(MemBuf &buf)
301{
302 openChunk(buf, 0);
303 closeChunk(buf, state.writing == State::writingPreview && preview.ieof());
304}
305
306void ICAPModXact::openChunk(MemBuf &buf, size_t chunkSize)
307{
1dd6edf2 308 buf.Printf("%x\r\n", (int) chunkSize);
774c051c 309}
310
311void ICAPModXact::closeChunk(MemBuf &buf, bool ieof)
312{
313 if (ieof)
314 buf.append("; ieof", 6);
315
316 buf.append(ICAP::crlf, 2); // chunk-terminating CRLF
317}
318
319size_t ICAPModXact::claimSize(const MemBufClaim &claim) const
320{
321 Must(claim.active());
322 const size_t start = claim.offset();
323 const size_t end = virginConsumed + virgin->data->body->contentSize();
324 Must(virginConsumed <= start && start <= end);
325 return end - start;
326}
327
328const char *ICAPModXact::claimContent(const MemBufClaim &claim) const
329{
330 Must(claim.active());
331 const size_t start = claim.offset();
332 Must(virginConsumed <= start);
333 return virgin->data->body->content() + (start - virginConsumed);
334}
335
336void ICAPModXact::virginConsume()
337{
338 MemBuf &buf = *virgin->data->body;
339 const size_t have = static_cast<size_t>(buf.contentSize());
340 const size_t end = virginConsumed + have;
341 size_t offset = end;
342
343 if (virginWriteClaim.active())
344 offset = XMIN(virginWriteClaim.offset(), offset);
345
346 if (virginSendClaim.active())
347 offset = XMIN(virginSendClaim.offset(), offset);
348
349 Must(virginConsumed <= offset && offset <= end);
350
351 if (const size_t size = offset - virginConsumed) {
352 debugs(93, 8, "ICAPModXact consumes " << size << " out of " << have <<
353 " virgin body bytes");
354 buf.consume(size);
355 virginConsumed += size;
356
357 if (!state.doneReceiving)
358 virgin->sendSinkNeed();
359 }
360}
361
362void ICAPModXact::handleCommWroteBody()
363{
364 writeMore();
365}
366
367void ICAPModXact::stopWriting()
368{
369 if (state.writing == State::writingDone)
370 return;
371
372 debugs(93, 7, "ICAPModXact will no longer write " << status());
373
374 state.writing = State::writingDone;
375
376 virginWriteClaim.disable();
377
378 virginConsume();
379
380 // Comm does not have an interface to clear the writer, but
381 // writeMore() will not write if our write callback is called
382 // when state.writing == State::writingDone;
383}
384
385void ICAPModXact::stopBackup()
386{
387 if (!virginSendClaim.active())
388 return;
389
390 debugs(93, 7, "ICAPModXact will no longer backup " << status());
391
392 virginSendClaim.disable();
393
394 virginConsume();
395}
396
397bool ICAPModXact::doneAll() const
398{
399 return ICAPXaction::doneAll() && !state.serviceWaiting &&
400 state.doneReceiving && doneSending() &&
401 doneReading() && state.doneWriting();
402}
403
404void ICAPModXact::startReading()
405{
406 Must(connection >= 0);
407 Must(!reader);
408 Must(adapted.getRaw());
409 Must(adapted->data);
410 Must(adapted->data->body);
411
412 // we use the same buffer for headers and body and then consume headers
413 readMore();
414}
415
416void ICAPModXact::readMore()
417{
418 if (reader || doneReading())
419 return;
420
421 // do not fill readBuf if we have no space to store the result
422 if (!adapted->data->body->hasPotentialSpace())
423 return;
424
425 if (readBuf.hasSpace())
426 scheduleRead();
427}
428
429// comm module read a portion of the ICAP response for us
430void ICAPModXact::handleCommRead(size_t)
431{
432 Must(!state.doneParsing());
433 parseMore();
434 readMore();
435}
436
437void ICAPModXact::echoMore()
438{
439 Must(state.sending == State::sendingVirgin);
440 Must(virginSendClaim.active());
441
442 MemBuf &from = *virgin->data->body;
443 MemBuf &to = *adapted->data->body;
444
445 const size_t sizeMax = claimSize(virginSendClaim);
446 const size_t size = XMIN(static_cast<size_t>(to.potentialSpaceSize()),
447 sizeMax);
448 debugs(93, 5, "ICAPModXact echos " << size << " out of " << sizeMax <<
449 " bytes");
450
451 if (size > 0) {
452 to.append(claimContent(virginSendClaim), size);
453 virginSendClaim.release(size);
454 virginConsume();
455 adapted->sendSourceProgress();
456 }
457
458 if (!from.hasContent() && state.doneReceiving) {
459 debugs(93, 5, "ICAPModXact echoed all " << status());
460 stopSending(true);
461 } else {
462 debugs(93, 5, "ICAPModXact has " << from.contentSize() << " bytes " <<
463 "and expects more to echo " << status());
464 virgin->sendSinkNeed(); // TODO: timeout if sink is broken
465 }
466}
467
468bool ICAPModXact::doneSending() const
469{
470 Must((state.sending == State::sendingDone) == (!adapted));
471 return state.sending == State::sendingDone;
472}
473
474void ICAPModXact::stopSending(bool nicely)
475{
476 if (doneSending())
477 return;
478
479 if (state.sending != State::sendingUndecided) {
480 debugs(93, 7, "ICAPModXact will no longer send " << status());
481
482 if (nicely)
483 adapted->sendSourceFinish();
484 else
485 adapted->sendSourceAbort();
486 } else {
487 debugs(93, 7, "ICAPModXact will not start sending " << status());
488 adapted->sendSourceAbort(); // or the sink may wait forever
489 }
490
491 state.sending = State::sendingDone;
492
493 /*
494 * Note on adapted->data->header: we created the header, but allow the
bab98cf3 495 * other side (ICAPClientRespmodPrecache) to take control of it. We won't touch it here
774c051c 496 * and instead rely on the Anchor-side to make sure it is properly freed.
497 */
498 adapted = NULL; // refcounted
499}
500
501void ICAPModXact::stopReceiving()
502{
503 // stopSending NULLifies adapted but we do not NULLify virgin.
504 // This is assymetric because we want to keep virgin->data even
505 // though we are not expecting any more virgin->data->body.
506 // TODO: can we cache just the needed headers info instead?
507
508 // If they closed first, there is not point (or means) to notify them.
509
510 if (state.doneReceiving)
511 return;
512
513 // There is no sendSinkFinished() to notify the other side.
514 debugs(93, 7, "ICAPModXact will not receive " << status());
515
516 state.doneReceiving = true;
517}
518
519void ICAPModXact::parseMore()
520{
521 debugs(93, 5, "have " << readBuf.contentSize() << " bytes to parse" <<
522 status());
523
524 if (state.parsingHeaders())
525 parseHeaders();
526
527 if (state.parsing == State::psBody)
528 parseBody();
529}
530
531// note that allocation for echoing is done in handle204NoContent()
532void ICAPModXact::maybeAllocateHttpMsg()
533{
534 if (adapted->data->header) // already allocated
535 return;
536
537 if (gotEncapsulated("res-hdr")) {
538 adapted->data->header = new HttpReply;
539 } else if (gotEncapsulated("req-hdr")) {
540 adapted->data->header = new HttpRequest;
541 } else
542 throw TexcHere("Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()");
543}
544
545void ICAPModXact::parseHeaders()
546{
547 Must(state.parsingHeaders());
548
549 if (state.parsing == State::psIcapHeader)
550 parseIcapHead();
551
552 if (state.parsing == State::psHttpHeader)
553 parseHttpHead();
554
555 if (state.parsingHeaders()) { // need more data
556 Must(mayReadMore());
557 return;
558 }
559
560 adapted->sendSourceStart();
561
562 if (state.sending == State::sendingVirgin)
563 echoMore();
564}
565
566void ICAPModXact::parseIcapHead()
567{
568 Must(state.sending == State::sendingUndecided);
569
570 if (!parseHead(icapReply))
571 return;
572
573 switch (icapReply->sline.status) {
574
575 case 100:
576 handle100Continue();
577 break;
578
579 case 200:
580 handle200Ok();
581 break;
582
583 case 204:
584 handle204NoContent();
585 break;
586
587 default:
588 handleUnknownScode();
589 break;
590 }
591
592 // handle100Continue() manages state.writing on its own.
593 // Non-100 status means the server needs no postPreview data from us.
594 if (state.writing == State::writingPaused)
595 stopWriting();
596
597 // TODO: Consider applying a Squid 2.5 patch to recognize 201 responses
598}
599
600void ICAPModXact::handle100Continue()
601{
602 Must(state.writing == State::writingPaused);
603 Must(preview.enabled() && preview.done() && !preview.ieof());
604 Must(virginSendClaim.active());
605
606 if (virginSendClaim.limited()) // preview only
607 stopBackup();
608
609 state.parsing = State::psHttpHeader; // eventually
610
611 state.writing = State::writingPrime;
612
613 writeMore();
614}
615
616void ICAPModXact::handle200Ok()
617{
618 state.parsing = State::psHttpHeader;
619 state.sending = State::sendingAdapted;
620 stopBackup();
621}
622
623void ICAPModXact::handle204NoContent()
624{
625 stopParsing();
626 Must(virginSendClaim.active());
627 virginSendClaim.protectAll(); // extends protection if needed
628 state.sending = State::sendingVirgin;
629
630 // We want to clone the HTTP message, but we do not want
631 // to copy non-HTTP state parts that HttpMsg kids carry in them.
632 // Thus, we cannot use a smart pointer, copy constructor, or equivalent.
633 // Instead, we simply write the HTTP message and "clone" it by parsing.
634
635 HttpMsg *oldHead = virgin->data->header;
636 debugs(93, 7, "ICAPModXact cloning virgin message " << oldHead);
637
638 MemBuf httpBuf;
639
640 // write the virgin message into a memory buffer
641 httpBuf.init();
642 packHead(httpBuf, oldHead);
643
644 // allocate the adapted message
645 HttpMsg *&newHead = adapted->data->header;
646 Must(!newHead);
647
648 if (dynamic_cast<const HttpRequest*>(oldHead))
649 newHead = new HttpRequest;
650 else
651 if (dynamic_cast<const HttpReply*>(oldHead))
652 newHead = new HttpReply;
653
654 Must(newHead);
655
656 // parse the buffer back
657 http_status error = HTTP_STATUS_NONE;
658
659 Must(newHead->parse(&httpBuf, true, &error));
660
661 Must(newHead->hdr_sz == httpBuf.contentSize()); // no leftovers
662
663 httpBuf.clean();
664
665 debugs(93, 7, "ICAPModXact cloned virgin message " << oldHead << " to " << newHead);
666}
667
668void ICAPModXact::handleUnknownScode()
669{
670 stopParsing();
671 stopBackup();
672 // TODO: mark connection as "bad"
673
674 // Terminate the transaction; we do not know how to handle this response.
675 throw TexcHere("Unsupported ICAP status code");
676}
677
678void ICAPModXact::parseHttpHead()
679{
680 if (gotEncapsulated("res-hdr") || gotEncapsulated("req-hdr")) {
681 maybeAllocateHttpMsg();
682
683 if (!parseHead(adapted->data->header))
684 return;
685 }
686
687 state.parsing = State::psBody;
688}
689
690bool ICAPModXact::parseHead(HttpMsg *head)
691{
692 assert(head);
693 debugs(93, 5, "have " << readBuf.contentSize() << " head bytes to parse" <<
694 "; state: " << state.parsing);
695
696 http_status error = HTTP_STATUS_NONE;
697 const bool parsed = head->parse(&readBuf, commEof, &error);
698 Must(parsed || !error); // success or need more data
699
700 if (!parsed) { // need more data
701 head->reset();
702 return false;
703 }
704
705 readBuf.consume(head->hdr_sz);
706 return true;
707}
708
709void ICAPModXact::parseBody()
710{
711 Must(state.parsing == State::psBody);
712
713 debugs(93, 5, "have " << readBuf.contentSize() << " body bytes to parse");
714
715 if (gotEncapsulated("res-body")) {
716 if (!parsePresentBody()) // need more body data
717 return;
718 } else {
719 debugs(93, 5, "not expecting a body");
720 }
721
722 stopParsing();
723 stopSending(true);
724}
725
726// returns true iff complete body was parsed
727bool ICAPModXact::parsePresentBody()
728{
729 if (!bodyParser)
730 bodyParser = new ChunkedCodingParser;
731
732 // the parser will throw on errors
733 const bool parsed = bodyParser->parse(&readBuf, adapted->data->body);
734
735 adapted->sendSourceProgress(); // TODO: do not send if parsed nothing
736
737 debugs(93, 5, "have " << readBuf.contentSize() << " body bytes after " <<
738 "parse; parsed all: " << parsed);
739
740 if (parsed)
741 return true;
742
743 if (bodyParser->needsMoreData())
744 Must(mayReadMore());
745
746 if (bodyParser->needsMoreSpace()) {
747 Must(!doneSending()); // can hope for more space
748 Must(adapted->data->body->hasContent()); // paranoid
749 // TODO: there should be a timeout in case the sink is broken.
750 }
751
752 return false;
753}
754
755void ICAPModXact::stopParsing()
756{
757 if (state.parsing == State::psDone)
758 return;
759
760 debugs(93, 7, "ICAPModXact will no longer parse " << status());
761
762 delete bodyParser;
763
764 bodyParser = NULL;
765
766 state.parsing = State::psDone;
767}
768
769// HTTP side added virgin body data
770void ICAPModXact::noteSourceProgress(MsgPipe *p)
771{
772 ICAPXaction_Enter(noteSourceProgress);
773
774 Must(!state.doneReceiving);
775 writeMore();
776
777 if (state.sending == State::sendingVirgin)
778 echoMore();
779
780 ICAPXaction_Exit();
781}
782
783// HTTP side sent us all virgin info
784void ICAPModXact::noteSourceFinish(MsgPipe *p)
785{
786 ICAPXaction_Enter(noteSourceFinish);
787
788 Must(!state.doneReceiving);
789 stopReceiving();
790
791 // push writer and sender in case we were waiting for the last-chunk
792 writeMore();
793
794 if (state.sending == State::sendingVirgin)
795 echoMore();
796
797 ICAPXaction_Exit();
798}
799
800// HTTP side is aborting
801void ICAPModXact::noteSourceAbort(MsgPipe *p)
802{
803 ICAPXaction_Enter(noteSourceAbort);
804
805 Must(!state.doneReceiving);
806 stopReceiving();
807 mustStop("HTTP source quit");
808
809 ICAPXaction_Exit();
810}
811
812// HTTP side wants more adapted data and possibly freed some buffer space
813void ICAPModXact::noteSinkNeed(MsgPipe *p)
814{
815 ICAPXaction_Enter(noteSinkNeed);
816
817 if (state.sending == State::sendingVirgin)
818 echoMore();
819 else
820 if (state.sending == State::sendingAdapted)
821 parseMore();
822 else
823 Must(state.sending == State::sendingUndecided);
824
825 ICAPXaction_Exit();
826}
827
828// HTTP side aborted
829void ICAPModXact::noteSinkAbort(MsgPipe *p)
830{
831 ICAPXaction_Enter(noteSinkAbort);
832
833 mustStop("HTTP sink quit");
834
835 ICAPXaction_Exit();
836}
837
838// internal cleanup
839void ICAPModXact::doStop()
840{
841 ICAPXaction::doStop();
842
843 stopWriting();
844 stopBackup();
845
846 if (icapReply) {
847 delete icapReply;
848 icapReply = NULL;
849 }
850
851 stopSending(false);
852
853 // see stopReceiving() for reasons it cannot NULLify virgin there
854
855 if (virgin != NULL) {
856 if (!state.doneReceiving)
857 virgin->sendSinkAbort();
858 else
859 virgin->sink = NULL;
860
861 virgin = NULL; // refcounted
862 }
863
864 if (self != NULL) {
865 Pointer s = self;
866 self = NULL;
867 ICAPNoteXactionDone(s);
868 /* this object may be destroyed when 's' is cleared */
869 }
870}
871
872void ICAPModXact::makeRequestHeaders(MemBuf &buf)
873{
874 const ICAPServiceRep &s = service();
875 buf.Printf("%s %s ICAP/1.0\r\n", s.methodStr(), s.uri.buf());
876 buf.Printf("Host: %s:%d\r\n", s.host.buf(), s.port);
877 buf.Printf("Encapsulated: ");
878
879 MemBuf httpBuf;
880 httpBuf.init();
881
882 // build HTTP request header, if any
883 ICAP::Method m = s.method;
884
885 if (ICAP::methodRespmod == m && virgin->data->cause)
886 encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->cause);
887 else if (ICAP::methodReqmod == m)
888 encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->header);
889
890 if (ICAP::methodRespmod == m)
891 if (const MsgPipeData::Header *prime = virgin->data->header)
892 encapsulateHead(buf, "res-hdr", httpBuf, prime);
893
894 if (!virginBody.expected())
1dd6edf2 895 buf.Printf("null-body=%d", (int) httpBuf.contentSize());
774c051c 896 else if (ICAP::methodReqmod == m)
1dd6edf2 897 buf.Printf("req-body=%d", (int) httpBuf.contentSize());
774c051c 898 else
1dd6edf2 899 buf.Printf("res-body=%d", (int) httpBuf.contentSize());
774c051c 900
901 buf.append(ICAP::crlf, 2); // terminate Encapsulated line
902
903 if (shouldPreview()) {
904 buf.Printf("Preview: %d\r\n", (int)preview.ad());
905 virginSendClaim.protectUpTo(preview.ad());
906 }
907
908 if (shouldAllow204()) {
909 buf.Printf("Allow: 204\r\n");
910 // be robust: do not rely on the expected body size
911 virginSendClaim.protectAll();
912 }
913
914 buf.append(ICAP::crlf, 2); // terminate ICAP header
915
916 // start ICAP request body with encapsulated HTTP headers
917 buf.append(httpBuf.content(), httpBuf.contentSize());
918
919 httpBuf.clean();
920}
921
922void ICAPModXact::encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const HttpMsg *head)
923{
924 // update ICAP header
1dd6edf2 925 icapBuf.Printf("%s=%d,", section, (int) httpBuf.contentSize());
774c051c 926
927 // pack HTTP head
928 packHead(httpBuf, head);
929}
930
931void ICAPModXact::packHead(MemBuf &httpBuf, const HttpMsg *head)
932{
933 Packer p;
934 packerToMemInit(&p, &httpBuf);
935 head->packInto(&p, true);
936 packerClean(&p);
937}
938
939// decides whether to offer a preview and calculates its size
940bool ICAPModXact::shouldPreview()
941{
942 size_t wantedSize;
943
944 if (!service().wantsPreview(wantedSize)) {
945 debugs(93, 5, "ICAPModXact should not offer preview");
946 return false;
947 }
948
949 Must(wantedSize >= 0);
950
951 // cannot preview more than we can backup
952 size_t ad = XMIN(wantedSize, TheBackupLimit);
953
954 if (virginBody.expected() && virginBody.knownSize())
955 ad = XMIN(ad, virginBody.size()); // not more than we have
956 else
957 ad = 0; // questionable optimization?
958
959 debugs(93, 5, "ICAPModXact should offer " << ad << "-byte preview " <<
960 "(service wanted " << wantedSize << ")");
961
962 preview.enable(ad);
963
964 return preview.enabled();
965}
966
967// decides whether to allow 204 responses
968bool ICAPModXact::shouldAllow204()
969{
970 if (!service().allows204())
971 return false;
972
973 if (!virginBody.expected())
974 return true; // no body means no problems with supporting 204s.
975
976 // if there is a body, make sure we can backup it all
977
978 if (!virginBody.knownSize())
979 return false;
980
981 // or should we have a different backup limit?
982 // note that '<' allows for 0-termination of the "full" backup buffer
983 return virginBody.size() < TheBackupLimit;
984}
985
986// returns a temporary string depicting transaction status, for debugging
987void ICAPModXact::fillPendingStatus(MemBuf &buf) const
988{
989 if (state.serviceWaiting)
990 buf.append("U", 1);
991
992 if (!state.doneWriting() && state.writing != State::writingInit)
993 buf.Printf("w(%d)", state.writing);
994
995 if (preview.enabled()) {
996 if (!preview.done())
1dd6edf2 997 buf.Printf("P(%d)", (int) preview.debt());
774c051c 998 }
999
1000 if (virginSendClaim.active())
1001 buf.append("B", 1);
1002
1003 if (!state.doneParsing() && state.parsing != State::psIcapHeader)
1004 buf.Printf("p(%d)", state.parsing);
1005
1006 if (!doneSending() && state.sending != State::sendingUndecided)
1007 buf.Printf("S(%d)", state.sending);
1008}
1009
1010void ICAPModXact::fillDoneStatus(MemBuf &buf) const
1011{
1012 if (state.doneReceiving)
1013 buf.append("R", 1);
1014
1015 if (state.doneWriting())
1016 buf.append("w", 1);
1017
1018 if (preview.enabled()) {
1019 if (preview.done())
1020 buf.Printf("P%s", preview.ieof() ? "(ieof)" : "");
1021 }
1022
1023 if (doneReading())
1024 buf.append("r", 1);
1025
1026 if (state.doneParsing())
1027 buf.append("p", 1);
1028
1029 if (doneSending())
1030 buf.append("S", 1);
1031}
1032
1033bool ICAPModXact::gotEncapsulated(const char *section) const
1034{
1035 return httpHeaderGetByNameListMember(&icapReply->header, "Encapsulated",
1036 section, ',').size() > 0;
1037}
1038
1039// calculate whether there is a virgin HTTP body and
1040// whether its expected size is known
1041void ICAPModXact::estimateVirginBody()
1042{
1043 // note: defaults should be fine but will disable previews and 204s
1044
1045 Must(virgin != NULL && virgin->data->header);
1046
1047 method_t method;
1048
1049 if (virgin->data->cause)
1050 method = virgin->data->cause->method;
1051 else
1052 if (HttpRequest *req= dynamic_cast<HttpRequest*>(virgin->data->
1053 header))
1054 method = req->method;
1055 else
1056 return;
1057
1058 ssize_t size;
1059 if (virgin->data->header->expectingBody(method, size)) {
1060 virginBody.expect(size)
1061 ;
1062 debugs(93, 6, "ICAPModXact expects virgin body; size: " << size);
1063 } else {
1064 debugs(93, 6, "ICAPModXact does not expect virgin body");
1065 }
1066}
1067
1068
1069// TODO: Move SizedEstimate, MemBufBackup, and ICAPPreview elsewhere
1070
1071SizedEstimate::SizedEstimate()
1072 : theData(dtUnexpected)
1073{}
1074
1075void SizedEstimate::expect(ssize_t aSize)
1076{
1077 theData = (aSize >= 0) ? aSize : (ssize_t)dtUnknown;
1078}
1079
1080bool SizedEstimate::expected() const
1081{
1082 return theData != dtUnexpected;
1083}
1084
1085bool SizedEstimate::knownSize() const
1086{
1087 Must(expected());
1088 return theData != dtUnknown;
1089}
1090
1091size_t SizedEstimate::size() const
1092{
1093 Must(knownSize());
1094 return static_cast<size_t>(theData);
1095}
1096
1097
1098
1099MemBufClaim::MemBufClaim(): theStart(-1), theGoal(-1)
1100{}
1101
1102void MemBufClaim::protectAll()
1103{
1104 if (theStart < 0)
1105 theStart = 0;
1106
1107 theGoal = -1; // no specific goal
1108}
1109
1110void MemBufClaim::protectUpTo(size_t aGoal)
1111{
1112 if (theStart < 0)
1113 theStart = 0;
1114
1115 Must(aGoal >= 0);
1116
1117 theGoal = (theGoal < 0) ? static_cast<ssize_t>(aGoal) :
1118 XMIN(static_cast<ssize_t>(aGoal), theGoal);
1119}
1120
1121void MemBufClaim::disable()
1122{
1123 theStart = -1;
1124}
1125
1126void MemBufClaim::release(size_t size)
1127{
1128 Must(active());
1129 Must(size >= 0);
1130 theStart += static_cast<ssize_t>(size);
1131
1132 if (limited() && theStart >= theGoal)
1133 disable();
1134}
1135
1136size_t MemBufClaim::offset() const
1137{
1138 Must(active());
1139 return static_cast<size_t>(theStart);
1140}
1141
1142bool MemBufClaim::limited() const
1143{
1144 Must(active());
1145 return theGoal >= 0;
1146}
1147
1148
1149ICAPPreview::ICAPPreview(): theWritten(0), theAd(0), theState(stDisabled)
1150{}
1151
1152void ICAPPreview::enable(size_t anAd)
1153{
1154 // TODO: check for anAd not exceeding preview size limit
1155 Must(anAd >= 0);
1156 Must(!enabled());
1157 theAd = anAd;
1158 theState = stWriting;
1159}
1160
1161bool ICAPPreview::enabled() const
1162{
1163 return theState != stDisabled;
1164}
1165
1166size_t ICAPPreview::ad() const
1167{
1168 Must(enabled());
1169 return theAd;
1170}
1171
1172bool ICAPPreview::done() const
1173{
1174 Must(enabled());
1175 return theState >= stIeof;
1176}
1177
1178bool ICAPPreview::ieof() const
1179{
1180 Must(enabled());
1181 return theState == stIeof;
1182}
1183
1184size_t ICAPPreview::debt() const
1185{
1186 Must(enabled());
1187 return done() ? 0 : (theAd - theWritten);
1188}
1189
1190void ICAPPreview::wrote(size_t size, bool sawEof)
1191{
1192 Must(enabled());
1193 theWritten += size;
1194
1195 if (theWritten >= theAd)
1196 theState = stDone; // sawEof is irrelevant
1197 else
1198 if (sawEof)
1199 theState = stIeof;
1200}
1201