]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ICAP/ICAPModXact.cc
Windows port: Correctly identify Windows Vista and Windows Server Longhorn
[thirdparty/squid.git] / src / ICAP / ICAPModXact.cc
CommitLineData
774c051c 1/*
2 * DEBUG: section 93 ICAP (RFC 3507) Client
3 */
4
5#include "squid.h"
6#include "comm.h"
7#include "MsgPipe.h"
8#include "MsgPipeData.h"
9#include "HttpRequest.h"
10#include "HttpReply.h"
11#include "ICAPServiceRep.h"
12#include "ICAPModXact.h"
13#include "ICAPClient.h"
14#include "ChunkedCodingParser.h"
15#include "TextException.h"
a97e82a8 16#include "AuthUserRequest.h"
12b91c99 17#include "ICAPConfig.h"
985c86bc 18#include "SquidTime.h"
774c051c 19
20// flow and terminology:
21// HTTP| --> receive --> encode --> write --> |network
22// end | <-- send <-- parse <-- read <-- |end
23
24// TODO: doneSending()/doneReceving() data members should probably be in sync
25// with this->adapted/virgin pointers. Make adapted/virgin methods?
26
27// TODO: replace gotEncapsulated() with something faster; we call it often
28
29CBDATA_CLASS_INIT(ICAPModXact);
30
31static const size_t TheBackupLimit = ICAP::MsgPipeBufSizeMax;
32
12b91c99 33extern ICAPConfig TheICAPConfig;
34
774c051c 35
36ICAPModXact::State::State()
37{
38 memset(this, sizeof(*this), 0);
39}
40
41ICAPModXact::ICAPModXact(): ICAPXaction("ICAPModXact"),
42 self(NULL), virgin(NULL), adapted(NULL),
43 icapReply(NULL), virginConsumed(0),
44 bodyParser(NULL)
45{}
46
47void ICAPModXact::init(ICAPServiceRep::Pointer &aService, MsgPipe::Pointer &aVirgin, MsgPipe::Pointer &anAdapted, Pointer &aSelf)
48{
49 assert(!self.getRaw() && !virgin.getRaw() && !adapted.getRaw());
50 assert(aSelf.getRaw() && aVirgin.getRaw() && anAdapted.getRaw());
51
52 self = aSelf;
53 service(aService);
54
55 virgin = aVirgin;
56 adapted = anAdapted;
57
58 // receiving end
59 virgin->sink = this; // should be 'self' and refcounted
60 // virgin pipe data is initiated by the source
61
62 // sending end
63 adapted->source = this; // should be 'self' and refcounted
64 adapted->data = new MsgPipeData;
65
66 adapted->data->body = new MemBuf; // XXX: make body a non-pointer?
67 adapted->data->body->init(ICAP::MsgPipeBufSizeMin, ICAP::MsgPipeBufSizeMax);
68 // headers are initialized when we parse them
69
70 // writing and reading ends are handled by ICAPXaction
71
72 // encoding
73 // nothing to do because we are using temporary buffers
74
75 // parsing
76 icapReply = new HttpReply;
77 icapReply->protoPrefix = "ICAP/"; // TODO: make an IcapReply class?
78
79 // XXX: make sure stop() cleans all buffers
80}
81
82// HTTP side starts sending virgin data
83void ICAPModXact::noteSourceStart(MsgPipe *p)
84{
85 ICAPXaction_Enter(noteSourceStart);
86
87 // make sure TheBackupLimit is in-sync with the buffer size
88 Must(TheBackupLimit <= static_cast<size_t>(virgin->data->body->max_capacity));
89
90 estimateVirginBody(); // before virgin disappears!
91
92 // it is an ICAP violation to send request to a service w/o known OPTIONS
93
94 if (service().up())
95 startWriting();
96 else
97 waitForService();
98
99 // XXX: but this has to be here to catch other errors. Thus, if
100 // commConnectStart in startWriting fails, we may get here
101 //_after_ the object got destroyed. Somebody please fix commConnectStart!
102 ICAPXaction_Exit();
103}
104
105static
106void ICAPModXact_noteServiceReady(void *data, ICAPServiceRep::Pointer &)
107{
108 ICAPModXact *x = static_cast<ICAPModXact*>(data);
109 assert(x);
110 x->noteServiceReady();
111}
112
113void ICAPModXact::waitForService()
114{
115 Must(!state.serviceWaiting);
116 debugs(93, 7, "ICAPModXact will wait for the ICAP service " << status());
117 state.serviceWaiting = true;
118 service().callWhenReady(&ICAPModXact_noteServiceReady, this);
119}
120
121void ICAPModXact::noteServiceReady()
122{
123 ICAPXaction_Enter(noteServiceReady);
124
125 Must(state.serviceWaiting);
126 state.serviceWaiting = false;
127 startWriting(); // will throw if service is not up
128
129 ICAPXaction_Exit();
130}
131
132void ICAPModXact::startWriting()
133{
134 Must(service().up());
135
136 state.writing = State::writingConnect;
137 openConnection();
138 // put nothing here as openConnection calls commConnectStart
139 // and that may call us back without waiting for the next select loop
140}
141
142// connection with the ICAP service established
143void ICAPModXact::handleCommConnected()
144{
145 Must(state.writing == State::writingConnect);
146
147 startReading(); // wait for early errors from the ICAP server
148
149 MemBuf requestBuf;
150 requestBuf.init();
151
152 makeRequestHeaders(requestBuf);
3b299123 153 debugs(93, 9, "ICAPModXact ICAP status " << status() << " will write:\n" <<
774c051c 154 (requestBuf.terminate(), requestBuf.content()));
155
156 // write headers
157 state.writing = State::writingHeaders;
158 scheduleWrite(requestBuf);
3b299123 159 virgin->sendSinkNeed();
774c051c 160}
161
b107a5a5 162void ICAPModXact::handleCommWrote(size_t sz)
774c051c 163{
b107a5a5 164 debugs(93, 5, HERE << "Wrote " << sz << " bytes");
165
774c051c 166 if (state.writing == State::writingHeaders)
167 handleCommWroteHeaders();
168 else
169 handleCommWroteBody();
170}
171
172void ICAPModXact::handleCommWroteHeaders()
173{
174 Must(state.writing == State::writingHeaders);
175
176 if (virginBody.expected()) {
177 state.writing = preview.enabled() ?
178 State::writingPreview : State::writingPrime;
179 virginWriteClaim.protectAll();
180 writeMore();
181 } else {
182 stopWriting();
183 }
184}
185
186void ICAPModXact::writeMore()
187{
188 if (writer) // already writing something
189 return;
190
191 switch (state.writing) {
192
193 case State::writingInit: // waiting for service OPTIONS
194 Must(state.serviceWaiting);
195
196 case State::writingConnect: // waiting for the connection to establish
197
198 case State::writingHeaders: // waiting for the headers to be written
199
200 case State::writingPaused: // waiting for the ICAP server response
201
202 case State::writingDone: // nothing more to write
203 return;
204
205 case State::writingPreview:
206 writePriviewBody();
207 return;
208
209 case State::writingPrime:
210 writePrimeBody();
211 return;
212
213 default:
214 throw TexcHere("ICAPModXact in bad writing state");
215 }
216}
217
218void ICAPModXact::writePriviewBody()
219{
220 debugs(93, 8, "ICAPModXact will write Preview body " << status());
221 Must(state.writing == State::writingPreview);
222
223 MsgPipeData::Body *body = virgin->data->body;
224 const size_t size = XMIN(preview.debt(), (size_t)body->contentSize());
225 writeSomeBody("preview body", size);
226
227 // change state once preview is written
228
229 if (preview.done()) {
230 debugs(93, 7, "ICAPModXact wrote entire Preview body " << status());
231
232 if (preview.ieof())
233 stopWriting();
234 else
235 state.writing = State::writingPaused;
236 }
237}
238
239void ICAPModXact::writePrimeBody()
240{
241 Must(state.writing == State::writingPrime);
242 Must(virginWriteClaim.active());
243
244 MsgPipeData::Body *body = virgin->data->body;
245 const size_t size = body->contentSize();
246 writeSomeBody("prime virgin body", size);
247
b107a5a5 248 if (state.doneReceiving) {
249 debugs(98, 5, HERE << "state.doneReceiving is set");
774c051c 250 stopWriting();
b107a5a5 251 }
774c051c 252}
253
254void ICAPModXact::writeSomeBody(const char *label, size_t size)
255{
256 Must(!writer && !state.doneWriting());
12f4b710 257 debugs(93, 8, HERE << "will write up to " << size << " bytes of " <<
774c051c 258 label);
259
260 MemBuf writeBuf; // TODO: suggest a min size based on size and lastChunk
261
262 writeBuf.init(); // note: we assume that last-chunk will fit
263
264 const size_t writeableSize = claimSize(virginWriteClaim);
265 const size_t chunkSize = XMIN(writeableSize, size);
266
267 if (chunkSize) {
12f4b710 268 debugs(93, 7, HERE << "will write " << chunkSize <<
774c051c 269 "-byte chunk of " << label);
270 } else {
271 debugs(93, 7, "ICAPModXact has no writeable " << label << " content");
272 }
273
274 moveRequestChunk(writeBuf, chunkSize);
275
276 const bool lastChunk =
277 (state.writing == State::writingPreview && preview.done()) ||
278 (state.doneReceiving && claimSize(virginWriteClaim) <= 0);
279
280 if (lastChunk && virginBody.expected()) {
12f4b710 281 debugs(93, 8, HERE << "will write last-chunk of " << label);
774c051c 282 addLastRequestChunk(writeBuf);
283 }
284
12f4b710 285 debugs(93, 7, HERE << "will write " << writeBuf.contentSize()
774c051c 286 << " raw bytes of " << label);
287
288 if (writeBuf.hasContent()) {
289 scheduleWrite(writeBuf); // comm will free the chunk
290 } else {
291 writeBuf.clean();
292 }
293}
294
295void ICAPModXact::moveRequestChunk(MemBuf &buf, size_t chunkSize)
296{
297 if (chunkSize > 0) {
298 openChunk(buf, chunkSize);
299 buf.append(claimContent(virginWriteClaim), chunkSize);
300 closeChunk(buf, false);
301
302 virginWriteClaim.release(chunkSize);
303 virginConsume();
304 }
305
306 if (state.writing == State::writingPreview)
307 preview.wrote(chunkSize, state.doneReceiving); // even if wrote nothing
308}
309
310void ICAPModXact::addLastRequestChunk(MemBuf &buf)
311{
312 openChunk(buf, 0);
313 closeChunk(buf, state.writing == State::writingPreview && preview.ieof());
314}
315
316void ICAPModXact::openChunk(MemBuf &buf, size_t chunkSize)
317{
1dd6edf2 318 buf.Printf("%x\r\n", (int) chunkSize);
774c051c 319}
320
321void ICAPModXact::closeChunk(MemBuf &buf, bool ieof)
322{
323 if (ieof)
324 buf.append("; ieof", 6);
325
326 buf.append(ICAP::crlf, 2); // chunk-terminating CRLF
327}
328
329size_t ICAPModXact::claimSize(const MemBufClaim &claim) const
330{
331 Must(claim.active());
332 const size_t start = claim.offset();
333 const size_t end = virginConsumed + virgin->data->body->contentSize();
334 Must(virginConsumed <= start && start <= end);
335 return end - start;
336}
337
338const char *ICAPModXact::claimContent(const MemBufClaim &claim) const
339{
340 Must(claim.active());
341 const size_t start = claim.offset();
342 Must(virginConsumed <= start);
343 return virgin->data->body->content() + (start - virginConsumed);
344}
345
346void ICAPModXact::virginConsume()
347{
348 MemBuf &buf = *virgin->data->body;
349 const size_t have = static_cast<size_t>(buf.contentSize());
350 const size_t end = virginConsumed + have;
351 size_t offset = end;
352
353 if (virginWriteClaim.active())
354 offset = XMIN(virginWriteClaim.offset(), offset);
355
356 if (virginSendClaim.active())
357 offset = XMIN(virginSendClaim.offset(), offset);
358
359 Must(virginConsumed <= offset && offset <= end);
360
361 if (const size_t size = offset - virginConsumed) {
b107a5a5 362 debugs(93, 8, HERE << "consuming " << size << " out of " << have <<
774c051c 363 " virgin body bytes");
364 buf.consume(size);
365 virginConsumed += size;
366
367 if (!state.doneReceiving)
368 virgin->sendSinkNeed();
369 }
370}
371
372void ICAPModXact::handleCommWroteBody()
373{
374 writeMore();
375}
376
377void ICAPModXact::stopWriting()
378{
379 if (state.writing == State::writingDone)
380 return;
381
b107a5a5 382 debugs(93, 7, HERE << "will no longer write " << status());
774c051c 383
384 state.writing = State::writingDone;
385
386 virginWriteClaim.disable();
387
388 virginConsume();
389
390 // Comm does not have an interface to clear the writer, but
391 // writeMore() will not write if our write callback is called
392 // when state.writing == State::writingDone;
393}
394
395void ICAPModXact::stopBackup()
396{
397 if (!virginSendClaim.active())
398 return;
399
400 debugs(93, 7, "ICAPModXact will no longer backup " << status());
401
402 virginSendClaim.disable();
403
404 virginConsume();
405}
406
407bool ICAPModXact::doneAll() const
408{
409 return ICAPXaction::doneAll() && !state.serviceWaiting &&
410 state.doneReceiving && doneSending() &&
411 doneReading() && state.doneWriting();
412}
413
414void ICAPModXact::startReading()
415{
416 Must(connection >= 0);
417 Must(!reader);
418 Must(adapted.getRaw());
419 Must(adapted->data);
420 Must(adapted->data->body);
421
422 // we use the same buffer for headers and body and then consume headers
423 readMore();
424}
425
426void ICAPModXact::readMore()
427{
3b299123 428 if (reader || doneReading()) {
429 debugs(32,3,HERE << "returning from readMore because reader or doneReading()");
774c051c 430 return;
3b299123 431 }
774c051c 432
433 // do not fill readBuf if we have no space to store the result
3b299123 434 if (!adapted->data->body->hasPotentialSpace()) {
435 debugs(93,1,HERE << "Not reading because ICAP reply buffer is full");
774c051c 436 return;
3b299123 437 }
774c051c 438
439 if (readBuf.hasSpace())
440 scheduleRead();
3b299123 441 else
442 debugs(93,1,HERE << "nothing to do because !readBuf.hasSpace()");
774c051c 443}
444
445// comm module read a portion of the ICAP response for us
446void ICAPModXact::handleCommRead(size_t)
447{
448 Must(!state.doneParsing());
449 parseMore();
450 readMore();
451}
452
453void ICAPModXact::echoMore()
454{
455 Must(state.sending == State::sendingVirgin);
456 Must(virginSendClaim.active());
457
458 MemBuf &from = *virgin->data->body;
459 MemBuf &to = *adapted->data->body;
460
461 const size_t sizeMax = claimSize(virginSendClaim);
462 const size_t size = XMIN(static_cast<size_t>(to.potentialSpaceSize()),
463 sizeMax);
464 debugs(93, 5, "ICAPModXact echos " << size << " out of " << sizeMax <<
465 " bytes");
466
467 if (size > 0) {
468 to.append(claimContent(virginSendClaim), size);
469 virginSendClaim.release(size);
470 virginConsume();
471 adapted->sendSourceProgress();
472 }
473
474 if (!from.hasContent() && state.doneReceiving) {
475 debugs(93, 5, "ICAPModXact echoed all " << status());
476 stopSending(true);
477 } else {
478 debugs(93, 5, "ICAPModXact has " << from.contentSize() << " bytes " <<
479 "and expects more to echo " << status());
480 virgin->sendSinkNeed(); // TODO: timeout if sink is broken
481 }
482}
483
484bool ICAPModXact::doneSending() const
485{
486 Must((state.sending == State::sendingDone) == (!adapted));
487 return state.sending == State::sendingDone;
488}
489
490void ICAPModXact::stopSending(bool nicely)
491{
492 if (doneSending())
493 return;
494
495 if (state.sending != State::sendingUndecided) {
496 debugs(93, 7, "ICAPModXact will no longer send " << status());
497
498 if (nicely)
499 adapted->sendSourceFinish();
500 else
501 adapted->sendSourceAbort();
502 } else {
503 debugs(93, 7, "ICAPModXact will not start sending " << status());
504 adapted->sendSourceAbort(); // or the sink may wait forever
505 }
506
507 state.sending = State::sendingDone;
508
774c051c 509 adapted = NULL; // refcounted
510}
511
512void ICAPModXact::stopReceiving()
513{
514 // stopSending NULLifies adapted but we do not NULLify virgin.
515 // This is assymetric because we want to keep virgin->data even
516 // though we are not expecting any more virgin->data->body.
517 // TODO: can we cache just the needed headers info instead?
518
519 // If they closed first, there is not point (or means) to notify them.
520
521 if (state.doneReceiving)
522 return;
523
524 // There is no sendSinkFinished() to notify the other side.
525 debugs(93, 7, "ICAPModXact will not receive " << status());
526
527 state.doneReceiving = true;
528}
529
530void ICAPModXact::parseMore()
531{
aa761e5f 532 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " bytes to parse" <<
774c051c 533 status());
d5cfacfb 534 debugs(93, 5, HERE << "\n" << readBuf.content());
774c051c 535
536 if (state.parsingHeaders())
537 parseHeaders();
538
539 if (state.parsing == State::psBody)
540 parseBody();
541}
542
543// note that allocation for echoing is done in handle204NoContent()
544void ICAPModXact::maybeAllocateHttpMsg()
545{
546 if (adapted->data->header) // already allocated
547 return;
548
549 if (gotEncapsulated("res-hdr")) {
d9eb9082 550 adapted->data->setHeader(new HttpReply);
774c051c 551 } else if (gotEncapsulated("req-hdr")) {
d9eb9082 552 adapted->data->setHeader(new HttpRequest);
774c051c 553 } else
554 throw TexcHere("Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()");
555}
556
557void ICAPModXact::parseHeaders()
558{
559 Must(state.parsingHeaders());
560
b107a5a5 561 if (state.parsing == State::psIcapHeader) {
562 debugs(93, 5, HERE << "parse ICAP headers");
774c051c 563 parseIcapHead();
b107a5a5 564 }
774c051c 565
b107a5a5 566 if (state.parsing == State::psHttpHeader) {
567 debugs(93, 5, HERE << "parse HTTP headers");
774c051c 568 parseHttpHead();
b107a5a5 569 }
774c051c 570
571 if (state.parsingHeaders()) { // need more data
572 Must(mayReadMore());
573 return;
574 }
575
576 adapted->sendSourceStart();
577
578 if (state.sending == State::sendingVirgin)
579 echoMore();
580}
581
582void ICAPModXact::parseIcapHead()
583{
584 Must(state.sending == State::sendingUndecided);
585
586 if (!parseHead(icapReply))
587 return;
588
fc764d26 589 if (httpHeaderHasConnDir(&icapReply->header, "close")) {
590 debugs(93, 5, HERE << "found connection close");
591 reuseConnection = false;
592 }
593
774c051c 594 switch (icapReply->sline.status) {
595
596 case 100:
597 handle100Continue();
598 break;
599
600 case 200:
b559db5d 601
602 if (!validate200Ok()) {
603 throw TexcHere("Invalid ICAP Response");
604 } else {
605 handle200Ok();
606 }
607
774c051c 608 break;
609
610 case 204:
611 handle204NoContent();
612 break;
613
614 default:
b559db5d 615 debugs(93, 5, HERE << "ICAP status " << icapReply->sline.status);
774c051c 616 handleUnknownScode();
617 break;
618 }
619
620 // handle100Continue() manages state.writing on its own.
621 // Non-100 status means the server needs no postPreview data from us.
622 if (state.writing == State::writingPaused)
623 stopWriting();
624
625 // TODO: Consider applying a Squid 2.5 patch to recognize 201 responses
626}
627
b559db5d 628bool ICAPModXact::validate200Ok()
629{
630 if (ICAP::methodRespmod == service().method) {
631 if (!gotEncapsulated("res-hdr"))
632 return false;
633
634 return true;
635 }
636
637 if (ICAP::methodReqmod == service().method) {
638 if (!gotEncapsulated("res-hdr") && !gotEncapsulated("req-hdr"))
639 return false;
640
641 return true;
642 }
643
644 return false;
645}
646
774c051c 647void ICAPModXact::handle100Continue()
648{
649 Must(state.writing == State::writingPaused);
650 Must(preview.enabled() && preview.done() && !preview.ieof());
651 Must(virginSendClaim.active());
652
653 if (virginSendClaim.limited()) // preview only
654 stopBackup();
655
656 state.parsing = State::psHttpHeader; // eventually
657
658 state.writing = State::writingPrime;
659
660 writeMore();
661}
662
663void ICAPModXact::handle200Ok()
664{
665 state.parsing = State::psHttpHeader;
666 state.sending = State::sendingAdapted;
667 stopBackup();
668}
669
670void ICAPModXact::handle204NoContent()
671{
672 stopParsing();
673 Must(virginSendClaim.active());
674 virginSendClaim.protectAll(); // extends protection if needed
675 state.sending = State::sendingVirgin;
676
677 // We want to clone the HTTP message, but we do not want
678 // to copy non-HTTP state parts that HttpMsg kids carry in them.
679 // Thus, we cannot use a smart pointer, copy constructor, or equivalent.
680 // Instead, we simply write the HTTP message and "clone" it by parsing.
681
682 HttpMsg *oldHead = virgin->data->header;
683 debugs(93, 7, "ICAPModXact cloning virgin message " << oldHead);
684
685 MemBuf httpBuf;
686
687 // write the virgin message into a memory buffer
688 httpBuf.init();
689 packHead(httpBuf, oldHead);
690
691 // allocate the adapted message
7514268e 692 Must(!adapted->data->header);
693 HttpMsg *newHead = NULL;
774c051c 694
695 if (dynamic_cast<const HttpRequest*>(oldHead))
696 newHead = new HttpRequest;
697 else
698 if (dynamic_cast<const HttpReply*>(oldHead))
699 newHead = new HttpReply;
700
701 Must(newHead);
702
7514268e 703 adapted->data->setHeader(newHead);
704
774c051c 705 // parse the buffer back
706 http_status error = HTTP_STATUS_NONE;
707
708 Must(newHead->parse(&httpBuf, true, &error));
709
710 Must(newHead->hdr_sz == httpBuf.contentSize()); // no leftovers
711
712 httpBuf.clean();
713
714 debugs(93, 7, "ICAPModXact cloned virgin message " << oldHead << " to " << newHead);
715}
716
717void ICAPModXact::handleUnknownScode()
718{
719 stopParsing();
720 stopBackup();
721 // TODO: mark connection as "bad"
722
723 // Terminate the transaction; we do not know how to handle this response.
724 throw TexcHere("Unsupported ICAP status code");
725}
726
727void ICAPModXact::parseHttpHead()
728{
729 if (gotEncapsulated("res-hdr") || gotEncapsulated("req-hdr")) {
730 maybeAllocateHttpMsg();
731
732 if (!parseHead(adapted->data->header))
200ac359 733 return; // need more header data
774c051c 734 }
735
736 state.parsing = State::psBody;
737}
738
fc764d26 739/*
740 * Common routine used to parse both HTTP and ICAP headers
741 */
774c051c 742bool ICAPModXact::parseHead(HttpMsg *head)
743{
744 assert(head);
def17b6a 745 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse" <<
774c051c 746 "; state: " << state.parsing);
747
748 http_status error = HTTP_STATUS_NONE;
749 const bool parsed = head->parse(&readBuf, commEof, &error);
750 Must(parsed || !error); // success or need more data
751
752 if (!parsed) { // need more data
b107a5a5 753 debugs(93, 5, HERE << "parse failed, need more data, return false");
774c051c 754 head->reset();
755 return false;
756 }
757
b107a5a5 758 debugs(93, 5, HERE << "parse success, consume " << head->hdr_sz << " bytes, return true");
774c051c 759 readBuf.consume(head->hdr_sz);
760 return true;
761}
762
763void ICAPModXact::parseBody()
764{
765 Must(state.parsing == State::psBody);
766
aa761e5f 767 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes to parse");
774c051c 768
200ac359 769 if (gotEncapsulated("res-body") || gotEncapsulated("req-body")) {
774c051c 770 if (!parsePresentBody()) // need more body data
771 return;
772 } else {
b559db5d 773 debugs(93, 5, HERE << "not expecting a body");
774c051c 774 }
775
776 stopParsing();
777 stopSending(true);
778}
779
780// returns true iff complete body was parsed
781bool ICAPModXact::parsePresentBody()
782{
783 if (!bodyParser)
784 bodyParser = new ChunkedCodingParser;
785
786 // the parser will throw on errors
787 const bool parsed = bodyParser->parse(&readBuf, adapted->data->body);
788
789 adapted->sendSourceProgress(); // TODO: do not send if parsed nothing
790
aa761e5f 791 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes after " <<
774c051c 792 "parse; parsed all: " << parsed);
793
794 if (parsed)
795 return true;
796
3b299123 797 debugs(32,3,HERE << this << " needsMoreData = " << bodyParser->needsMoreData());
798
799 if (bodyParser->needsMoreData()) {
800 debugs(32,3,HERE << this);
774c051c 801 Must(mayReadMore());
3b299123 802 readMore();
803 }
774c051c 804
805 if (bodyParser->needsMoreSpace()) {
806 Must(!doneSending()); // can hope for more space
807 Must(adapted->data->body->hasContent()); // paranoid
808 // TODO: there should be a timeout in case the sink is broken.
809 }
810
811 return false;
812}
813
814void ICAPModXact::stopParsing()
815{
816 if (state.parsing == State::psDone)
817 return;
818
819 debugs(93, 7, "ICAPModXact will no longer parse " << status());
820
821 delete bodyParser;
822
823 bodyParser = NULL;
824
825 state.parsing = State::psDone;
826}
827
828// HTTP side added virgin body data
829void ICAPModXact::noteSourceProgress(MsgPipe *p)
830{
831 ICAPXaction_Enter(noteSourceProgress);
832
833 Must(!state.doneReceiving);
834 writeMore();
835
836 if (state.sending == State::sendingVirgin)
837 echoMore();
838
839 ICAPXaction_Exit();
840}
841
842// HTTP side sent us all virgin info
843void ICAPModXact::noteSourceFinish(MsgPipe *p)
844{
845 ICAPXaction_Enter(noteSourceFinish);
846
847 Must(!state.doneReceiving);
848 stopReceiving();
849
850 // push writer and sender in case we were waiting for the last-chunk
851 writeMore();
852
853 if (state.sending == State::sendingVirgin)
854 echoMore();
855
856 ICAPXaction_Exit();
857}
858
859// HTTP side is aborting
860void ICAPModXact::noteSourceAbort(MsgPipe *p)
861{
862 ICAPXaction_Enter(noteSourceAbort);
863
864 Must(!state.doneReceiving);
865 stopReceiving();
866 mustStop("HTTP source quit");
867
868 ICAPXaction_Exit();
869}
870
871// HTTP side wants more adapted data and possibly freed some buffer space
872void ICAPModXact::noteSinkNeed(MsgPipe *p)
873{
874 ICAPXaction_Enter(noteSinkNeed);
875
876 if (state.sending == State::sendingVirgin)
877 echoMore();
3b299123 878 else if (state.sending == State::sendingAdapted)
879 parseMore();
774c051c 880 else
3b299123 881 Must(state.sending == State::sendingUndecided);
774c051c 882
883 ICAPXaction_Exit();
884}
885
886// HTTP side aborted
887void ICAPModXact::noteSinkAbort(MsgPipe *p)
888{
889 ICAPXaction_Enter(noteSinkAbort);
890
891 mustStop("HTTP sink quit");
892
893 ICAPXaction_Exit();
894}
895
896// internal cleanup
897void ICAPModXact::doStop()
898{
b107a5a5 899 debugs(98, 5, HERE << "doStop() called");
774c051c 900 ICAPXaction::doStop();
901
902 stopWriting();
903 stopBackup();
904
905 if (icapReply) {
906 delete icapReply;
907 icapReply = NULL;
908 }
909
910 stopSending(false);
911
912 // see stopReceiving() for reasons it cannot NULLify virgin there
913
914 if (virgin != NULL) {
915 if (!state.doneReceiving)
916 virgin->sendSinkAbort();
917 else
918 virgin->sink = NULL;
919
920 virgin = NULL; // refcounted
921 }
922
923 if (self != NULL) {
924 Pointer s = self;
925 self = NULL;
926 ICAPNoteXactionDone(s);
927 /* this object may be destroyed when 's' is cleared */
928 }
929}
930
931void ICAPModXact::makeRequestHeaders(MemBuf &buf)
932{
12b91c99 933 /*
934 * XXX These should use HttpHdr interfaces instead of Printfs
935 */
774c051c 936 const ICAPServiceRep &s = service();
937 buf.Printf("%s %s ICAP/1.0\r\n", s.methodStr(), s.uri.buf());
938 buf.Printf("Host: %s:%d\r\n", s.host.buf(), s.port);
12b91c99 939 buf.Printf("Date: %s\r\n", mkrfc1123(squid_curtime));
940
941 if (!TheICAPConfig.reuse_connections)
942 buf.Printf("Connection: close\r\n");
943
774c051c 944 buf.Printf("Encapsulated: ");
945
946 MemBuf httpBuf;
12b91c99 947
774c051c 948 httpBuf.init();
949
950 // build HTTP request header, if any
951 ICAP::Method m = s.method;
952
953 if (ICAP::methodRespmod == m && virgin->data->cause)
954 encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->cause);
955 else if (ICAP::methodReqmod == m)
956 encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->header);
957
958 if (ICAP::methodRespmod == m)
959 if (const MsgPipeData::Header *prime = virgin->data->header)
960 encapsulateHead(buf, "res-hdr", httpBuf, prime);
961
962 if (!virginBody.expected())
1dd6edf2 963 buf.Printf("null-body=%d", (int) httpBuf.contentSize());
774c051c 964 else if (ICAP::methodReqmod == m)
1dd6edf2 965 buf.Printf("req-body=%d", (int) httpBuf.contentSize());
774c051c 966 else
1dd6edf2 967 buf.Printf("res-body=%d", (int) httpBuf.contentSize());
774c051c 968
969 buf.append(ICAP::crlf, 2); // terminate Encapsulated line
970
971 if (shouldPreview()) {
972 buf.Printf("Preview: %d\r\n", (int)preview.ad());
973 virginSendClaim.protectUpTo(preview.ad());
974 }
975
976 if (shouldAllow204()) {
977 buf.Printf("Allow: 204\r\n");
978 // be robust: do not rely on the expected body size
979 virginSendClaim.protectAll();
980 }
981
a97e82a8 982 const HttpRequest *request = virgin->data->cause ?
983 virgin->data->cause :
984 dynamic_cast<const HttpRequest*>(virgin->data->header);
985
12b91c99 986 if (TheICAPConfig.send_client_ip)
987 if (request->client_addr.s_addr != any_addr.s_addr)
988 buf.Printf("X-Client-IP: %s\r\n", inet_ntoa(request->client_addr));
a97e82a8 989
12b91c99 990 if (TheICAPConfig.send_client_username)
991 if (request->auth_user_request)
992 if (request->auth_user_request->username())
993 buf.Printf("X-Client-Username: %s\r\n", request->auth_user_request->username());
a97e82a8 994
2dfede9e 995 // fprintf(stderr, "%s\n", buf.content());
a97e82a8 996
774c051c 997 buf.append(ICAP::crlf, 2); // terminate ICAP header
998
999 // start ICAP request body with encapsulated HTTP headers
1000 buf.append(httpBuf.content(), httpBuf.contentSize());
1001
1002 httpBuf.clean();
1003}
1004
1005void ICAPModXact::encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const HttpMsg *head)
1006{
1007 // update ICAP header
7cab7e9f 1008 icapBuf.Printf("%s=%d, ", section, (int) httpBuf.contentSize());
774c051c 1009
1010 // pack HTTP head
1011 packHead(httpBuf, head);
1012}
1013
1014void ICAPModXact::packHead(MemBuf &httpBuf, const HttpMsg *head)
1015{
1016 Packer p;
1017 packerToMemInit(&p, &httpBuf);
1018 head->packInto(&p, true);
1019 packerClean(&p);
1020}
1021
1022// decides whether to offer a preview and calculates its size
1023bool ICAPModXact::shouldPreview()
1024{
1025 size_t wantedSize;
1026
7cdbbd47 1027 if (!TheICAPConfig.preview_enable) {
1028 debugs(93, 5, HERE << "preview disabled by squid.conf");
1029 return false;
1030 }
1031
774c051c 1032 if (!service().wantsPreview(wantedSize)) {
1033 debugs(93, 5, "ICAPModXact should not offer preview");
1034 return false;
1035 }
1036
1037 Must(wantedSize >= 0);
1038
1039 // cannot preview more than we can backup
1040 size_t ad = XMIN(wantedSize, TheBackupLimit);
1041
1042 if (virginBody.expected() && virginBody.knownSize())
1043 ad = XMIN(ad, virginBody.size()); // not more than we have
1044 else
1045 ad = 0; // questionable optimization?
1046
1047 debugs(93, 5, "ICAPModXact should offer " << ad << "-byte preview " <<
1048 "(service wanted " << wantedSize << ")");
1049
1050 preview.enable(ad);
1051
1052 return preview.enabled();
1053}
1054
1055// decides whether to allow 204 responses
1056bool ICAPModXact::shouldAllow204()
1057{
1058 if (!service().allows204())
1059 return false;
1060
1061 if (!virginBody.expected())
1062 return true; // no body means no problems with supporting 204s.
1063
1064 // if there is a body, make sure we can backup it all
1065
1066 if (!virginBody.knownSize())
1067 return false;
1068
1069 // or should we have a different backup limit?
1070 // note that '<' allows for 0-termination of the "full" backup buffer
1071 return virginBody.size() < TheBackupLimit;
1072}
1073
1074// returns a temporary string depicting transaction status, for debugging
1075void ICAPModXact::fillPendingStatus(MemBuf &buf) const
1076{
1077 if (state.serviceWaiting)
1078 buf.append("U", 1);
1079
1080 if (!state.doneWriting() && state.writing != State::writingInit)
1081 buf.Printf("w(%d)", state.writing);
1082
1083 if (preview.enabled()) {
1084 if (!preview.done())
1dd6edf2 1085 buf.Printf("P(%d)", (int) preview.debt());
774c051c 1086 }
1087
1088 if (virginSendClaim.active())
1089 buf.append("B", 1);
1090
1091 if (!state.doneParsing() && state.parsing != State::psIcapHeader)
1092 buf.Printf("p(%d)", state.parsing);
1093
1094 if (!doneSending() && state.sending != State::sendingUndecided)
1095 buf.Printf("S(%d)", state.sending);
1096}
1097
1098void ICAPModXact::fillDoneStatus(MemBuf &buf) const
1099{
1100 if (state.doneReceiving)
1101 buf.append("R", 1);
1102
1103 if (state.doneWriting())
1104 buf.append("w", 1);
1105
1106 if (preview.enabled()) {
1107 if (preview.done())
1108 buf.Printf("P%s", preview.ieof() ? "(ieof)" : "");
1109 }
1110
1111 if (doneReading())
1112 buf.append("r", 1);
1113
1114 if (state.doneParsing())
1115 buf.append("p", 1);
1116
1117 if (doneSending())
1118 buf.append("S", 1);
1119}
1120
1121bool ICAPModXact::gotEncapsulated(const char *section) const
1122{
a9925b40 1123 return icapReply->header.getByNameListMember("Encapsulated",
1124 section, ',').size() > 0;
774c051c 1125}
1126
1127// calculate whether there is a virgin HTTP body and
1128// whether its expected size is known
1129void ICAPModXact::estimateVirginBody()
1130{
1131 // note: defaults should be fine but will disable previews and 204s
1132
1133 Must(virgin != NULL && virgin->data->header);
1134
1135 method_t method;
1136
1137 if (virgin->data->cause)
1138 method = virgin->data->cause->method;
1139 else
a97e82a8 1140 if (HttpRequest *req = dynamic_cast<HttpRequest*>(virgin->data->
1141 header))
774c051c 1142 method = req->method;
1143 else
1144 return;
1145
1146 ssize_t size;
1147 if (virgin->data->header->expectingBody(method, size)) {
1148 virginBody.expect(size)
1149 ;
1150 debugs(93, 6, "ICAPModXact expects virgin body; size: " << size);
1151 } else {
1152 debugs(93, 6, "ICAPModXact does not expect virgin body");
1153 }
1154}
1155
1156
1157// TODO: Move SizedEstimate, MemBufBackup, and ICAPPreview elsewhere
1158
1159SizedEstimate::SizedEstimate()
1160 : theData(dtUnexpected)
1161{}
1162
1163void SizedEstimate::expect(ssize_t aSize)
1164{
1165 theData = (aSize >= 0) ? aSize : (ssize_t)dtUnknown;
1166}
1167
1168bool SizedEstimate::expected() const
1169{
1170 return theData != dtUnexpected;
1171}
1172
1173bool SizedEstimate::knownSize() const
1174{
1175 Must(expected());
1176 return theData != dtUnknown;
1177}
1178
1179size_t SizedEstimate::size() const
1180{
1181 Must(knownSize());
1182 return static_cast<size_t>(theData);
1183}
1184
1185
1186
1187MemBufClaim::MemBufClaim(): theStart(-1), theGoal(-1)
1188{}
1189
1190void MemBufClaim::protectAll()
1191{
1192 if (theStart < 0)
1193 theStart = 0;
1194
1195 theGoal = -1; // no specific goal
1196}
1197
1198void MemBufClaim::protectUpTo(size_t aGoal)
1199{
1200 if (theStart < 0)
1201 theStart = 0;
1202
1203 Must(aGoal >= 0);
1204
1205 theGoal = (theGoal < 0) ? static_cast<ssize_t>(aGoal) :
1206 XMIN(static_cast<ssize_t>(aGoal), theGoal);
1207}
1208
1209void MemBufClaim::disable()
1210{
1211 theStart = -1;
1212}
1213
1214void MemBufClaim::release(size_t size)
1215{
1216 Must(active());
1217 Must(size >= 0);
1218 theStart += static_cast<ssize_t>(size);
1219
1220 if (limited() && theStart >= theGoal)
1221 disable();
1222}
1223
1224size_t MemBufClaim::offset() const
1225{
1226 Must(active());
1227 return static_cast<size_t>(theStart);
1228}
1229
1230bool MemBufClaim::limited() const
1231{
1232 Must(active());
1233 return theGoal >= 0;
1234}
1235
1236
1237ICAPPreview::ICAPPreview(): theWritten(0), theAd(0), theState(stDisabled)
1238{}
1239
1240void ICAPPreview::enable(size_t anAd)
1241{
1242 // TODO: check for anAd not exceeding preview size limit
1243 Must(anAd >= 0);
1244 Must(!enabled());
1245 theAd = anAd;
1246 theState = stWriting;
1247}
1248
1249bool ICAPPreview::enabled() const
1250{
1251 return theState != stDisabled;
1252}
1253
1254size_t ICAPPreview::ad() const
1255{
1256 Must(enabled());
1257 return theAd;
1258}
1259
1260bool ICAPPreview::done() const
1261{
1262 Must(enabled());
1263 return theState >= stIeof;
1264}
1265
1266bool ICAPPreview::ieof() const
1267{
1268 Must(enabled());
1269 return theState == stIeof;
1270}
1271
1272size_t ICAPPreview::debt() const
1273{
1274 Must(enabled());
1275 return done() ? 0 : (theAd - theWritten);
1276}
1277
1278void ICAPPreview::wrote(size_t size, bool sawEof)
1279{
1280 Must(enabled());
1281 theWritten += size;
1282
1283 if (theWritten >= theAd)
1284 theState = stDone; // sawEof is irrelevant
1285 else
1286 if (sawEof)
1287 theState = stIeof;
1288}
1289
3cfc19b3 1290bool ICAPModXact::fillVirginHttpHeader(MemBuf &mb) const
1291{
1292 if (virgin == NULL)
1293 return false;
1294
1295 if (virgin->data == NULL)
1296 return false;
1297
1298 if (virgin->data->header == NULL)
1299 return false;
1300
1301 virgin->data->header->firstLineBuf(mb);
1302
1303 return true;
1304}