]>
git.ipfire.org Git - thirdparty/squid.git/blob - src/ICAP/ICAPModXact.cc
e714eaaacb4e662e210a30869ba219b497c85529
2 * DEBUG: section 93 ICAP (RFC 3507) Client
8 #include "HttpRequest.h"
10 #include "adaptation/Initiator.h"
11 #include "ICAPServiceRep.h"
12 #include "ICAPLauncher.h"
13 #include "ICAPModXact.h"
14 #include "ICAPClient.h"
15 #include "ChunkedCodingParser.h"
16 #include "TextException.h"
17 #include "AuthUserRequest.h"
18 #include "ICAPConfig.h"
19 #include "SquidTime.h"
21 // flow and terminology:
22 // HTTP| --> receive --> encode --> write --> |network
23 // end | <-- send <-- parse <-- read <-- |end
25 // TODO: replace gotEncapsulated() with something faster; we call it often
27 CBDATA_CLASS_INIT(ICAPModXact
);
28 CBDATA_CLASS_INIT(ICAPModXactLauncher
);
30 static const size_t TheBackupLimit
= BodyPipe::MaxCapacity
;
32 extern ICAPConfig TheICAPConfig
;
35 ICAPModXact::State::State()
37 memset(this, 0, sizeof(*this));
40 ICAPModXact::ICAPModXact(Adaptation::Initiator
*anInitiator
, HttpMsg
*virginHeader
,
41 HttpRequest
*virginCause
, ICAPServiceRep::Pointer
&aService
):
42 AsyncJob("ICAPModXact"),
43 ICAPXaction("ICAPModXact", anInitiator
, aService
),
47 canStartBypass(false) // too early
51 virgin
.setHeader(virginHeader
); // sets virgin.body_pipe if needed
52 virgin
.setCause(virginCause
); // may be NULL
54 // adapted header and body are initialized when we parse them
56 // writing and reading ends are handled by ICAPXaction
59 // nothing to do because we are using temporary buffers
62 icapReply
= new HttpReply
;
63 icapReply
->protoPrefix
= "ICAP/"; // TODO: make an IcapReply class?
65 debugs(93,7, "ICAPModXact initialized." << status());
68 // initiator wants us to start
69 void ICAPModXact::start()
73 estimateVirginBody(); // before virgin disappears!
75 canStartBypass
= service().cfg().bypass
;
77 // it is an ICAP violation to send request to a service w/o known OPTIONS
85 void ICAPModXact::waitForService()
87 Must(!state
.serviceWaiting
);
88 debugs(93, 7, "ICAPModXact will wait for the ICAP service" << status());
89 state
.serviceWaiting
= true;
90 AsyncCall::Pointer call
= asyncCall(93,5, "ICAPModXact::noteServiceReady",
91 MemFun(this, &ICAPModXact::noteServiceReady
));
92 service().callWhenReady(call
);
95 void ICAPModXact::noteServiceReady()
97 Must(state
.serviceWaiting
);
98 state
.serviceWaiting
= false;
100 if (service().up()) {
104 throw TexcHere("ICAP service is unusable");
108 void ICAPModXact::startWriting()
110 state
.writing
= State::writingConnect
;
112 decideOnPreview(); // must be decided before we decideOnRetries
118 // connection with the ICAP service established
119 void ICAPModXact::handleCommConnected()
121 Must(state
.writing
== State::writingConnect
);
123 startReading(); // wait for early errors from the ICAP server
128 makeRequestHeaders(requestBuf
);
129 debugs(93, 9, "ICAPModXact ICAP will write" << status() << ":\n" <<
130 (requestBuf
.terminate(), requestBuf
.content()));
133 state
.writing
= State::writingHeaders
;
134 scheduleWrite(requestBuf
);
137 void ICAPModXact::handleCommWrote(size_t sz
)
139 debugs(93, 5, HERE
<< "Wrote " << sz
<< " bytes");
141 if (state
.writing
== State::writingHeaders
)
142 handleCommWroteHeaders();
144 handleCommWroteBody();
147 void ICAPModXact::handleCommWroteHeaders()
149 Must(state
.writing
== State::writingHeaders
);
151 // determine next step
152 if (preview
.enabled())
153 state
.writing
= preview
.done() ? State::writingPaused
: State::writingPreview
;
155 if (virginBody
.expected())
156 state
.writing
= State::writingPrime
;
165 void ICAPModXact::writeMore()
167 debugs(93, 5, HERE
<< "checking whether to write more" << status());
169 if (writer
!= NULL
) // already writing something
172 switch (state
.writing
) {
174 case State::writingInit
: // waiting for service OPTIONS
175 Must(state
.serviceWaiting
);
177 case State::writingConnect
: // waiting for the connection to establish
179 case State::writingHeaders
: // waiting for the headers to be written
181 case State::writingPaused
: // waiting for the ICAP server response
183 case State::writingReallyDone
: // nothing more to write
186 case State::writingAlmostDone
: // was waiting for the last write
190 case State::writingPreview
:
194 case State::writingPrime
:
199 throw TexcHere("ICAPModXact in bad writing state");
203 void ICAPModXact::writePreviewBody()
205 debugs(93, 8, HERE
<< "will write Preview body from " <<
206 virgin
.body_pipe
<< status());
207 Must(state
.writing
== State::writingPreview
);
208 Must(virgin
.body_pipe
!= NULL
);
210 const size_t sizeMax
= (size_t)virgin
.body_pipe
->buf().contentSize();
211 const size_t size
= XMIN(preview
.debt(), sizeMax
);
212 writeSomeBody("preview body", size
);
214 // change state once preview is written
216 if (preview
.done()) {
217 debugs(93, 7, "ICAPModXact wrote entire Preview body" << status());
222 state
.writing
= State::writingPaused
;
226 void ICAPModXact::writePrimeBody()
228 Must(state
.writing
== State::writingPrime
);
229 Must(virginBodyWriting
.active());
231 const size_t size
= (size_t)virgin
.body_pipe
->buf().contentSize();
232 writeSomeBody("prime virgin body", size
);
234 if (virginBodyEndReached(virginBodyWriting
)) {
235 debugs(93, 5, HERE
<< "wrote entire body");
240 void ICAPModXact::writeSomeBody(const char *label
, size_t size
)
242 Must(!writer
&& state
.writing
< state
.writingAlmostDone
);
243 Must(virgin
.body_pipe
!= NULL
);
244 debugs(93, 8, HERE
<< "will write up to " << size
<< " bytes of " <<
247 MemBuf writeBuf
; // TODO: suggest a min size based on size and lastChunk
249 writeBuf
.init(); // note: we assume that last-chunk will fit
251 const size_t writableSize
= virginContentSize(virginBodyWriting
);
252 const size_t chunkSize
= XMIN(writableSize
, size
);
255 debugs(93, 7, HERE
<< "will write " << chunkSize
<<
256 "-byte chunk of " << label
);
258 openChunk(writeBuf
, chunkSize
, false);
259 writeBuf
.append(virginContentData(virginBodyWriting
), chunkSize
);
260 closeChunk(writeBuf
);
262 virginBodyWriting
.progress(chunkSize
);
265 debugs(93, 7, "ICAPModXact has no writable " << label
<< " content");
268 const bool wroteEof
= virginBodyEndReached(virginBodyWriting
);
269 bool lastChunk
= wroteEof
;
270 if (state
.writing
== State::writingPreview
) {
271 preview
.wrote(chunkSize
, wroteEof
); // even if wrote nothing
272 lastChunk
= lastChunk
|| preview
.done();
276 debugs(93, 8, HERE
<< "will write last-chunk of " << label
);
277 addLastRequestChunk(writeBuf
);
280 debugs(93, 7, HERE
<< "will write " << writeBuf
.contentSize()
281 << " raw bytes of " << label
);
283 if (writeBuf
.hasContent()) {
284 scheduleWrite(writeBuf
); // comm will free the chunk
290 void ICAPModXact::addLastRequestChunk(MemBuf
&buf
)
292 const bool ieof
= state
.writing
== State::writingPreview
&& preview
.ieof();
293 openChunk(buf
, 0, ieof
);
297 void ICAPModXact::openChunk(MemBuf
&buf
, size_t chunkSize
, bool ieof
)
299 buf
.Printf((ieof
? "%x; ieof\r\n" : "%x\r\n"), (int) chunkSize
);
302 void ICAPModXact::closeChunk(MemBuf
&buf
)
304 buf
.append(ICAP::crlf
, 2); // chunk-terminating CRLF
307 // did the activity reached the end of the virgin body?
308 bool ICAPModXact::virginBodyEndReached(const VirginBodyAct
&act
) const
311 !act
.active() || // did all (assuming it was originally planned)
312 !virgin
.body_pipe
->expectMoreAfter(act
.offset()); // wont have more
315 // the size of buffered virgin body data available for the specified activity
316 // if this size is zero, we may be done or may be waiting for more data
317 size_t ICAPModXact::virginContentSize(const VirginBodyAct
&act
) const
320 // asbolute start of unprocessed data
321 const uint64_t start
= act
.offset();
322 // absolute end of buffered data
323 const uint64_t end
= virginConsumed
+ virgin
.body_pipe
->buf().contentSize();
324 Must(virginConsumed
<= start
&& start
<= end
);
325 return static_cast<size_t>(end
- start
);
328 // pointer to buffered virgin body data available for the specified activity
329 const char *ICAPModXact::virginContentData(const VirginBodyAct
&act
) const
332 const uint64_t start
= act
.offset();
333 Must(virginConsumed
<= start
);
334 return virgin
.body_pipe
->buf().content() + static_cast<size_t>(start
-virginConsumed
);
337 void ICAPModXact::virginConsume()
339 debugs(93, 9, "consumption guards: " << !virgin
.body_pipe
<< isRetriable
);
341 if (!virgin
.body_pipe
)
342 return; // nothing to consume
345 return; // do not consume if we may have to retry later
347 BodyPipe
&bp
= *virgin
.body_pipe
;
349 // Why > 2? HttpState does not use the last bytes in the buffer
350 // because delayAwareRead() is arguably broken. See
351 // HttpStateData::maybeReadVirginBody for more details.
352 if (canStartBypass
&& bp
.buf().spaceSize() > 2) {
353 // Postponing may increase memory footprint and slow the HTTP side
354 // down. Not postponing may increase the number of ICAP errors
355 // if the ICAP service fails. We may also use "potential" space to
356 // postpone more aggressively. Should the trade-off be configurable?
357 debugs(93, 8, HERE
<< "postponing consumption from " << bp
.status());
361 const size_t have
= static_cast<size_t>(bp
.buf().contentSize());
362 const uint64_t end
= virginConsumed
+ have
;
363 uint64_t offset
= end
;
365 debugs(93, 9, HERE
<< "max virgin consumption offset=" << offset
<<
366 " acts " << virginBodyWriting
.active() << virginBodySending
.active() <<
367 " consumed=" << virginConsumed
<<
368 " from " << virgin
.body_pipe
->status());
370 if (virginBodyWriting
.active())
371 offset
= XMIN(virginBodyWriting
.offset(), offset
);
373 if (virginBodySending
.active())
374 offset
= XMIN(virginBodySending
.offset(), offset
);
376 Must(virginConsumed
<= offset
&& offset
<= end
);
378 if (const size_t size
= static_cast<size_t>(offset
- virginConsumed
)) {
379 debugs(93, 8, HERE
<< "consuming " << size
<< " out of " << have
<<
380 " virgin body bytes");
382 virginConsumed
+= size
;
383 Must(!isRetriable
); // or we should not be consuming
384 disableBypass("consumed content");
388 void ICAPModXact::handleCommWroteBody()
393 // Called when we do not expect to call comm_write anymore.
394 // We may have a pending write though.
395 // If stopping nicely, we will just wait for that pending write, if any.
396 void ICAPModXact::stopWriting(bool nicely
)
398 if (state
.writing
== State::writingReallyDone
)
401 if (writer
!= NULL
) {
403 debugs(93, 7, HERE
<< "will wait for the last write" << status());
404 state
.writing
= State::writingAlmostDone
; // may already be set
408 debugs(93, 3, HERE
<< "will NOT wait for the last write" << status());
410 // Comm does not have an interface to clear the writer callback nicely,
411 // but without clearing the writer we cannot recycle the connection.
412 // We prevent connection reuse and hope that we can handle a callback
413 // call at any time, usually in the middle of the destruction sequence!
414 // Somebody should add comm_remove_write_handler() to comm API.
415 reuseConnection
= false;
416 ignoreLastWrite
= true;
419 debugs(93, 7, HERE
<< "will no longer write" << status());
420 if (virginBodyWriting
.active()) {
421 virginBodyWriting
.disable();
424 state
.writing
= State::writingReallyDone
;
428 void ICAPModXact::stopBackup()
430 if (!virginBodySending
.active())
433 debugs(93, 7, "ICAPModXact will no longer backup" << status());
434 virginBodySending
.disable();
438 bool ICAPModXact::doneAll() const
440 return ICAPXaction::doneAll() && !state
.serviceWaiting
&&
442 doneReading() && state
.doneWriting();
445 void ICAPModXact::startReading()
447 Must(connection
>= 0);
449 Must(!adapted
.header
);
450 Must(!adapted
.body_pipe
);
452 // we use the same buffer for headers and body and then consume headers
456 void ICAPModXact::readMore()
458 if (reader
!= NULL
|| doneReading()) {
459 debugs(93,3,HERE
<< "returning from readMore because reader or doneReading()");
463 // do not fill readBuf if we have no space to store the result
464 if (adapted
.body_pipe
!= NULL
&&
465 !adapted
.body_pipe
->buf().hasPotentialSpace()) {
466 debugs(93,3,HERE
<< "not reading because ICAP reply pipe is full");
470 if (readBuf
.hasSpace())
473 debugs(93,3,HERE
<< "nothing to do because !readBuf.hasSpace()");
476 // comm module read a portion of the ICAP response for us
477 void ICAPModXact::handleCommRead(size_t)
479 Must(!state
.doneParsing());
484 void ICAPModXact::echoMore()
486 Must(state
.sending
== State::sendingVirgin
);
487 Must(adapted
.body_pipe
!= NULL
);
488 Must(virginBodySending
.active());
490 const size_t sizeMax
= virginContentSize(virginBodySending
);
491 debugs(93,5, HERE
<< "will echo up to " << sizeMax
<< " bytes from " <<
492 virgin
.body_pipe
->status());
493 debugs(93,5, HERE
<< "will echo up to " << sizeMax
<< " bytes to " <<
494 adapted
.body_pipe
->status());
497 const size_t size
= adapted
.body_pipe
->putMoreData(virginContentData(virginBodySending
), sizeMax
);
498 debugs(93,5, HERE
<< "echoed " << size
<< " out of " << sizeMax
<<
500 virginBodySending
.progress(size
);
502 disableBypass("echoed content");
505 if (virginBodyEndReached(virginBodySending
)) {
506 debugs(93, 5, "ICAPModXact echoed all" << status());
509 debugs(93, 5, "ICAPModXact has " <<
510 virgin
.body_pipe
->buf().contentSize() << " bytes " <<
511 "and expects more to echo" << status());
512 // TODO: timeout if virgin or adapted pipes are broken
516 bool ICAPModXact::doneSending() const
518 return state
.sending
== State::sendingDone
;
521 // stop (or do not start) sending adapted message body
522 void ICAPModXact::stopSending(bool nicely
)
527 if (state
.sending
!= State::sendingUndecided
) {
528 debugs(93, 7, "ICAPModXact will no longer send" << status());
529 if (adapted
.body_pipe
!= NULL
) {
530 virginBodySending
.disable();
531 // we may leave debts if we were echoing and the virgin
532 // body_pipe got exhausted before we echoed all planned bytes
533 const bool leftDebts
= adapted
.body_pipe
->needsMoreData();
534 stopProducingFor(adapted
.body_pipe
, nicely
&& !leftDebts
);
537 debugs(93, 7, "ICAPModXact will not start sending" << status());
538 Must(!adapted
.body_pipe
);
541 state
.sending
= State::sendingDone
;
545 // should be called after certain state.writing or state.sending changes
546 void ICAPModXact::checkConsuming()
548 // quit if we already stopped or are still using the pipe
549 if (!virgin
.body_pipe
|| !state
.doneConsumingVirgin())
552 debugs(93, 7, HERE
<< "will stop consuming" << status());
553 stopConsumingFrom(virgin
.body_pipe
);
556 void ICAPModXact::parseMore()
558 debugs(93, 5, HERE
<< "have " << readBuf
.contentSize() << " bytes to parse" <<
560 debugs(93, 5, HERE
<< "\n" << readBuf
.content());
562 if (state
.parsingHeaders())
565 if (state
.parsing
== State::psBody
)
569 void ICAPModXact::callException(const std::exception
&e
)
571 if (!canStartBypass
|| isRetriable
) {
572 ICAPXaction::callException(e
);
577 debugs(93, 3, "bypassing ICAPModXact::" << inCall
<< " exception: " <<
578 e
.what() << ' ' << status());
580 } catch (const std::exception
&bypassE
) {
581 ICAPXaction::callException(bypassE
);
585 void ICAPModXact::bypassFailure()
587 disableBypass("already started to bypass");
589 Must(!isRetriable
); // or we should not be bypassing
595 // end all activities associated with the ICAP server
599 stopWriting(true); // or should we force it?
600 if (connection
>= 0) {
601 reuseConnection
= false; // be conservative
602 cancelRead(); // may not work; and we cannot stop connecting either
604 debugs(93, 7, "Warning: bypass failed to stop I/O" << status());
608 void ICAPModXact::disableBypass(const char *reason
)
610 if (canStartBypass
) {
611 debugs(93,7, HERE
<< "will never start bypass because " << reason
);
612 canStartBypass
= false;
618 // note that allocation for echoing is done in handle204NoContent()
619 void ICAPModXact::maybeAllocateHttpMsg()
621 if (adapted
.header
) // already allocated
624 if (gotEncapsulated("res-hdr")) {
625 adapted
.setHeader(new HttpReply
);
626 } else if (gotEncapsulated("req-hdr")) {
627 adapted
.setHeader(new HttpRequest
);
629 throw TexcHere("Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()");
632 void ICAPModXact::parseHeaders()
634 Must(state
.parsingHeaders());
636 if (state
.parsing
== State::psIcapHeader
) {
637 debugs(93, 5, HERE
<< "parse ICAP headers");
641 if (state
.parsing
== State::psHttpHeader
) {
642 debugs(93, 5, HERE
<< "parse HTTP headers");
646 if (state
.parsingHeaders()) { // need more data
654 // called after parsing all headers or when bypassing an exception
655 void ICAPModXact::startSending()
657 disableBypass("sent headers");
658 sendAnswer(adapted
.header
);
660 if (state
.sending
== State::sendingVirgin
)
664 void ICAPModXact::parseIcapHead()
666 Must(state
.sending
== State::sendingUndecided
);
668 if (!parseHead(icapReply
))
671 if (httpHeaderHasConnDir(&icapReply
->header
, "close")) {
672 debugs(93, 5, HERE
<< "found connection close");
673 reuseConnection
= false;
676 switch (icapReply
->sline
.status
) {
683 case 201: // Symantec Scan Engine 5.0 and later when modifying HTTP msg
685 if (!validate200Ok()) {
686 throw TexcHere("Invalid ICAP Response");
694 handle204NoContent();
698 debugs(93, 5, HERE
<< "ICAP status " << icapReply
->sline
.status
);
699 handleUnknownScode();
703 // handle100Continue() manages state.writing on its own.
704 // Non-100 status means the server needs no postPreview data from us.
705 if (state
.writing
== State::writingPaused
)
709 bool ICAPModXact::validate200Ok()
711 if (ICAP::methodRespmod
== service().cfg().method
) {
712 if (!gotEncapsulated("res-hdr"))
718 if (ICAP::methodReqmod
== service().cfg().method
) {
719 if (!gotEncapsulated("res-hdr") && !gotEncapsulated("req-hdr"))
728 void ICAPModXact::handle100Continue()
730 Must(state
.writing
== State::writingPaused
);
731 // server must not respond before the end of preview: we may send ieof
732 Must(preview
.enabled() && preview
.done() && !preview
.ieof());
734 // 100 "Continue" cancels our preview commitment, not 204s outside preview
735 if (!state
.allowedPostview204
)
738 state
.parsing
= State::psIcapHeader
; // eventually
741 state
.writing
= State::writingPrime
;
746 void ICAPModXact::handle200Ok()
748 state
.parsing
= State::psHttpHeader
;
749 state
.sending
= State::sendingAdapted
;
754 void ICAPModXact::handle204NoContent()
760 // Called when we receive a 204 No Content response and
761 // when we are trying to bypass a service failure.
762 // We actually start sending (echoig or not) in startSending.
763 void ICAPModXact::prepEchoing()
765 disableBypass("preparing to echo content");
767 // We want to clone the HTTP message, but we do not want
768 // to copy some non-HTTP state parts that HttpMsg kids carry in them.
769 // Thus, we cannot use a smart pointer, copy constructor, or equivalent.
770 // Instead, we simply write the HTTP message and "clone" it by parsing.
772 HttpMsg
*oldHead
= virgin
.header
;
773 debugs(93, 7, "ICAPModXact cloning virgin message " << oldHead
);
777 // write the virgin message into a memory buffer
779 packHead(httpBuf
, oldHead
);
781 // allocate the adapted message and copy metainfo
782 Must(!adapted
.header
);
783 HttpMsg
*newHead
= NULL
;
784 if (dynamic_cast<const HttpRequest
*>(oldHead
)) {
785 HttpRequest
*newR
= new HttpRequest
;
787 } else if (dynamic_cast<const HttpReply
*>(oldHead
)) {
788 HttpReply
*newRep
= new HttpReply
;
792 newHead
->inheritProperties(oldHead
);
794 adapted
.setHeader(newHead
);
796 // parse the buffer back
797 http_status error
= HTTP_STATUS_NONE
;
799 Must(newHead
->parse(&httpBuf
, true, &error
));
801 Must(newHead
->hdr_sz
== httpBuf
.contentSize()); // no leftovers
805 debugs(93, 7, "ICAPModXact cloned virgin message " << oldHead
<< " to " <<
808 // setup adapted body pipe if needed
809 if (oldHead
->body_pipe
!= NULL
) {
810 debugs(93, 7, HERE
<< "will echo virgin body from " <<
812 if (!virginBodySending
.active())
813 virginBodySending
.plan(); // will throw if not possible
814 state
.sending
= State::sendingVirgin
;
817 // TODO: optimize: is it possible to just use the oldHead pipe and
818 // remove ICAP from the loop? This echoing is probably a common case!
819 makeAdaptedBodyPipe("echoed virgin response");
820 if (oldHead
->body_pipe
->bodySizeKnown())
821 adapted
.body_pipe
->setBodySize(oldHead
->body_pipe
->bodySize());
822 debugs(93, 7, HERE
<< "will echo virgin body to " <<
825 debugs(93, 7, HERE
<< "no virgin body to echo");
830 void ICAPModXact::handleUnknownScode()
834 // TODO: mark connection as "bad"
836 // Terminate the transaction; we do not know how to handle this response.
837 throw TexcHere("Unsupported ICAP status code");
840 void ICAPModXact::parseHttpHead()
842 if (gotEncapsulated("res-hdr") || gotEncapsulated("req-hdr")) {
843 maybeAllocateHttpMsg();
845 if (!parseHead(adapted
.header
))
846 return; // need more header data
848 if (dynamic_cast<HttpRequest
*>(adapted
.header
)) {
849 const HttpRequest
*oldR
= dynamic_cast<const HttpRequest
*>(virgin
.header
);
851 // TODO: the adapted request did not really originate from the
852 // client; give proxy admin an option to prevent copying of
853 // sensitive client information here. See the following thread:
854 // http://www.squid-cache.org/mail-archive/squid-dev/200703/0040.html
857 // Maybe adapted.header==NULL if HttpReply and have Http 0.9 ....
859 adapted
.header
->inheritProperties(virgin
.header
);
862 decideOnParsingBody();
865 // parses both HTTP and ICAP headers
866 bool ICAPModXact::parseHead(HttpMsg
*head
)
869 debugs(93, 5, HERE
<< "have " << readBuf
.contentSize() << " head bytes to parse" <<
870 "; state: " << state
.parsing
);
872 http_status error
= HTTP_STATUS_NONE
;
873 const bool parsed
= head
->parse(&readBuf
, commEof
, &error
);
874 Must(parsed
|| !error
); // success or need more data
876 if (!parsed
) { // need more data
877 debugs(93, 5, HERE
<< "parse failed, need more data, return false");
882 debugs(93, 5, HERE
<< "parse success, consume " << head
->hdr_sz
<< " bytes, return true");
883 readBuf
.consume(head
->hdr_sz
);
887 void ICAPModXact::decideOnParsingBody()
889 if (gotEncapsulated("res-body") || gotEncapsulated("req-body")) {
890 debugs(93, 5, HERE
<< "expecting a body");
891 state
.parsing
= State::psBody
;
892 bodyParser
= new ChunkedCodingParser
;
893 makeAdaptedBodyPipe("adapted response from the ICAP server");
894 Must(state
.sending
== State::sendingAdapted
);
896 debugs(93, 5, HERE
<< "not expecting a body");
902 void ICAPModXact::parseBody()
904 Must(state
.parsing
== State::psBody
);
907 debugs(93, 5, HERE
<< "have " << readBuf
.contentSize() << " body bytes to parse");
909 // the parser will throw on errors
910 BodyPipeCheckout
bpc(*adapted
.body_pipe
);
911 const bool parsed
= bodyParser
->parse(&readBuf
, &bpc
.buf
);
914 debugs(93, 5, HERE
<< "have " << readBuf
.contentSize() << " body bytes after " <<
915 "parse; parsed all: " << parsed
);
917 // TODO: expose BodyPipe::putSize() to make this check simpler and clearer
918 if (adapted
.body_pipe
->buf().contentSize() > 0) // parsed something sometime
919 disableBypass("sent adapted content");
923 stopSending(true); // the parser succeeds only if all parsed data fits
927 debugs(93,3,HERE
<< this << " needsMoreData = " << bodyParser
->needsMoreData());
929 if (bodyParser
->needsMoreData()) {
930 debugs(93,3,HERE
<< this);
935 if (bodyParser
->needsMoreSpace()) {
936 Must(!doneSending()); // can hope for more space
937 Must(adapted
.body_pipe
->buf().contentSize() > 0); // paranoid
938 // TODO: there should be a timeout in case the sink is broken
939 // or cannot consume partial content (while we need more space)
943 void ICAPModXact::stopParsing()
945 if (state
.parsing
== State::psDone
)
948 debugs(93, 7, "ICAPModXact will no longer parse" << status());
954 state
.parsing
= State::psDone
;
957 // HTTP side added virgin body data
958 void ICAPModXact::noteMoreBodyDataAvailable(BodyPipe::Pointer
)
962 if (state
.sending
== State::sendingVirgin
)
966 // HTTP side sent us all virgin info
967 void ICAPModXact::noteBodyProductionEnded(BodyPipe::Pointer
)
969 Must(virgin
.body_pipe
->productionEnded());
971 // push writer and sender in case we were waiting for the last-chunk
974 if (state
.sending
== State::sendingVirgin
)
978 // body producer aborted, but the initiator may still want to know
979 // the answer, even though the HTTP message has been truncated
980 void ICAPModXact::noteBodyProducerAborted(BodyPipe::Pointer
)
982 Must(virgin
.body_pipe
->productionEnded());
984 // push writer and sender in case we were waiting for the last-chunk
987 if (state
.sending
== State::sendingVirgin
)
991 // adapted body consumer wants more adapted data and
992 // possibly freed some buffer space
993 void ICAPModXact::noteMoreBodySpaceAvailable(BodyPipe::Pointer
)
995 if (state
.sending
== State::sendingVirgin
)
997 else if (state
.sending
== State::sendingAdapted
)
1000 Must(state
.sending
== State::sendingUndecided
);
1003 // adapted body consumer aborted
1004 void ICAPModXact::noteBodyConsumerAborted(BodyPipe::Pointer
)
1006 mustStop("adapted body consumer aborted");
1010 void ICAPModXact::swanSong()
1012 debugs(93, 5, HERE
<< "swan sings" << status());
1022 ICAPXaction::swanSong();
1025 void ICAPModXact::makeRequestHeaders(MemBuf
&buf
)
1027 char ntoabuf
[MAX_IPSTRLEN
];
1029 * XXX These should use HttpHdr interfaces instead of Printfs
1031 const Adaptation::ServiceConfig
&s
= service().cfg();
1032 buf
.Printf("%s %s ICAP/1.0\r\n", s
.methodStr(), s
.uri
.buf());
1033 buf
.Printf("Host: %s:%d\r\n", s
.host
.buf(), s
.port
);
1034 buf
.Printf("Date: %s\r\n", mkrfc1123(squid_curtime
));
1036 if (!TheICAPConfig
.reuse_connections
)
1037 buf
.Printf("Connection: close\r\n");
1039 // we must forward "Proxy-Authenticate" and "Proxy-Authorization"
1041 if (virgin
.header
->header
.has(HDR_PROXY_AUTHENTICATE
))
1042 buf
.Printf("Proxy-Authenticate: %s\r\n",
1043 virgin
.header
->header
.getByName("Proxy-Authenticate").unsafeBuf());
1045 if (virgin
.header
->header
.has(HDR_PROXY_AUTHORIZATION
))
1046 buf
.Printf("Proxy-Authorization: %s\r\n",
1047 virgin
.header
->header
.getByName("Proxy-Authorization").unsafeBuf());
1049 buf
.Printf("Encapsulated: ");
1055 // build HTTP request header, if any
1056 ICAP::Method m
= s
.method
;
1058 const HttpRequest
*request
= virgin
.cause
?
1060 dynamic_cast<const HttpRequest
*>(virgin
.header
);
1062 // to simplify, we could assume that request is always available
1066 urlPath
= request
->urlpath
;
1067 if (ICAP::methodRespmod
== m
)
1068 encapsulateHead(buf
, "req-hdr", httpBuf
, request
);
1070 if (ICAP::methodReqmod
== m
)
1071 encapsulateHead(buf
, "req-hdr", httpBuf
, virgin
.header
);
1074 if (ICAP::methodRespmod
== m
)
1075 if (const HttpMsg
*prime
= virgin
.header
)
1076 encapsulateHead(buf
, "res-hdr", httpBuf
, prime
);
1078 if (!virginBody
.expected())
1079 buf
.Printf("null-body=%d", (int) httpBuf
.contentSize());
1080 else if (ICAP::methodReqmod
== m
)
1081 buf
.Printf("req-body=%d", (int) httpBuf
.contentSize());
1083 buf
.Printf("res-body=%d", (int) httpBuf
.contentSize());
1085 buf
.append(ICAP::crlf
, 2); // terminate Encapsulated line
1087 if (preview
.enabled()) {
1088 buf
.Printf("Preview: %d\r\n", (int)preview
.ad());
1089 if (virginBody
.expected()) // there is a body to preview
1090 virginBodySending
.plan();
1092 finishNullOrEmptyBodyPreview(httpBuf
);
1095 if (shouldAllow204()) {
1096 debugs(93,5, HERE
<< "will allow 204s outside of preview");
1097 state
.allowedPostview204
= true;
1098 buf
.Printf("Allow: 204\r\n");
1099 if (virginBody
.expected()) // there is a body to echo
1100 virginBodySending
.plan();
1103 if (TheICAPConfig
.send_client_ip
&& request
)
1104 if (!request
->client_addr
.IsAnyAddr() && !request
->client_addr
.IsNoAddr())
1105 buf
.Printf("X-Client-IP: %s\r\n", request
->client_addr
.NtoA(ntoabuf
,MAX_IPSTRLEN
));
1107 if (TheICAPConfig
.send_client_username
&& request
)
1108 makeUsernameHeader(request
, buf
);
1110 // fprintf(stderr, "%s\n", unsafeBuf.content());
1112 buf
.append(ICAP::crlf
, 2); // terminate ICAP header
1114 // start ICAP request body with encapsulated HTTP headers
1115 buf
.append(httpBuf
.content(), httpBuf
.contentSize());
1120 void ICAPModXact::makeUsernameHeader(const HttpRequest
*request
, MemBuf
&buf
)
1122 if (const AuthUserRequest
*auth
= request
->auth_user_request
) {
1123 if (char const *name
= auth
->username()) {
1124 const char *value
= TheICAPConfig
.client_username_encode
?
1125 base64_encode(name
) : name
;
1126 buf
.Printf("%s: %s\r\n", TheICAPConfig
.client_username_header
,
1132 void ICAPModXact::encapsulateHead(MemBuf
&icapBuf
, const char *section
, MemBuf
&httpBuf
, const HttpMsg
*head
)
1134 // update ICAP header
1135 icapBuf
.Printf("%s=%d, ", section
, (int) httpBuf
.contentSize());
1138 HttpMsg
*headClone
= NULL
;
1140 if (const HttpRequest
* old_request
= dynamic_cast<const HttpRequest
*>(head
)) {
1141 HttpRequest
* new_request
= new HttpRequest
;
1142 urlParse(old_request
->method
, old_request
->canonical
,new_request
);
1143 new_request
->http_ver
= old_request
->http_ver
;
1144 headClone
= new_request
;
1145 } else if (const HttpReply
*old_reply
= dynamic_cast<const HttpReply
*>(head
)) {
1146 HttpReply
* new_reply
= new HttpReply
;
1147 new_reply
->sline
= old_reply
->sline
;
1148 headClone
= new_reply
;
1152 headClone
->inheritProperties(head
);
1154 HttpHeaderPos pos
= HttpHeaderInitPos
;
1155 HttpHeaderEntry
* p_head_entry
= NULL
;
1156 while (NULL
!= (p_head_entry
= head
->header
.getEntry(&pos
)) )
1157 headClone
->header
.addEntry(p_head_entry
->clone());
1161 // remove all hop-by-hop headers from the clone
1162 headClone
->header
.delById(HDR_PROXY_AUTHENTICATE
);
1163 headClone
->header
.removeHopByHopEntries();
1165 // pack polished HTTP header
1166 packHead(httpBuf
, headClone
);
1171 void ICAPModXact::packHead(MemBuf
&httpBuf
, const HttpMsg
*head
)
1174 packerToMemInit(&p
, &httpBuf
);
1175 head
->packInto(&p
, true);
1179 // decides whether to offer a preview and calculates its size
1180 void ICAPModXact::decideOnPreview()
1182 if (!TheICAPConfig
.preview_enable
) {
1183 debugs(93, 5, HERE
<< "preview disabled by squid.conf");
1187 const HttpRequest
*request
= virgin
.cause
?
1189 dynamic_cast<const HttpRequest
*>(virgin
.header
);
1190 const String urlPath
= request
? request
->urlpath
: String();
1192 if (!service().wantsPreview(urlPath
, wantedSize
)) {
1193 debugs(93, 5, "ICAPModXact should not offer preview for " << urlPath
);
1197 // we decided to do preview, now compute its size
1199 Must(wantedSize
>= 0);
1201 // cannot preview more than we can backup
1202 size_t ad
= XMIN(wantedSize
, TheBackupLimit
);
1204 if (!virginBody
.expected())
1207 if (virginBody
.knownSize())
1208 ad
= XMIN(static_cast<uint64_t>(ad
), virginBody
.size()); // not more than we have
1210 debugs(93, 5, "ICAPModXact should offer " << ad
<< "-byte preview " <<
1211 "(service wanted " << wantedSize
<< ")");
1214 Must(preview
.enabled());
1217 // decides whether to allow 204 responses
1218 bool ICAPModXact::shouldAllow204()
1220 if (!service().allows204())
1223 return canBackupEverything();
1226 // used by shouldAllow204 and decideOnRetries
1227 bool ICAPModXact::canBackupEverything() const
1229 if (!virginBody
.expected())
1230 return true; // no body means no problems with backup
1232 // if there is a body, check whether we can backup it all
1234 if (!virginBody
.knownSize())
1237 // or should we have a different backup limit?
1238 // note that '<' allows for 0-termination of the "full" backup buffer
1239 return virginBody
.size() < TheBackupLimit
;
1242 // Decide whether this transaction can be retried if pconn fails
1243 // Must be called after decideOnPreview and before openConnection()
1244 void ICAPModXact::decideOnRetries()
1247 return; // no, already decided
1249 if (preview
.enabled())
1250 return; // yes, because preview provides enough guarantees
1252 if (canBackupEverything())
1253 return; // yes, because we can back everything up
1255 disableRetries(); // no, because we cannot back everything up
1258 // Normally, the body-writing code handles preview body. It can deal with
1259 // bodies of unexpected size, including those that turn out to be empty.
1260 // However, that code assumes that the body was expected and body control
1261 // structures were initialized. This is not the case when there is no body
1262 // or the body is known to be empty, because the virgin message will lack a
1263 // body_pipe. So we handle preview of null-body and zero-size bodies here.
1264 void ICAPModXact::finishNullOrEmptyBodyPreview(MemBuf
&buf
)
1266 Must(!virginBodyWriting
.active()); // one reason we handle it here
1267 Must(!virgin
.body_pipe
); // another reason we handle it here
1268 Must(!preview
.ad());
1270 // do not add last-chunk because our Encapsulated header says null-body
1271 // addLastRequestChunk(unsafeBuf);
1272 preview
.wrote(0, true);
1274 Must(preview
.done());
1275 Must(preview
.ieof());
1278 void ICAPModXact::fillPendingStatus(MemBuf
&buf
) const
1280 ICAPXaction::fillPendingStatus(buf
);
1282 if (state
.serviceWaiting
)
1285 if (virgin
.body_pipe
!= NULL
)
1288 if (connection
> 0 && !doneReading())
1291 if (!state
.doneWriting() && state
.writing
!= State::writingInit
)
1292 buf
.Printf("w(%d)", state
.writing
);
1294 if (preview
.enabled()) {
1295 if (!preview
.done())
1296 buf
.Printf("P(%d)", (int) preview
.debt());
1299 if (virginBodySending
.active())
1302 if (!state
.doneParsing() && state
.parsing
!= State::psIcapHeader
)
1303 buf
.Printf("p(%d)", state
.parsing
);
1305 if (!doneSending() && state
.sending
!= State::sendingUndecided
)
1306 buf
.Printf("S(%d)", state
.sending
);
1312 void ICAPModXact::fillDoneStatus(MemBuf
&buf
) const
1314 ICAPXaction::fillDoneStatus(buf
);
1316 if (!virgin
.body_pipe
)
1319 if (state
.doneWriting())
1322 if (preview
.enabled()) {
1324 buf
.Printf("P%s", preview
.ieof() ? "(ieof)" : "");
1330 if (state
.doneParsing())
1337 bool ICAPModXact::gotEncapsulated(const char *section
) const
1339 return icapReply
->header
.getByNameListMember("Encapsulated",
1340 section
, ',').size() > 0;
1343 // calculate whether there is a virgin HTTP body and
1344 // whether its expected size is known
1345 // TODO: rename because we do not just estimate
1346 void ICAPModXact::estimateVirginBody()
1348 // note: lack of size info may disable previews and 204s
1350 HttpMsg
*msg
= virgin
.header
;
1353 HttpRequestMethod method
;
1356 method
= virgin
.cause
->method
;
1358 if (HttpRequest
*req
= dynamic_cast<HttpRequest
*>(msg
))
1359 method
= req
->method
;
1361 method
= METHOD_NONE
;
1364 // expectingBody returns true for zero-sized bodies, but we will not
1365 // get a pipe for that body, so we treat the message as bodyless
1366 if (method
!= METHOD_NONE
&& msg
->expectingBody(method
, size
) && size
) {
1367 debugs(93, 6, "ICAPModXact expects virgin body from " <<
1368 virgin
.body_pipe
<< "; size: " << size
);
1370 virginBody
.expect(size
);
1371 virginBodyWriting
.plan();
1373 // sign up as a body consumer
1374 Must(msg
->body_pipe
!= NULL
);
1375 Must(msg
->body_pipe
== virgin
.body_pipe
);
1376 Must(virgin
.body_pipe
->setConsumerIfNotLate(this));
1378 // make sure TheBackupLimit is in-sync with the buffer size
1379 Must(TheBackupLimit
<= static_cast<size_t>(msg
->body_pipe
->unsafeBuf().max_capacity
));
1381 debugs(93, 6, "ICAPModXact does not expect virgin body");
1382 Must(msg
->body_pipe
== NULL
);
1387 void ICAPModXact::makeAdaptedBodyPipe(const char *what
)
1389 Must(!adapted
.body_pipe
);
1390 Must(!adapted
.header
->body_pipe
);
1391 adapted
.header
->body_pipe
= new BodyPipe(this);
1392 adapted
.body_pipe
= adapted
.header
->body_pipe
;
1393 debugs(93, 7, HERE
<< "will supply " << what
<< " via " <<
1394 adapted
.body_pipe
<< " pipe");
1398 // TODO: Move SizedEstimate, MemBufBackup, and ICAPPreview elsewhere
1400 SizedEstimate::SizedEstimate()
1401 : theData(dtUnexpected
)
1404 void SizedEstimate::expect(int64_t aSize
)
1406 theData
= (aSize
>= 0) ? aSize
: (int64_t)dtUnknown
;
1409 bool SizedEstimate::expected() const
1411 return theData
!= dtUnexpected
;
1414 bool SizedEstimate::knownSize() const
1417 return theData
!= dtUnknown
;
1420 uint64_t SizedEstimate::size() const
1423 return static_cast<uint64_t>(theData
);
1428 VirginBodyAct::VirginBodyAct(): theStart(0), theState(stUndecided
)
1431 void VirginBodyAct::plan()
1434 Must(!theStart
); // not started
1435 theState
= stActive
;
1438 void VirginBodyAct::disable()
1440 theState
= stDisabled
;
1443 void VirginBodyAct::progress(size_t size
)
1447 theStart
+= static_cast<int64_t>(size
);
1450 uint64_t VirginBodyAct::offset() const
1453 return static_cast<uint64_t>(theStart
);
1457 ICAPPreview::ICAPPreview(): theWritten(0), theAd(0), theState(stDisabled
)
1460 void ICAPPreview::enable(size_t anAd
)
1462 // TODO: check for anAd not exceeding preview size limit
1466 theState
= stWriting
;
1469 bool ICAPPreview::enabled() const
1471 return theState
!= stDisabled
;
1474 size_t ICAPPreview::ad() const
1480 bool ICAPPreview::done() const
1483 return theState
>= stIeof
;
1486 bool ICAPPreview::ieof() const
1489 return theState
== stIeof
;
1492 size_t ICAPPreview::debt() const
1495 return done() ? 0 : (theAd
- theWritten
);
1498 void ICAPPreview::wrote(size_t size
, bool wroteEof
)
1504 Must(theWritten
<= theAd
);
1507 theState
= stIeof
; // written size is irrelevant
1509 if (theWritten
>= theAd
)
1513 bool ICAPModXact::fillVirginHttpHeader(MemBuf
&mb
) const
1515 if (virgin
.header
== NULL
)
1518 virgin
.header
->firstLineBuf(mb
);
1524 /* ICAPModXactLauncher */
1526 ICAPModXactLauncher::ICAPModXactLauncher(Adaptation::Initiator
*anInitiator
, HttpMsg
*virginHeader
, HttpRequest
*virginCause
, Adaptation::ServicePointer aService
):
1527 AsyncJob("ICAPModXactLauncher"),
1528 ICAPLauncher("ICAPModXactLauncher", anInitiator
, aService
)
1530 virgin
.setHeader(virginHeader
);
1531 virgin
.setCause(virginCause
);
1534 ICAPXaction
*ICAPModXactLauncher::createXaction()
1536 ICAPServiceRep::Pointer s
=
1537 dynamic_cast<ICAPServiceRep
*>(theService
.getRaw());
1539 return new ICAPModXact(this, virgin
.header
, virgin
.cause
, s
);