]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ICAP/ICAPModXact.cc
Add X-Client-IP and X-Client-Username extention headers to ICAP
[thirdparty/squid.git] / src / ICAP / ICAPModXact.cc
CommitLineData
774c051c 1/*
2 * DEBUG: section 93 ICAP (RFC 3507) Client
3 */
4
5#include "squid.h"
6#include "comm.h"
7#include "MsgPipe.h"
8#include "MsgPipeData.h"
9#include "HttpRequest.h"
10#include "HttpReply.h"
11#include "ICAPServiceRep.h"
12#include "ICAPModXact.h"
13#include "ICAPClient.h"
14#include "ChunkedCodingParser.h"
15#include "TextException.h"
a97e82a8 16#include "AuthUserRequest.h"
774c051c 17
18// flow and terminology:
19// HTTP| --> receive --> encode --> write --> |network
20// end | <-- send <-- parse <-- read <-- |end
21
22// TODO: doneSending()/doneReceving() data members should probably be in sync
23// with this->adapted/virgin pointers. Make adapted/virgin methods?
24
25// TODO: replace gotEncapsulated() with something faster; we call it often
26
27CBDATA_CLASS_INIT(ICAPModXact);
28
29static const size_t TheBackupLimit = ICAP::MsgPipeBufSizeMax;
30
31
32ICAPModXact::State::State()
33{
34 memset(this, sizeof(*this), 0);
35}
36
37ICAPModXact::ICAPModXact(): ICAPXaction("ICAPModXact"),
38 self(NULL), virgin(NULL), adapted(NULL),
39 icapReply(NULL), virginConsumed(0),
40 bodyParser(NULL)
41{}
42
43void ICAPModXact::init(ICAPServiceRep::Pointer &aService, MsgPipe::Pointer &aVirgin, MsgPipe::Pointer &anAdapted, Pointer &aSelf)
44{
45 assert(!self.getRaw() && !virgin.getRaw() && !adapted.getRaw());
46 assert(aSelf.getRaw() && aVirgin.getRaw() && anAdapted.getRaw());
47
48 self = aSelf;
49 service(aService);
50
51 virgin = aVirgin;
52 adapted = anAdapted;
53
54 // receiving end
55 virgin->sink = this; // should be 'self' and refcounted
56 // virgin pipe data is initiated by the source
57
58 // sending end
59 adapted->source = this; // should be 'self' and refcounted
60 adapted->data = new MsgPipeData;
61
62 adapted->data->body = new MemBuf; // XXX: make body a non-pointer?
63 adapted->data->body->init(ICAP::MsgPipeBufSizeMin, ICAP::MsgPipeBufSizeMax);
64 // headers are initialized when we parse them
65
66 // writing and reading ends are handled by ICAPXaction
67
68 // encoding
69 // nothing to do because we are using temporary buffers
70
71 // parsing
72 icapReply = new HttpReply;
73 icapReply->protoPrefix = "ICAP/"; // TODO: make an IcapReply class?
74
75 // XXX: make sure stop() cleans all buffers
76}
77
78// HTTP side starts sending virgin data
79void ICAPModXact::noteSourceStart(MsgPipe *p)
80{
81 ICAPXaction_Enter(noteSourceStart);
82
83 // make sure TheBackupLimit is in-sync with the buffer size
84 Must(TheBackupLimit <= static_cast<size_t>(virgin->data->body->max_capacity));
85
86 estimateVirginBody(); // before virgin disappears!
87
88 // it is an ICAP violation to send request to a service w/o known OPTIONS
89
90 if (service().up())
91 startWriting();
92 else
93 waitForService();
94
95 // XXX: but this has to be here to catch other errors. Thus, if
96 // commConnectStart in startWriting fails, we may get here
97 //_after_ the object got destroyed. Somebody please fix commConnectStart!
98 ICAPXaction_Exit();
99}
100
101static
102void ICAPModXact_noteServiceReady(void *data, ICAPServiceRep::Pointer &)
103{
104 ICAPModXact *x = static_cast<ICAPModXact*>(data);
105 assert(x);
106 x->noteServiceReady();
107}
108
109void ICAPModXact::waitForService()
110{
111 Must(!state.serviceWaiting);
112 debugs(93, 7, "ICAPModXact will wait for the ICAP service " << status());
113 state.serviceWaiting = true;
114 service().callWhenReady(&ICAPModXact_noteServiceReady, this);
115}
116
117void ICAPModXact::noteServiceReady()
118{
119 ICAPXaction_Enter(noteServiceReady);
120
121 Must(state.serviceWaiting);
122 state.serviceWaiting = false;
123 startWriting(); // will throw if service is not up
124
125 ICAPXaction_Exit();
126}
127
128void ICAPModXact::startWriting()
129{
130 Must(service().up());
131
132 state.writing = State::writingConnect;
133 openConnection();
134 // put nothing here as openConnection calls commConnectStart
135 // and that may call us back without waiting for the next select loop
136}
137
138// connection with the ICAP service established
139void ICAPModXact::handleCommConnected()
140{
141 Must(state.writing == State::writingConnect);
142
143 startReading(); // wait for early errors from the ICAP server
144
145 MemBuf requestBuf;
146 requestBuf.init();
147
148 makeRequestHeaders(requestBuf);
149 debugs(93, 9, "ICAPModXact ICAP request prefix " << status() << ":\n" <<
150 (requestBuf.terminate(), requestBuf.content()));
151
152 // write headers
153 state.writing = State::writingHeaders;
154 scheduleWrite(requestBuf);
155}
156
157void ICAPModXact::handleCommWrote(size_t)
158{
159 if (state.writing == State::writingHeaders)
160 handleCommWroteHeaders();
161 else
162 handleCommWroteBody();
163}
164
165void ICAPModXact::handleCommWroteHeaders()
166{
167 Must(state.writing == State::writingHeaders);
168
169 if (virginBody.expected()) {
170 state.writing = preview.enabled() ?
171 State::writingPreview : State::writingPrime;
172 virginWriteClaim.protectAll();
173 writeMore();
174 } else {
175 stopWriting();
176 }
177}
178
179void ICAPModXact::writeMore()
180{
181 if (writer) // already writing something
182 return;
183
184 switch (state.writing) {
185
186 case State::writingInit: // waiting for service OPTIONS
187 Must(state.serviceWaiting);
188
189 case State::writingConnect: // waiting for the connection to establish
190
191 case State::writingHeaders: // waiting for the headers to be written
192
193 case State::writingPaused: // waiting for the ICAP server response
194
195 case State::writingDone: // nothing more to write
196 return;
197
198 case State::writingPreview:
199 writePriviewBody();
200 return;
201
202 case State::writingPrime:
203 writePrimeBody();
204 return;
205
206 default:
207 throw TexcHere("ICAPModXact in bad writing state");
208 }
209}
210
211void ICAPModXact::writePriviewBody()
212{
213 debugs(93, 8, "ICAPModXact will write Preview body " << status());
214 Must(state.writing == State::writingPreview);
215
216 MsgPipeData::Body *body = virgin->data->body;
217 const size_t size = XMIN(preview.debt(), (size_t)body->contentSize());
218 writeSomeBody("preview body", size);
219
220 // change state once preview is written
221
222 if (preview.done()) {
223 debugs(93, 7, "ICAPModXact wrote entire Preview body " << status());
224
225 if (preview.ieof())
226 stopWriting();
227 else
228 state.writing = State::writingPaused;
229 }
230}
231
232void ICAPModXact::writePrimeBody()
233{
234 Must(state.writing == State::writingPrime);
235 Must(virginWriteClaim.active());
236
237 MsgPipeData::Body *body = virgin->data->body;
238 const size_t size = body->contentSize();
239 writeSomeBody("prime virgin body", size);
240
241 if (state.doneReceiving)
242 stopWriting();
243}
244
245void ICAPModXact::writeSomeBody(const char *label, size_t size)
246{
247 Must(!writer && !state.doneWriting());
248 debugs(93, 8, "ICAPModXact will write up to " << size << " bytes of " <<
249 label);
250
251 MemBuf writeBuf; // TODO: suggest a min size based on size and lastChunk
252
253 writeBuf.init(); // note: we assume that last-chunk will fit
254
255 const size_t writeableSize = claimSize(virginWriteClaim);
256 const size_t chunkSize = XMIN(writeableSize, size);
257
258 if (chunkSize) {
259 debugs(93, 7, "ICAPModXact will write " << chunkSize <<
260 "-byte chunk of " << label);
261 } else {
262 debugs(93, 7, "ICAPModXact has no writeable " << label << " content");
263 }
264
265 moveRequestChunk(writeBuf, chunkSize);
266
267 const bool lastChunk =
268 (state.writing == State::writingPreview && preview.done()) ||
269 (state.doneReceiving && claimSize(virginWriteClaim) <= 0);
270
271 if (lastChunk && virginBody.expected()) {
272 debugs(93, 8, "ICAPModXact will write last-chunk of " << label);
273 addLastRequestChunk(writeBuf);
274 }
275
276 debugs(93, 7, "ICAPModXact will write " << writeBuf.contentSize()
277 << " raw bytes of " << label);
278
279 if (writeBuf.hasContent()) {
280 scheduleWrite(writeBuf); // comm will free the chunk
281 } else {
282 writeBuf.clean();
283 }
284}
285
286void ICAPModXact::moveRequestChunk(MemBuf &buf, size_t chunkSize)
287{
288 if (chunkSize > 0) {
289 openChunk(buf, chunkSize);
290 buf.append(claimContent(virginWriteClaim), chunkSize);
291 closeChunk(buf, false);
292
293 virginWriteClaim.release(chunkSize);
294 virginConsume();
295 }
296
297 if (state.writing == State::writingPreview)
298 preview.wrote(chunkSize, state.doneReceiving); // even if wrote nothing
299}
300
301void ICAPModXact::addLastRequestChunk(MemBuf &buf)
302{
303 openChunk(buf, 0);
304 closeChunk(buf, state.writing == State::writingPreview && preview.ieof());
305}
306
307void ICAPModXact::openChunk(MemBuf &buf, size_t chunkSize)
308{
1dd6edf2 309 buf.Printf("%x\r\n", (int) chunkSize);
774c051c 310}
311
312void ICAPModXact::closeChunk(MemBuf &buf, bool ieof)
313{
314 if (ieof)
315 buf.append("; ieof", 6);
316
317 buf.append(ICAP::crlf, 2); // chunk-terminating CRLF
318}
319
320size_t ICAPModXact::claimSize(const MemBufClaim &claim) const
321{
322 Must(claim.active());
323 const size_t start = claim.offset();
324 const size_t end = virginConsumed + virgin->data->body->contentSize();
325 Must(virginConsumed <= start && start <= end);
326 return end - start;
327}
328
329const char *ICAPModXact::claimContent(const MemBufClaim &claim) const
330{
331 Must(claim.active());
332 const size_t start = claim.offset();
333 Must(virginConsumed <= start);
334 return virgin->data->body->content() + (start - virginConsumed);
335}
336
337void ICAPModXact::virginConsume()
338{
339 MemBuf &buf = *virgin->data->body;
340 const size_t have = static_cast<size_t>(buf.contentSize());
341 const size_t end = virginConsumed + have;
342 size_t offset = end;
343
344 if (virginWriteClaim.active())
345 offset = XMIN(virginWriteClaim.offset(), offset);
346
347 if (virginSendClaim.active())
348 offset = XMIN(virginSendClaim.offset(), offset);
349
350 Must(virginConsumed <= offset && offset <= end);
351
352 if (const size_t size = offset - virginConsumed) {
353 debugs(93, 8, "ICAPModXact consumes " << size << " out of " << have <<
354 " virgin body bytes");
355 buf.consume(size);
356 virginConsumed += size;
357
358 if (!state.doneReceiving)
359 virgin->sendSinkNeed();
360 }
361}
362
363void ICAPModXact::handleCommWroteBody()
364{
365 writeMore();
366}
367
368void ICAPModXact::stopWriting()
369{
370 if (state.writing == State::writingDone)
371 return;
372
373 debugs(93, 7, "ICAPModXact will no longer write " << status());
374
375 state.writing = State::writingDone;
376
377 virginWriteClaim.disable();
378
379 virginConsume();
380
381 // Comm does not have an interface to clear the writer, but
382 // writeMore() will not write if our write callback is called
383 // when state.writing == State::writingDone;
384}
385
386void ICAPModXact::stopBackup()
387{
388 if (!virginSendClaim.active())
389 return;
390
391 debugs(93, 7, "ICAPModXact will no longer backup " << status());
392
393 virginSendClaim.disable();
394
395 virginConsume();
396}
397
398bool ICAPModXact::doneAll() const
399{
400 return ICAPXaction::doneAll() && !state.serviceWaiting &&
401 state.doneReceiving && doneSending() &&
402 doneReading() && state.doneWriting();
403}
404
405void ICAPModXact::startReading()
406{
407 Must(connection >= 0);
408 Must(!reader);
409 Must(adapted.getRaw());
410 Must(adapted->data);
411 Must(adapted->data->body);
412
413 // we use the same buffer for headers and body and then consume headers
414 readMore();
415}
416
417void ICAPModXact::readMore()
418{
419 if (reader || doneReading())
420 return;
421
422 // do not fill readBuf if we have no space to store the result
423 if (!adapted->data->body->hasPotentialSpace())
424 return;
425
426 if (readBuf.hasSpace())
427 scheduleRead();
428}
429
430// comm module read a portion of the ICAP response for us
431void ICAPModXact::handleCommRead(size_t)
432{
433 Must(!state.doneParsing());
434 parseMore();
435 readMore();
436}
437
438void ICAPModXact::echoMore()
439{
440 Must(state.sending == State::sendingVirgin);
441 Must(virginSendClaim.active());
442
443 MemBuf &from = *virgin->data->body;
444 MemBuf &to = *adapted->data->body;
445
446 const size_t sizeMax = claimSize(virginSendClaim);
447 const size_t size = XMIN(static_cast<size_t>(to.potentialSpaceSize()),
448 sizeMax);
449 debugs(93, 5, "ICAPModXact echos " << size << " out of " << sizeMax <<
450 " bytes");
451
452 if (size > 0) {
453 to.append(claimContent(virginSendClaim), size);
454 virginSendClaim.release(size);
455 virginConsume();
456 adapted->sendSourceProgress();
457 }
458
459 if (!from.hasContent() && state.doneReceiving) {
460 debugs(93, 5, "ICAPModXact echoed all " << status());
461 stopSending(true);
462 } else {
463 debugs(93, 5, "ICAPModXact has " << from.contentSize() << " bytes " <<
464 "and expects more to echo " << status());
465 virgin->sendSinkNeed(); // TODO: timeout if sink is broken
466 }
467}
468
469bool ICAPModXact::doneSending() const
470{
471 Must((state.sending == State::sendingDone) == (!adapted));
472 return state.sending == State::sendingDone;
473}
474
475void ICAPModXact::stopSending(bool nicely)
476{
477 if (doneSending())
478 return;
479
480 if (state.sending != State::sendingUndecided) {
481 debugs(93, 7, "ICAPModXact will no longer send " << status());
482
483 if (nicely)
484 adapted->sendSourceFinish();
485 else
486 adapted->sendSourceAbort();
487 } else {
488 debugs(93, 7, "ICAPModXact will not start sending " << status());
489 adapted->sendSourceAbort(); // or the sink may wait forever
490 }
491
492 state.sending = State::sendingDone;
493
494 /*
495 * Note on adapted->data->header: we created the header, but allow the
bab98cf3 496 * other side (ICAPClientRespmodPrecache) to take control of it. We won't touch it here
774c051c 497 * and instead rely on the Anchor-side to make sure it is properly freed.
498 */
499 adapted = NULL; // refcounted
500}
501
502void ICAPModXact::stopReceiving()
503{
504 // stopSending NULLifies adapted but we do not NULLify virgin.
505 // This is assymetric because we want to keep virgin->data even
506 // though we are not expecting any more virgin->data->body.
507 // TODO: can we cache just the needed headers info instead?
508
509 // If they closed first, there is not point (or means) to notify them.
510
511 if (state.doneReceiving)
512 return;
513
514 // There is no sendSinkFinished() to notify the other side.
515 debugs(93, 7, "ICAPModXact will not receive " << status());
516
517 state.doneReceiving = true;
518}
519
520void ICAPModXact::parseMore()
521{
522 debugs(93, 5, "have " << readBuf.contentSize() << " bytes to parse" <<
523 status());
524
525 if (state.parsingHeaders())
526 parseHeaders();
527
528 if (state.parsing == State::psBody)
529 parseBody();
530}
531
532// note that allocation for echoing is done in handle204NoContent()
533void ICAPModXact::maybeAllocateHttpMsg()
534{
535 if (adapted->data->header) // already allocated
536 return;
537
538 if (gotEncapsulated("res-hdr")) {
539 adapted->data->header = new HttpReply;
540 } else if (gotEncapsulated("req-hdr")) {
541 adapted->data->header = new HttpRequest;
542 } else
543 throw TexcHere("Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()");
544}
545
546void ICAPModXact::parseHeaders()
547{
548 Must(state.parsingHeaders());
549
550 if (state.parsing == State::psIcapHeader)
551 parseIcapHead();
552
553 if (state.parsing == State::psHttpHeader)
554 parseHttpHead();
555
556 if (state.parsingHeaders()) { // need more data
557 Must(mayReadMore());
558 return;
559 }
560
561 adapted->sendSourceStart();
562
563 if (state.sending == State::sendingVirgin)
564 echoMore();
565}
566
567void ICAPModXact::parseIcapHead()
568{
569 Must(state.sending == State::sendingUndecided);
570
571 if (!parseHead(icapReply))
572 return;
573
574 switch (icapReply->sline.status) {
575
576 case 100:
577 handle100Continue();
578 break;
579
580 case 200:
581 handle200Ok();
582 break;
583
584 case 204:
585 handle204NoContent();
586 break;
587
588 default:
589 handleUnknownScode();
590 break;
591 }
592
593 // handle100Continue() manages state.writing on its own.
594 // Non-100 status means the server needs no postPreview data from us.
595 if (state.writing == State::writingPaused)
596 stopWriting();
597
598 // TODO: Consider applying a Squid 2.5 patch to recognize 201 responses
599}
600
601void ICAPModXact::handle100Continue()
602{
603 Must(state.writing == State::writingPaused);
604 Must(preview.enabled() && preview.done() && !preview.ieof());
605 Must(virginSendClaim.active());
606
607 if (virginSendClaim.limited()) // preview only
608 stopBackup();
609
610 state.parsing = State::psHttpHeader; // eventually
611
612 state.writing = State::writingPrime;
613
614 writeMore();
615}
616
617void ICAPModXact::handle200Ok()
618{
619 state.parsing = State::psHttpHeader;
620 state.sending = State::sendingAdapted;
621 stopBackup();
622}
623
624void ICAPModXact::handle204NoContent()
625{
626 stopParsing();
627 Must(virginSendClaim.active());
628 virginSendClaim.protectAll(); // extends protection if needed
629 state.sending = State::sendingVirgin;
630
631 // We want to clone the HTTP message, but we do not want
632 // to copy non-HTTP state parts that HttpMsg kids carry in them.
633 // Thus, we cannot use a smart pointer, copy constructor, or equivalent.
634 // Instead, we simply write the HTTP message and "clone" it by parsing.
635
636 HttpMsg *oldHead = virgin->data->header;
637 debugs(93, 7, "ICAPModXact cloning virgin message " << oldHead);
638
639 MemBuf httpBuf;
640
641 // write the virgin message into a memory buffer
642 httpBuf.init();
643 packHead(httpBuf, oldHead);
644
645 // allocate the adapted message
646 HttpMsg *&newHead = adapted->data->header;
647 Must(!newHead);
648
649 if (dynamic_cast<const HttpRequest*>(oldHead))
650 newHead = new HttpRequest;
651 else
652 if (dynamic_cast<const HttpReply*>(oldHead))
653 newHead = new HttpReply;
654
655 Must(newHead);
656
657 // parse the buffer back
658 http_status error = HTTP_STATUS_NONE;
659
660 Must(newHead->parse(&httpBuf, true, &error));
661
662 Must(newHead->hdr_sz == httpBuf.contentSize()); // no leftovers
663
664 httpBuf.clean();
665
666 debugs(93, 7, "ICAPModXact cloned virgin message " << oldHead << " to " << newHead);
667}
668
669void ICAPModXact::handleUnknownScode()
670{
671 stopParsing();
672 stopBackup();
673 // TODO: mark connection as "bad"
674
675 // Terminate the transaction; we do not know how to handle this response.
676 throw TexcHere("Unsupported ICAP status code");
677}
678
679void ICAPModXact::parseHttpHead()
680{
681 if (gotEncapsulated("res-hdr") || gotEncapsulated("req-hdr")) {
682 maybeAllocateHttpMsg();
683
684 if (!parseHead(adapted->data->header))
685 return;
686 }
687
688 state.parsing = State::psBody;
689}
690
691bool ICAPModXact::parseHead(HttpMsg *head)
692{
693 assert(head);
694 debugs(93, 5, "have " << readBuf.contentSize() << " head bytes to parse" <<
695 "; state: " << state.parsing);
696
697 http_status error = HTTP_STATUS_NONE;
698 const bool parsed = head->parse(&readBuf, commEof, &error);
699 Must(parsed || !error); // success or need more data
700
701 if (!parsed) { // need more data
702 head->reset();
703 return false;
704 }
705
706 readBuf.consume(head->hdr_sz);
707 return true;
708}
709
710void ICAPModXact::parseBody()
711{
712 Must(state.parsing == State::psBody);
713
714 debugs(93, 5, "have " << readBuf.contentSize() << " body bytes to parse");
715
716 if (gotEncapsulated("res-body")) {
717 if (!parsePresentBody()) // need more body data
718 return;
719 } else {
720 debugs(93, 5, "not expecting a body");
721 }
722
723 stopParsing();
724 stopSending(true);
725}
726
727// returns true iff complete body was parsed
728bool ICAPModXact::parsePresentBody()
729{
730 if (!bodyParser)
731 bodyParser = new ChunkedCodingParser;
732
733 // the parser will throw on errors
734 const bool parsed = bodyParser->parse(&readBuf, adapted->data->body);
735
736 adapted->sendSourceProgress(); // TODO: do not send if parsed nothing
737
738 debugs(93, 5, "have " << readBuf.contentSize() << " body bytes after " <<
739 "parse; parsed all: " << parsed);
740
741 if (parsed)
742 return true;
743
744 if (bodyParser->needsMoreData())
745 Must(mayReadMore());
746
747 if (bodyParser->needsMoreSpace()) {
748 Must(!doneSending()); // can hope for more space
749 Must(adapted->data->body->hasContent()); // paranoid
750 // TODO: there should be a timeout in case the sink is broken.
751 }
752
753 return false;
754}
755
756void ICAPModXact::stopParsing()
757{
758 if (state.parsing == State::psDone)
759 return;
760
761 debugs(93, 7, "ICAPModXact will no longer parse " << status());
762
763 delete bodyParser;
764
765 bodyParser = NULL;
766
767 state.parsing = State::psDone;
768}
769
770// HTTP side added virgin body data
771void ICAPModXact::noteSourceProgress(MsgPipe *p)
772{
773 ICAPXaction_Enter(noteSourceProgress);
774
775 Must(!state.doneReceiving);
776 writeMore();
777
778 if (state.sending == State::sendingVirgin)
779 echoMore();
780
781 ICAPXaction_Exit();
782}
783
784// HTTP side sent us all virgin info
785void ICAPModXact::noteSourceFinish(MsgPipe *p)
786{
787 ICAPXaction_Enter(noteSourceFinish);
788
789 Must(!state.doneReceiving);
790 stopReceiving();
791
792 // push writer and sender in case we were waiting for the last-chunk
793 writeMore();
794
795 if (state.sending == State::sendingVirgin)
796 echoMore();
797
798 ICAPXaction_Exit();
799}
800
801// HTTP side is aborting
802void ICAPModXact::noteSourceAbort(MsgPipe *p)
803{
804 ICAPXaction_Enter(noteSourceAbort);
805
806 Must(!state.doneReceiving);
807 stopReceiving();
808 mustStop("HTTP source quit");
809
810 ICAPXaction_Exit();
811}
812
813// HTTP side wants more adapted data and possibly freed some buffer space
814void ICAPModXact::noteSinkNeed(MsgPipe *p)
815{
816 ICAPXaction_Enter(noteSinkNeed);
817
818 if (state.sending == State::sendingVirgin)
819 echoMore();
820 else
821 if (state.sending == State::sendingAdapted)
822 parseMore();
823 else
824 Must(state.sending == State::sendingUndecided);
825
826 ICAPXaction_Exit();
827}
828
829// HTTP side aborted
830void ICAPModXact::noteSinkAbort(MsgPipe *p)
831{
832 ICAPXaction_Enter(noteSinkAbort);
833
834 mustStop("HTTP sink quit");
835
836 ICAPXaction_Exit();
837}
838
839// internal cleanup
840void ICAPModXact::doStop()
841{
842 ICAPXaction::doStop();
843
844 stopWriting();
845 stopBackup();
846
847 if (icapReply) {
848 delete icapReply;
849 icapReply = NULL;
850 }
851
852 stopSending(false);
853
854 // see stopReceiving() for reasons it cannot NULLify virgin there
855
856 if (virgin != NULL) {
857 if (!state.doneReceiving)
858 virgin->sendSinkAbort();
859 else
860 virgin->sink = NULL;
861
862 virgin = NULL; // refcounted
863 }
864
865 if (self != NULL) {
866 Pointer s = self;
867 self = NULL;
868 ICAPNoteXactionDone(s);
869 /* this object may be destroyed when 's' is cleared */
870 }
871}
872
873void ICAPModXact::makeRequestHeaders(MemBuf &buf)
874{
875 const ICAPServiceRep &s = service();
876 buf.Printf("%s %s ICAP/1.0\r\n", s.methodStr(), s.uri.buf());
877 buf.Printf("Host: %s:%d\r\n", s.host.buf(), s.port);
878 buf.Printf("Encapsulated: ");
879
880 MemBuf httpBuf;
881 httpBuf.init();
882
883 // build HTTP request header, if any
884 ICAP::Method m = s.method;
885
886 if (ICAP::methodRespmod == m && virgin->data->cause)
887 encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->cause);
888 else if (ICAP::methodReqmod == m)
889 encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->header);
890
891 if (ICAP::methodRespmod == m)
892 if (const MsgPipeData::Header *prime = virgin->data->header)
893 encapsulateHead(buf, "res-hdr", httpBuf, prime);
894
895 if (!virginBody.expected())
1dd6edf2 896 buf.Printf("null-body=%d", (int) httpBuf.contentSize());
774c051c 897 else if (ICAP::methodReqmod == m)
1dd6edf2 898 buf.Printf("req-body=%d", (int) httpBuf.contentSize());
774c051c 899 else
1dd6edf2 900 buf.Printf("res-body=%d", (int) httpBuf.contentSize());
774c051c 901
902 buf.append(ICAP::crlf, 2); // terminate Encapsulated line
903
904 if (shouldPreview()) {
905 buf.Printf("Preview: %d\r\n", (int)preview.ad());
906 virginSendClaim.protectUpTo(preview.ad());
907 }
908
909 if (shouldAllow204()) {
910 buf.Printf("Allow: 204\r\n");
911 // be robust: do not rely on the expected body size
912 virginSendClaim.protectAll();
913 }
914
a97e82a8 915 const HttpRequest *request = virgin->data->cause ?
916 virgin->data->cause :
917 dynamic_cast<const HttpRequest*>(virgin->data->header);
918
919 if (request->client_addr.s_addr != any_addr.s_addr)
920 buf.Printf("X-Client-IP: %s\r\n", inet_ntoa(request->client_addr));
921
922 if (request->auth_user_request)
923 if (request->auth_user_request->username())
924 buf.Printf("X-Client-Username: %s\r\n", request->auth_user_request->username());
925
926 fprintf(stderr, "%s\n", buf.content());
927
774c051c 928 buf.append(ICAP::crlf, 2); // terminate ICAP header
929
930 // start ICAP request body with encapsulated HTTP headers
931 buf.append(httpBuf.content(), httpBuf.contentSize());
932
933 httpBuf.clean();
934}
935
936void ICAPModXact::encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const HttpMsg *head)
937{
938 // update ICAP header
1dd6edf2 939 icapBuf.Printf("%s=%d,", section, (int) httpBuf.contentSize());
774c051c 940
941 // pack HTTP head
942 packHead(httpBuf, head);
943}
944
945void ICAPModXact::packHead(MemBuf &httpBuf, const HttpMsg *head)
946{
947 Packer p;
948 packerToMemInit(&p, &httpBuf);
949 head->packInto(&p, true);
950 packerClean(&p);
951}
952
953// decides whether to offer a preview and calculates its size
954bool ICAPModXact::shouldPreview()
955{
956 size_t wantedSize;
957
958 if (!service().wantsPreview(wantedSize)) {
959 debugs(93, 5, "ICAPModXact should not offer preview");
960 return false;
961 }
962
963 Must(wantedSize >= 0);
964
965 // cannot preview more than we can backup
966 size_t ad = XMIN(wantedSize, TheBackupLimit);
967
968 if (virginBody.expected() && virginBody.knownSize())
969 ad = XMIN(ad, virginBody.size()); // not more than we have
970 else
971 ad = 0; // questionable optimization?
972
973 debugs(93, 5, "ICAPModXact should offer " << ad << "-byte preview " <<
974 "(service wanted " << wantedSize << ")");
975
976 preview.enable(ad);
977
978 return preview.enabled();
979}
980
981// decides whether to allow 204 responses
982bool ICAPModXact::shouldAllow204()
983{
984 if (!service().allows204())
985 return false;
986
987 if (!virginBody.expected())
988 return true; // no body means no problems with supporting 204s.
989
990 // if there is a body, make sure we can backup it all
991
992 if (!virginBody.knownSize())
993 return false;
994
995 // or should we have a different backup limit?
996 // note that '<' allows for 0-termination of the "full" backup buffer
997 return virginBody.size() < TheBackupLimit;
998}
999
1000// returns a temporary string depicting transaction status, for debugging
1001void ICAPModXact::fillPendingStatus(MemBuf &buf) const
1002{
1003 if (state.serviceWaiting)
1004 buf.append("U", 1);
1005
1006 if (!state.doneWriting() && state.writing != State::writingInit)
1007 buf.Printf("w(%d)", state.writing);
1008
1009 if (preview.enabled()) {
1010 if (!preview.done())
1dd6edf2 1011 buf.Printf("P(%d)", (int) preview.debt());
774c051c 1012 }
1013
1014 if (virginSendClaim.active())
1015 buf.append("B", 1);
1016
1017 if (!state.doneParsing() && state.parsing != State::psIcapHeader)
1018 buf.Printf("p(%d)", state.parsing);
1019
1020 if (!doneSending() && state.sending != State::sendingUndecided)
1021 buf.Printf("S(%d)", state.sending);
1022}
1023
1024void ICAPModXact::fillDoneStatus(MemBuf &buf) const
1025{
1026 if (state.doneReceiving)
1027 buf.append("R", 1);
1028
1029 if (state.doneWriting())
1030 buf.append("w", 1);
1031
1032 if (preview.enabled()) {
1033 if (preview.done())
1034 buf.Printf("P%s", preview.ieof() ? "(ieof)" : "");
1035 }
1036
1037 if (doneReading())
1038 buf.append("r", 1);
1039
1040 if (state.doneParsing())
1041 buf.append("p", 1);
1042
1043 if (doneSending())
1044 buf.append("S", 1);
1045}
1046
1047bool ICAPModXact::gotEncapsulated(const char *section) const
1048{
1049 return httpHeaderGetByNameListMember(&icapReply->header, "Encapsulated",
1050 section, ',').size() > 0;
1051}
1052
1053// calculate whether there is a virgin HTTP body and
1054// whether its expected size is known
1055void ICAPModXact::estimateVirginBody()
1056{
1057 // note: defaults should be fine but will disable previews and 204s
1058
1059 Must(virgin != NULL && virgin->data->header);
1060
1061 method_t method;
1062
1063 if (virgin->data->cause)
1064 method = virgin->data->cause->method;
1065 else
a97e82a8 1066 if (HttpRequest *req = dynamic_cast<HttpRequest*>(virgin->data->
1067 header))
774c051c 1068 method = req->method;
1069 else
1070 return;
1071
1072 ssize_t size;
1073 if (virgin->data->header->expectingBody(method, size)) {
1074 virginBody.expect(size)
1075 ;
1076 debugs(93, 6, "ICAPModXact expects virgin body; size: " << size);
1077 } else {
1078 debugs(93, 6, "ICAPModXact does not expect virgin body");
1079 }
1080}
1081
1082
1083// TODO: Move SizedEstimate, MemBufBackup, and ICAPPreview elsewhere
1084
1085SizedEstimate::SizedEstimate()
1086 : theData(dtUnexpected)
1087{}
1088
1089void SizedEstimate::expect(ssize_t aSize)
1090{
1091 theData = (aSize >= 0) ? aSize : (ssize_t)dtUnknown;
1092}
1093
1094bool SizedEstimate::expected() const
1095{
1096 return theData != dtUnexpected;
1097}
1098
1099bool SizedEstimate::knownSize() const
1100{
1101 Must(expected());
1102 return theData != dtUnknown;
1103}
1104
1105size_t SizedEstimate::size() const
1106{
1107 Must(knownSize());
1108 return static_cast<size_t>(theData);
1109}
1110
1111
1112
1113MemBufClaim::MemBufClaim(): theStart(-1), theGoal(-1)
1114{}
1115
1116void MemBufClaim::protectAll()
1117{
1118 if (theStart < 0)
1119 theStart = 0;
1120
1121 theGoal = -1; // no specific goal
1122}
1123
1124void MemBufClaim::protectUpTo(size_t aGoal)
1125{
1126 if (theStart < 0)
1127 theStart = 0;
1128
1129 Must(aGoal >= 0);
1130
1131 theGoal = (theGoal < 0) ? static_cast<ssize_t>(aGoal) :
1132 XMIN(static_cast<ssize_t>(aGoal), theGoal);
1133}
1134
1135void MemBufClaim::disable()
1136{
1137 theStart = -1;
1138}
1139
1140void MemBufClaim::release(size_t size)
1141{
1142 Must(active());
1143 Must(size >= 0);
1144 theStart += static_cast<ssize_t>(size);
1145
1146 if (limited() && theStart >= theGoal)
1147 disable();
1148}
1149
1150size_t MemBufClaim::offset() const
1151{
1152 Must(active());
1153 return static_cast<size_t>(theStart);
1154}
1155
1156bool MemBufClaim::limited() const
1157{
1158 Must(active());
1159 return theGoal >= 0;
1160}
1161
1162
1163ICAPPreview::ICAPPreview(): theWritten(0), theAd(0), theState(stDisabled)
1164{}
1165
1166void ICAPPreview::enable(size_t anAd)
1167{
1168 // TODO: check for anAd not exceeding preview size limit
1169 Must(anAd >= 0);
1170 Must(!enabled());
1171 theAd = anAd;
1172 theState = stWriting;
1173}
1174
1175bool ICAPPreview::enabled() const
1176{
1177 return theState != stDisabled;
1178}
1179
1180size_t ICAPPreview::ad() const
1181{
1182 Must(enabled());
1183 return theAd;
1184}
1185
1186bool ICAPPreview::done() const
1187{
1188 Must(enabled());
1189 return theState >= stIeof;
1190}
1191
1192bool ICAPPreview::ieof() const
1193{
1194 Must(enabled());
1195 return theState == stIeof;
1196}
1197
1198size_t ICAPPreview::debt() const
1199{
1200 Must(enabled());
1201 return done() ? 0 : (theAd - theWritten);
1202}
1203
1204void ICAPPreview::wrote(size_t size, bool sawEof)
1205{
1206 Must(enabled());
1207 theWritten += size;
1208
1209 if (theWritten >= theAd)
1210 theState = stDone; // sawEof is irrelevant
1211 else
1212 if (sawEof)
1213 theState = stIeof;
1214}
1215