]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ICAP/ICAPModXact.cc
- Replaced BodyReader with BodyPipe. BodyReader was a
[thirdparty/squid.git] / src / ICAP / ICAPModXact.cc
CommitLineData
774c051c 1/*
2 * DEBUG: section 93 ICAP (RFC 3507) Client
3 */
4
5#include "squid.h"
6#include "comm.h"
7#include "MsgPipe.h"
8#include "MsgPipeData.h"
9#include "HttpRequest.h"
10#include "HttpReply.h"
11#include "ICAPServiceRep.h"
12#include "ICAPModXact.h"
13#include "ICAPClient.h"
14#include "ChunkedCodingParser.h"
15#include "TextException.h"
a97e82a8 16#include "AuthUserRequest.h"
12b91c99 17#include "ICAPConfig.h"
985c86bc 18#include "SquidTime.h"
774c051c 19
20// flow and terminology:
21// HTTP| --> receive --> encode --> write --> |network
22// end | <-- send <-- parse <-- read <-- |end
23
24// TODO: doneSending()/doneReceiving() state should probably be kept in sync
25// with this->adapted/virgin pointers. Make adapted/virgin methods?
26
27// TODO: replace gotEncapsulated() with something faster; we call it often
28
// Squid cbdata registration for ICAPModXact.
29CBDATA_CLASS_INIT(ICAPModXact);
30
// Upper bound on the virgin body bytes we are willing to back up.
// noteSourceStart() checks that it does not exceed the virgin body buffer's
// max_capacity; shouldPreview() and shouldAllow204() use it to cap the
// preview size and to decide whether a 204 response can be supported.
31static const size_t TheBackupLimit = ICAP::MsgPipeBufSizeMax;
32
// ICAP configuration object defined elsewhere; this file reads its
// reuse_connections, preview_enable, send_client_ip and send_client_username
// settings.
12b91c99 33extern ICAPConfig TheICAPConfig;
34
774c051c 35
36ICAPModXact::State::State()
37{
38 memset(this, sizeof(*this), 0);
39}
40
41ICAPModXact::ICAPModXact(): ICAPXaction("ICAPModXact"),
42 self(NULL), virgin(NULL), adapted(NULL),
43 icapReply(NULL), virginConsumed(0),
44 bodyParser(NULL)
45{}
46
47void ICAPModXact::init(ICAPServiceRep::Pointer &aService, MsgPipe::Pointer &aVirgin, MsgPipe::Pointer &anAdapted, Pointer &aSelf)
48{
49 assert(!self.getRaw() && !virgin.getRaw() && !adapted.getRaw());
50 assert(aSelf.getRaw() && aVirgin.getRaw() && anAdapted.getRaw());
51
52 self = aSelf;
53 service(aService);
54
55 virgin = aVirgin;
56 adapted = anAdapted;
57
58 // receiving end
59 virgin->sink = this; // should be 'self' and refcounted
60 // virgin pipe data is initiated by the source
61
62 // sending end
63 adapted->source = this; // should be 'self' and refcounted
64 adapted->data = new MsgPipeData;
65
66 adapted->data->body = new MemBuf; // XXX: make body a non-pointer?
67 adapted->data->body->init(ICAP::MsgPipeBufSizeMin, ICAP::MsgPipeBufSizeMax);
68 // headers are initialized when we parse them
69
70 // writing and reading ends are handled by ICAPXaction
71
72 // encoding
73 // nothing to do because we are using temporary buffers
74
75 // parsing
76 icapReply = new HttpReply;
77 icapReply->protoPrefix = "ICAP/"; // TODO: make an IcapReply class?
78
79 // XXX: make sure stop() cleans all buffers
80}
81
82// HTTP side starts sending virgin data
83void ICAPModXact::noteSourceStart(MsgPipe *p)
84{
85 ICAPXaction_Enter(noteSourceStart);
86
87 // make sure TheBackupLimit is in-sync with the buffer size
88 Must(TheBackupLimit <= static_cast<size_t>(virgin->data->body->max_capacity));
89
90 estimateVirginBody(); // before virgin disappears!
91
92 // it is an ICAP violation to send request to a service w/o known OPTIONS
93
94 if (service().up())
95 startWriting();
96 else
97 waitForService();
98
99 // XXX: but this has to be here to catch other errors. Thus, if
100 // commConnectStart in startWriting fails, we may get here
101 //_after_ the object got destroyed. Somebody please fix commConnectStart!
102 ICAPXaction_Exit();
103}
104
105static
106void ICAPModXact_noteServiceReady(void *data, ICAPServiceRep::Pointer &)
107{
108 ICAPModXact *x = static_cast<ICAPModXact*>(data);
109 assert(x);
110 x->noteServiceReady();
111}
112
113void ICAPModXact::waitForService()
114{
115 Must(!state.serviceWaiting);
116 debugs(93, 7, "ICAPModXact will wait for the ICAP service " << status());
117 state.serviceWaiting = true;
118 service().callWhenReady(&ICAPModXact_noteServiceReady, this);
119}
120
121void ICAPModXact::noteServiceReady()
122{
123 ICAPXaction_Enter(noteServiceReady);
124
125 Must(state.serviceWaiting);
126 state.serviceWaiting = false;
c99de607 127
128 Must(service().up());
129
130 startWriting();
774c051c 131
132 ICAPXaction_Exit();
133}
134
135void ICAPModXact::startWriting()
136{
774c051c 137 state.writing = State::writingConnect;
138 openConnection();
139 // put nothing here as openConnection calls commConnectStart
140 // and that may call us back without waiting for the next select loop
141}
142
143// connection with the ICAP service established
144void ICAPModXact::handleCommConnected()
145{
146 Must(state.writing == State::writingConnect);
147
148 startReading(); // wait for early errors from the ICAP server
149
150 MemBuf requestBuf;
151 requestBuf.init();
152
153 makeRequestHeaders(requestBuf);
3b299123 154 debugs(93, 9, "ICAPModXact ICAP status " << status() << " will write:\n" <<
774c051c 155 (requestBuf.terminate(), requestBuf.content()));
156
157 // write headers
158 state.writing = State::writingHeaders;
159 scheduleWrite(requestBuf);
3b299123 160 virgin->sendSinkNeed();
774c051c 161}
162
b107a5a5 163void ICAPModXact::handleCommWrote(size_t sz)
774c051c 164{
b107a5a5 165 debugs(93, 5, HERE << "Wrote " << sz << " bytes");
166
774c051c 167 if (state.writing == State::writingHeaders)
168 handleCommWroteHeaders();
169 else
170 handleCommWroteBody();
171}
172
173void ICAPModXact::handleCommWroteHeaders()
174{
175 Must(state.writing == State::writingHeaders);
176
177 if (virginBody.expected()) {
178 state.writing = preview.enabled() ?
179 State::writingPreview : State::writingPrime;
180 virginWriteClaim.protectAll();
181 writeMore();
182 } else {
c99de607 183 stopWriting(true);
774c051c 184 }
185}
186
187void ICAPModXact::writeMore()
188{
189 if (writer) // already writing something
190 return;
191
192 switch (state.writing) {
193
194 case State::writingInit: // waiting for service OPTIONS
195 Must(state.serviceWaiting);
196
197 case State::writingConnect: // waiting for the connection to establish
198
199 case State::writingHeaders: // waiting for the headers to be written
200
201 case State::writingPaused: // waiting for the ICAP server response
202
c99de607 203 case State::writingReallyDone: // nothing more to write
204 return;
205
206 case State::writingAlmostDone: // was waiting for the last write
207 stopWriting(false);
774c051c 208 return;
209
210 case State::writingPreview:
211 writePriviewBody();
212 return;
213
214 case State::writingPrime:
215 writePrimeBody();
216 return;
217
218 default:
219 throw TexcHere("ICAPModXact in bad writing state");
220 }
221}
222
223void ICAPModXact::writePriviewBody()
224{
225 debugs(93, 8, "ICAPModXact will write Preview body " << status());
226 Must(state.writing == State::writingPreview);
227
228 MsgPipeData::Body *body = virgin->data->body;
229 const size_t size = XMIN(preview.debt(), (size_t)body->contentSize());
230 writeSomeBody("preview body", size);
231
232 // change state once preview is written
233
234 if (preview.done()) {
235 debugs(93, 7, "ICAPModXact wrote entire Preview body " << status());
236
237 if (preview.ieof())
c99de607 238 stopWriting(true);
774c051c 239 else
240 state.writing = State::writingPaused;
241 }
242}
243
244void ICAPModXact::writePrimeBody()
245{
246 Must(state.writing == State::writingPrime);
247 Must(virginWriteClaim.active());
248
249 MsgPipeData::Body *body = virgin->data->body;
250 const size_t size = body->contentSize();
251 writeSomeBody("prime virgin body", size);
252
c99de607 253 if (state.doneReceiving && claimSize(virginWriteClaim) <= 0) {
254 debugs(93, 5, HERE << "state.doneReceiving is set and wrote all");
255 stopWriting(true);
b107a5a5 256 }
774c051c 257}
258
259void ICAPModXact::writeSomeBody(const char *label, size_t size)
260{
c99de607 261 Must(!writer && state.writing < state.writingAlmostDone);
12f4b710 262 debugs(93, 8, HERE << "will write up to " << size << " bytes of " <<
774c051c 263 label);
264
265 MemBuf writeBuf; // TODO: suggest a min size based on size and lastChunk
266
267 writeBuf.init(); // note: we assume that last-chunk will fit
268
c99de607 269 const size_t writableSize = claimSize(virginWriteClaim);
270 const size_t chunkSize = XMIN(writableSize, size);
774c051c 271
272 if (chunkSize) {
12f4b710 273 debugs(93, 7, HERE << "will write " << chunkSize <<
774c051c 274 "-byte chunk of " << label);
275 } else {
c99de607 276 debugs(93, 7, "ICAPModXact has no writable " << label << " content");
774c051c 277 }
278
279 moveRequestChunk(writeBuf, chunkSize);
280
281 const bool lastChunk =
282 (state.writing == State::writingPreview && preview.done()) ||
283 (state.doneReceiving && claimSize(virginWriteClaim) <= 0);
284
285 if (lastChunk && virginBody.expected()) {
12f4b710 286 debugs(93, 8, HERE << "will write last-chunk of " << label);
774c051c 287 addLastRequestChunk(writeBuf);
288 }
289
12f4b710 290 debugs(93, 7, HERE << "will write " << writeBuf.contentSize()
774c051c 291 << " raw bytes of " << label);
292
293 if (writeBuf.hasContent()) {
294 scheduleWrite(writeBuf); // comm will free the chunk
295 } else {
296 writeBuf.clean();
297 }
298}
299
300void ICAPModXact::moveRequestChunk(MemBuf &buf, size_t chunkSize)
301{
302 if (chunkSize > 0) {
c99de607 303 openChunk(buf, chunkSize, false);
774c051c 304 buf.append(claimContent(virginWriteClaim), chunkSize);
c99de607 305 closeChunk(buf);
774c051c 306
307 virginWriteClaim.release(chunkSize);
308 virginConsume();
309 }
310
c99de607 311 if (state.writing == State::writingPreview) {
312 // even if we are doneReceiving, we may not have written everything
313 const bool wroteEof = state.doneReceiving &&
314 claimSize(virginWriteClaim) <= 0;
315 preview.wrote(chunkSize, wroteEof); // even if wrote nothing
316 }
774c051c 317}
318
319void ICAPModXact::addLastRequestChunk(MemBuf &buf)
320{
c99de607 321 const bool ieof = state.writing == State::writingPreview && preview.ieof();
322 openChunk(buf, 0, ieof);
323 closeChunk(buf);
774c051c 324}
325
c99de607 326void ICAPModXact::openChunk(MemBuf &buf, size_t chunkSize, bool ieof)
774c051c 327{
c99de607 328 buf.Printf((ieof ? "%x; ieof\r\n" : "%x\r\n"), (int) chunkSize);
774c051c 329}
330
c99de607 331void ICAPModXact::closeChunk(MemBuf &buf)
774c051c 332{
774c051c 333 buf.append(ICAP::crlf, 2); // chunk-terminating CRLF
334}
335
336size_t ICAPModXact::claimSize(const MemBufClaim &claim) const
337{
338 Must(claim.active());
339 const size_t start = claim.offset();
340 const size_t end = virginConsumed + virgin->data->body->contentSize();
341 Must(virginConsumed <= start && start <= end);
342 return end - start;
343}
344
345const char *ICAPModXact::claimContent(const MemBufClaim &claim) const
346{
347 Must(claim.active());
348 const size_t start = claim.offset();
349 Must(virginConsumed <= start);
350 return virgin->data->body->content() + (start - virginConsumed);
351}
352
353void ICAPModXact::virginConsume()
354{
355 MemBuf &buf = *virgin->data->body;
356 const size_t have = static_cast<size_t>(buf.contentSize());
357 const size_t end = virginConsumed + have;
358 size_t offset = end;
359
360 if (virginWriteClaim.active())
361 offset = XMIN(virginWriteClaim.offset(), offset);
362
363 if (virginSendClaim.active())
364 offset = XMIN(virginSendClaim.offset(), offset);
365
366 Must(virginConsumed <= offset && offset <= end);
367
368 if (const size_t size = offset - virginConsumed) {
b107a5a5 369 debugs(93, 8, HERE << "consuming " << size << " out of " << have <<
774c051c 370 " virgin body bytes");
371 buf.consume(size);
372 virginConsumed += size;
373
374 if (!state.doneReceiving)
375 virgin->sendSinkNeed();
376 }
377}
378
379void ICAPModXact::handleCommWroteBody()
380{
381 writeMore();
382}
383
c99de607 384// Called when we do not expect to call comm_write anymore.
385// We may have a pending write though.
386// If stopping nicely, we will just wait for that pending write, if any.
387void ICAPModXact::stopWriting(bool nicely)
774c051c 388{
c99de607 389 if (state.writing == State::writingReallyDone)
774c051c 390 return;
391
c99de607 392 if (writer) {
393 if (nicely) {
394 debugs(93, 7, HERE << "will wait for the last write " << status());
395 state.writing = State::writingAlmostDone; // may already be set
396 return;
397 }
398 debugs(93, 2, HERE << "will NOT wait for the last write " << status());
774c051c 399
c99de607 400 // Comm does not have an interface to clear the writer callback nicely,
401 // but without clearing the writer we cannot recycle the connection.
402 // We prevent connection reuse and hope that we can handle a callback
403 // call at any time. Somebody should either fix this code or add
404 // comm_remove_write_handler() to comm API.
405 reuseConnection = false;
406 }
407
408 debugs(93, 7, HERE << "will no longer write " << status());
409 state.writing = State::writingReallyDone;
774c051c 410
411 virginWriteClaim.disable();
412
413 virginConsume();
774c051c 414}
415
416void ICAPModXact::stopBackup()
417{
418 if (!virginSendClaim.active())
419 return;
420
421 debugs(93, 7, "ICAPModXact will no longer backup " << status());
422
423 virginSendClaim.disable();
424
425 virginConsume();
426}
427
428bool ICAPModXact::doneAll() const
429{
430 return ICAPXaction::doneAll() && !state.serviceWaiting &&
431 state.doneReceiving && doneSending() &&
432 doneReading() && state.doneWriting();
433}
434
435void ICAPModXact::startReading()
436{
437 Must(connection >= 0);
438 Must(!reader);
439 Must(adapted.getRaw());
440 Must(adapted->data);
441 Must(adapted->data->body);
442
443 // we use the same buffer for headers and body and then consume headers
444 readMore();
445}
446
447void ICAPModXact::readMore()
448{
3b299123 449 if (reader || doneReading()) {
c99de607 450 debugs(93,3,HERE << "returning from readMore because reader or doneReading()");
774c051c 451 return;
3b299123 452 }
774c051c 453
454 // do not fill readBuf if we have no space to store the result
3b299123 455 if (!adapted->data->body->hasPotentialSpace()) {
c99de607 456 debugs(93,3,HERE << "not reading because ICAP reply buffer is full");
774c051c 457 return;
3b299123 458 }
774c051c 459
460 if (readBuf.hasSpace())
461 scheduleRead();
3b299123 462 else
c99de607 463 debugs(93,3,HERE << "nothing to do because !readBuf.hasSpace()");
774c051c 464}
465
466// comm module read a portion of the ICAP response for us
467void ICAPModXact::handleCommRead(size_t)
468{
469 Must(!state.doneParsing());
470 parseMore();
471 readMore();
472}
473
474void ICAPModXact::echoMore()
475{
476 Must(state.sending == State::sendingVirgin);
477 Must(virginSendClaim.active());
478
479 MemBuf &from = *virgin->data->body;
480 MemBuf &to = *adapted->data->body;
481
482 const size_t sizeMax = claimSize(virginSendClaim);
483 const size_t size = XMIN(static_cast<size_t>(to.potentialSpaceSize()),
484 sizeMax);
485 debugs(93, 5, "ICAPModXact echos " << size << " out of " << sizeMax <<
486 " bytes");
487
488 if (size > 0) {
489 to.append(claimContent(virginSendClaim), size);
490 virginSendClaim.release(size);
491 virginConsume();
492 adapted->sendSourceProgress();
493 }
494
c99de607 495 if (state.doneReceiving && claimSize(virginSendClaim) <= 0) {
774c051c 496 debugs(93, 5, "ICAPModXact echoed all " << status());
497 stopSending(true);
498 } else {
499 debugs(93, 5, "ICAPModXact has " << from.contentSize() << " bytes " <<
500 "and expects more to echo " << status());
501 virgin->sendSinkNeed(); // TODO: timeout if sink is broken
502 }
503}
504
505bool ICAPModXact::doneSending() const
506{
507 Must((state.sending == State::sendingDone) == (!adapted));
508 return state.sending == State::sendingDone;
509}
510
511void ICAPModXact::stopSending(bool nicely)
512{
513 if (doneSending())
514 return;
515
516 if (state.sending != State::sendingUndecided) {
517 debugs(93, 7, "ICAPModXact will no longer send " << status());
518
519 if (nicely)
520 adapted->sendSourceFinish();
521 else
522 adapted->sendSourceAbort();
523 } else {
524 debugs(93, 7, "ICAPModXact will not start sending " << status());
525 adapted->sendSourceAbort(); // or the sink may wait forever
526 }
527
528 state.sending = State::sendingDone;
529
774c051c 530 adapted = NULL; // refcounted
531}
532
533void ICAPModXact::stopReceiving()
534{
535 // stopSending NULLifies adapted but we do not NULLify virgin.
536 // This is assymetric because we want to keep virgin->data even
537 // though we are not expecting any more virgin->data->body.
538 // TODO: can we cache just the needed headers info instead?
539
540 // If they closed first, there is not point (or means) to notify them.
541
542 if (state.doneReceiving)
543 return;
544
545 // There is no sendSinkFinished() to notify the other side.
546 debugs(93, 7, "ICAPModXact will not receive " << status());
547
548 state.doneReceiving = true;
549}
550
551void ICAPModXact::parseMore()
552{
aa761e5f 553 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " bytes to parse" <<
774c051c 554 status());
d5cfacfb 555 debugs(93, 5, HERE << "\n" << readBuf.content());
774c051c 556
557 if (state.parsingHeaders())
558 parseHeaders();
559
560 if (state.parsing == State::psBody)
561 parseBody();
562}
563
564// note that allocation for echoing is done in handle204NoContent()
565void ICAPModXact::maybeAllocateHttpMsg()
566{
567 if (adapted->data->header) // already allocated
568 return;
569
570 if (gotEncapsulated("res-hdr")) {
d9eb9082 571 adapted->data->setHeader(new HttpReply);
774c051c 572 } else if (gotEncapsulated("req-hdr")) {
d9eb9082 573 adapted->data->setHeader(new HttpRequest);
774c051c 574 } else
575 throw TexcHere("Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()");
576}
577
578void ICAPModXact::parseHeaders()
579{
580 Must(state.parsingHeaders());
581
b107a5a5 582 if (state.parsing == State::psIcapHeader) {
583 debugs(93, 5, HERE << "parse ICAP headers");
774c051c 584 parseIcapHead();
b107a5a5 585 }
774c051c 586
b107a5a5 587 if (state.parsing == State::psHttpHeader) {
588 debugs(93, 5, HERE << "parse HTTP headers");
774c051c 589 parseHttpHead();
b107a5a5 590 }
774c051c 591
592 if (state.parsingHeaders()) { // need more data
593 Must(mayReadMore());
594 return;
595 }
596
597 adapted->sendSourceStart();
598
599 if (state.sending == State::sendingVirgin)
600 echoMore();
601}
602
603void ICAPModXact::parseIcapHead()
604{
605 Must(state.sending == State::sendingUndecided);
606
607 if (!parseHead(icapReply))
608 return;
609
fc764d26 610 if (httpHeaderHasConnDir(&icapReply->header, "close")) {
611 debugs(93, 5, HERE << "found connection close");
612 reuseConnection = false;
613 }
614
774c051c 615 switch (icapReply->sline.status) {
616
617 case 100:
618 handle100Continue();
619 break;
620
621 case 200:
b559db5d 622
623 if (!validate200Ok()) {
624 throw TexcHere("Invalid ICAP Response");
625 } else {
626 handle200Ok();
627 }
628
774c051c 629 break;
630
631 case 204:
632 handle204NoContent();
633 break;
634
635 default:
b559db5d 636 debugs(93, 5, HERE << "ICAP status " << icapReply->sline.status);
774c051c 637 handleUnknownScode();
638 break;
639 }
640
641 // handle100Continue() manages state.writing on its own.
642 // Non-100 status means the server needs no postPreview data from us.
643 if (state.writing == State::writingPaused)
c99de607 644 stopWriting(true);
774c051c 645
646 // TODO: Consider applying a Squid 2.5 patch to recognize 201 responses
647}
648
b559db5d 649bool ICAPModXact::validate200Ok()
650{
651 if (ICAP::methodRespmod == service().method) {
652 if (!gotEncapsulated("res-hdr"))
653 return false;
654
655 return true;
656 }
657
658 if (ICAP::methodReqmod == service().method) {
659 if (!gotEncapsulated("res-hdr") && !gotEncapsulated("req-hdr"))
660 return false;
661
662 return true;
663 }
664
665 return false;
666}
667
774c051c 668void ICAPModXact::handle100Continue()
669{
670 Must(state.writing == State::writingPaused);
671 Must(preview.enabled() && preview.done() && !preview.ieof());
672 Must(virginSendClaim.active());
673
674 if (virginSendClaim.limited()) // preview only
675 stopBackup();
676
c99de607 677 state.parsing = State::psIcapHeader; // eventually
678 icapReply->reset();
774c051c 679
680 state.writing = State::writingPrime;
681
682 writeMore();
683}
684
685void ICAPModXact::handle200Ok()
686{
687 state.parsing = State::psHttpHeader;
688 state.sending = State::sendingAdapted;
689 stopBackup();
690}
691
692void ICAPModXact::handle204NoContent()
693{
694 stopParsing();
695 Must(virginSendClaim.active());
696 virginSendClaim.protectAll(); // extends protection if needed
697 state.sending = State::sendingVirgin;
698
699 // We want to clone the HTTP message, but we do not want
700 // to copy non-HTTP state parts that HttpMsg kids carry in them.
701 // Thus, we cannot use a smart pointer, copy constructor, or equivalent.
702 // Instead, we simply write the HTTP message and "clone" it by parsing.
703
704 HttpMsg *oldHead = virgin->data->header;
705 debugs(93, 7, "ICAPModXact cloning virgin message " << oldHead);
706
707 MemBuf httpBuf;
708
709 // write the virgin message into a memory buffer
710 httpBuf.init();
711 packHead(httpBuf, oldHead);
712
c99de607 713 // allocate the adapted message and copy metainfo
7514268e 714 Must(!adapted->data->header);
715 HttpMsg *newHead = NULL;
c99de607 716 if (const HttpRequest *oldR = dynamic_cast<const HttpRequest*>(oldHead)) {
717 HttpRequest *newR = new HttpRequest;
718 newR->client_addr = oldR->client_addr;
719 newHead = newR;
720 } else
721 if (dynamic_cast<const HttpReply*>(oldHead))
722 newHead = new HttpReply;
774c051c 723 Must(newHead);
724
7514268e 725 adapted->data->setHeader(newHead);
726
774c051c 727 // parse the buffer back
728 http_status error = HTTP_STATUS_NONE;
729
730 Must(newHead->parse(&httpBuf, true, &error));
731
732 Must(newHead->hdr_sz == httpBuf.contentSize()); // no leftovers
733
734 httpBuf.clean();
735
736 debugs(93, 7, "ICAPModXact cloned virgin message " << oldHead << " to " << newHead);
737}
738
739void ICAPModXact::handleUnknownScode()
740{
741 stopParsing();
742 stopBackup();
743 // TODO: mark connection as "bad"
744
745 // Terminate the transaction; we do not know how to handle this response.
746 throw TexcHere("Unsupported ICAP status code");
747}
748
749void ICAPModXact::parseHttpHead()
750{
751 if (gotEncapsulated("res-hdr") || gotEncapsulated("req-hdr")) {
752 maybeAllocateHttpMsg();
753
754 if (!parseHead(adapted->data->header))
c99de607 755 return; // need more header data
774c051c 756 }
757
758 state.parsing = State::psBody;
759}
760
c99de607 761// parses both HTTP and ICAP headers
774c051c 762bool ICAPModXact::parseHead(HttpMsg *head)
763{
c99de607 764 Must(head);
def17b6a 765 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse" <<
774c051c 766 "; state: " << state.parsing);
767
768 http_status error = HTTP_STATUS_NONE;
769 const bool parsed = head->parse(&readBuf, commEof, &error);
770 Must(parsed || !error); // success or need more data
771
c99de607 772 if (!parsed) { // need more data
b107a5a5 773 debugs(93, 5, HERE << "parse failed, need more data, return false");
774c051c 774 head->reset();
775 return false;
776 }
777
b107a5a5 778 debugs(93, 5, HERE << "parse success, consume " << head->hdr_sz << " bytes, return true");
774c051c 779 readBuf.consume(head->hdr_sz);
780 return true;
781}
782
783void ICAPModXact::parseBody()
784{
785 Must(state.parsing == State::psBody);
786
aa761e5f 787 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes to parse");
774c051c 788
200ac359 789 if (gotEncapsulated("res-body") || gotEncapsulated("req-body")) {
774c051c 790 if (!parsePresentBody()) // need more body data
791 return;
792 } else {
b559db5d 793 debugs(93, 5, HERE << "not expecting a body");
774c051c 794 }
795
796 stopParsing();
797 stopSending(true);
798}
799
800// returns true iff complete body was parsed
801bool ICAPModXact::parsePresentBody()
802{
803 if (!bodyParser)
804 bodyParser = new ChunkedCodingParser;
805
806 // the parser will throw on errors
807 const bool parsed = bodyParser->parse(&readBuf, adapted->data->body);
808
809 adapted->sendSourceProgress(); // TODO: do not send if parsed nothing
810
aa761e5f 811 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes after " <<
774c051c 812 "parse; parsed all: " << parsed);
813
814 if (parsed)
815 return true;
816
c99de607 817 debugs(93,3,HERE << this << " needsMoreData = " << bodyParser->needsMoreData());
3b299123 818
819 if (bodyParser->needsMoreData()) {
c99de607 820 debugs(93,3,HERE << this);
774c051c 821 Must(mayReadMore());
3b299123 822 readMore();
823 }
774c051c 824
825 if (bodyParser->needsMoreSpace()) {
826 Must(!doneSending()); // can hope for more space
827 Must(adapted->data->body->hasContent()); // paranoid
828 // TODO: there should be a timeout in case the sink is broken.
829 }
830
831 return false;
832}
833
834void ICAPModXact::stopParsing()
835{
836 if (state.parsing == State::psDone)
837 return;
838
839 debugs(93, 7, "ICAPModXact will no longer parse " << status());
840
841 delete bodyParser;
842
843 bodyParser = NULL;
844
845 state.parsing = State::psDone;
846}
847
848// HTTP side added virgin body data
849void ICAPModXact::noteSourceProgress(MsgPipe *p)
850{
851 ICAPXaction_Enter(noteSourceProgress);
852
853 Must(!state.doneReceiving);
854 writeMore();
855
856 if (state.sending == State::sendingVirgin)
857 echoMore();
858
859 ICAPXaction_Exit();
860}
861
862// HTTP side sent us all virgin info
863void ICAPModXact::noteSourceFinish(MsgPipe *p)
864{
865 ICAPXaction_Enter(noteSourceFinish);
866
867 Must(!state.doneReceiving);
868 stopReceiving();
869
870 // push writer and sender in case we were waiting for the last-chunk
871 writeMore();
872
873 if (state.sending == State::sendingVirgin)
874 echoMore();
875
876 ICAPXaction_Exit();
877}
878
879// HTTP side is aborting
880void ICAPModXact::noteSourceAbort(MsgPipe *p)
881{
882 ICAPXaction_Enter(noteSourceAbort);
883
884 Must(!state.doneReceiving);
885 stopReceiving();
886 mustStop("HTTP source quit");
887
888 ICAPXaction_Exit();
889}
890
891// HTTP side wants more adapted data and possibly freed some buffer space
892void ICAPModXact::noteSinkNeed(MsgPipe *p)
893{
894 ICAPXaction_Enter(noteSinkNeed);
895
896 if (state.sending == State::sendingVirgin)
897 echoMore();
3b299123 898 else if (state.sending == State::sendingAdapted)
899 parseMore();
774c051c 900 else
3b299123 901 Must(state.sending == State::sendingUndecided);
774c051c 902
903 ICAPXaction_Exit();
904}
905
906// HTTP side aborted
907void ICAPModXact::noteSinkAbort(MsgPipe *p)
908{
909 ICAPXaction_Enter(noteSinkAbort);
910
911 mustStop("HTTP sink quit");
912
913 ICAPXaction_Exit();
914}
915
916// internal cleanup
917void ICAPModXact::doStop()
918{
c99de607 919 debugs(93, 5, HERE << "doStop() called");
774c051c 920 ICAPXaction::doStop();
921
c99de607 922 stopWriting(false);
774c051c 923 stopBackup();
924
925 if (icapReply) {
926 delete icapReply;
927 icapReply = NULL;
928 }
929
930 stopSending(false);
931
932 // see stopReceiving() for reasons it cannot NULLify virgin there
933
934 if (virgin != NULL) {
935 if (!state.doneReceiving)
936 virgin->sendSinkAbort();
937 else
938 virgin->sink = NULL;
939
940 virgin = NULL; // refcounted
941 }
942
943 if (self != NULL) {
944 Pointer s = self;
945 self = NULL;
946 ICAPNoteXactionDone(s);
947 /* this object may be destroyed when 's' is cleared */
948 }
949}
950
951void ICAPModXact::makeRequestHeaders(MemBuf &buf)
952{
12b91c99 953 /*
954 * XXX These should use HttpHdr interfaces instead of Printfs
955 */
774c051c 956 const ICAPServiceRep &s = service();
957 buf.Printf("%s %s ICAP/1.0\r\n", s.methodStr(), s.uri.buf());
958 buf.Printf("Host: %s:%d\r\n", s.host.buf(), s.port);
12b91c99 959 buf.Printf("Date: %s\r\n", mkrfc1123(squid_curtime));
960
961 if (!TheICAPConfig.reuse_connections)
962 buf.Printf("Connection: close\r\n");
963
774c051c 964 buf.Printf("Encapsulated: ");
965
966 MemBuf httpBuf;
12b91c99 967
774c051c 968 httpBuf.init();
969
970 // build HTTP request header, if any
971 ICAP::Method m = s.method;
972
c99de607 973 const HttpRequest *request = virgin->data->cause ?
974 virgin->data->cause :
975 dynamic_cast<const HttpRequest*>(virgin->data->header);
976
977 // to simplify, we could we assume that request is always available
978
979 String urlPath;
980 if (request) {
981 urlPath = request->urlpath;
982 if (ICAP::methodRespmod == m)
983 encapsulateHead(buf, "req-hdr", httpBuf, request);
984 else
985 if (ICAP::methodReqmod == m)
986 encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->header);
987 }
774c051c 988
989 if (ICAP::methodRespmod == m)
990 if (const MsgPipeData::Header *prime = virgin->data->header)
991 encapsulateHead(buf, "res-hdr", httpBuf, prime);
992
993 if (!virginBody.expected())
1dd6edf2 994 buf.Printf("null-body=%d", (int) httpBuf.contentSize());
774c051c 995 else if (ICAP::methodReqmod == m)
1dd6edf2 996 buf.Printf("req-body=%d", (int) httpBuf.contentSize());
774c051c 997 else
1dd6edf2 998 buf.Printf("res-body=%d", (int) httpBuf.contentSize());
774c051c 999
1000 buf.append(ICAP::crlf, 2); // terminate Encapsulated line
1001
c99de607 1002 if (shouldPreview(urlPath)) {
774c051c 1003 buf.Printf("Preview: %d\r\n", (int)preview.ad());
1004 virginSendClaim.protectUpTo(preview.ad());
1005 }
1006
1007 if (shouldAllow204()) {
1008 buf.Printf("Allow: 204\r\n");
1009 // be robust: do not rely on the expected body size
1010 virginSendClaim.protectAll();
1011 }
1012
c99de607 1013 if (TheICAPConfig.send_client_ip && request)
1014 if (request->client_addr.s_addr != any_addr.s_addr &&
1015 request->client_addr.s_addr != no_addr.s_addr)
12b91c99 1016 buf.Printf("X-Client-IP: %s\r\n", inet_ntoa(request->client_addr));
a97e82a8 1017
c99de607 1018 if (TheICAPConfig.send_client_username && request)
12b91c99 1019 if (request->auth_user_request)
1020 if (request->auth_user_request->username())
1021 buf.Printf("X-Client-Username: %s\r\n", request->auth_user_request->username());
a97e82a8 1022
2dfede9e 1023 // fprintf(stderr, "%s\n", buf.content());
a97e82a8 1024
774c051c 1025 buf.append(ICAP::crlf, 2); // terminate ICAP header
1026
1027 // start ICAP request body with encapsulated HTTP headers
1028 buf.append(httpBuf.content(), httpBuf.contentSize());
1029
1030 httpBuf.clean();
1031}
1032
1033void ICAPModXact::encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const HttpMsg *head)
1034{
1035 // update ICAP header
7cab7e9f 1036 icapBuf.Printf("%s=%d, ", section, (int) httpBuf.contentSize());
774c051c 1037
1038 // pack HTTP head
1039 packHead(httpBuf, head);
1040}
1041
1042void ICAPModXact::packHead(MemBuf &httpBuf, const HttpMsg *head)
1043{
1044 Packer p;
1045 packerToMemInit(&p, &httpBuf);
1046 head->packInto(&p, true);
1047 packerClean(&p);
1048}
1049
1050// decides whether to offer a preview and calculates its size
c99de607 1051bool ICAPModXact::shouldPreview(const String &urlPath)
774c051c 1052{
1053 size_t wantedSize;
1054
7cdbbd47 1055 if (!TheICAPConfig.preview_enable) {
1056 debugs(93, 5, HERE << "preview disabled by squid.conf");
1057 return false;
1058 }
1059
c99de607 1060 if (!service().wantsPreview(urlPath, wantedSize)) {
1061 debugs(93, 5, "ICAPModXact should not offer preview for " << urlPath);
774c051c 1062 return false;
1063 }
1064
1065 Must(wantedSize >= 0);
1066
1067 // cannot preview more than we can backup
1068 size_t ad = XMIN(wantedSize, TheBackupLimit);
1069
1070 if (virginBody.expected() && virginBody.knownSize())
1071 ad = XMIN(ad, virginBody.size()); // not more than we have
1072 else
1073 ad = 0; // questionable optimization?
1074
1075 debugs(93, 5, "ICAPModXact should offer " << ad << "-byte preview " <<
1076 "(service wanted " << wantedSize << ")");
1077
1078 preview.enable(ad);
1079
1080 return preview.enabled();
1081}
1082
1083// decides whether to allow 204 responses
1084bool ICAPModXact::shouldAllow204()
1085{
1086 if (!service().allows204())
1087 return false;
1088
1089 if (!virginBody.expected())
1090 return true; // no body means no problems with supporting 204s.
1091
1092 // if there is a body, make sure we can backup it all
1093
1094 if (!virginBody.knownSize())
1095 return false;
1096
1097 // or should we have a different backup limit?
1098 // note that '<' allows for 0-termination of the "full" backup buffer
1099 return virginBody.size() < TheBackupLimit;
1100}
1101
774c051c 1102void ICAPModXact::fillPendingStatus(MemBuf &buf) const
1103{
c99de607 1104 ICAPXaction::fillPendingStatus(buf);
1105
774c051c 1106 if (state.serviceWaiting)
1107 buf.append("U", 1);
1108
c99de607 1109 if (!state.doneReceiving)
1110 buf.append("R", 1);
1111
1112 if (!doneReading())
1113 buf.append("r", 1);
1114
774c051c 1115 if (!state.doneWriting() && state.writing != State::writingInit)
1116 buf.Printf("w(%d)", state.writing);
1117
1118 if (preview.enabled()) {
1119 if (!preview.done())
1dd6edf2 1120 buf.Printf("P(%d)", (int) preview.debt());
774c051c 1121 }
1122
1123 if (virginSendClaim.active())
1124 buf.append("B", 1);
1125
1126 if (!state.doneParsing() && state.parsing != State::psIcapHeader)
1127 buf.Printf("p(%d)", state.parsing);
1128
1129 if (!doneSending() && state.sending != State::sendingUndecided)
1130 buf.Printf("S(%d)", state.sending);
1131}
1132
1133void ICAPModXact::fillDoneStatus(MemBuf &buf) const
1134{
c99de607 1135 ICAPXaction::fillDoneStatus(buf);
1136
774c051c 1137 if (state.doneReceiving)
1138 buf.append("R", 1);
1139
1140 if (state.doneWriting())
1141 buf.append("w", 1);
1142
1143 if (preview.enabled()) {
1144 if (preview.done())
1145 buf.Printf("P%s", preview.ieof() ? "(ieof)" : "");
1146 }
1147
1148 if (doneReading())
1149 buf.append("r", 1);
1150
1151 if (state.doneParsing())
1152 buf.append("p", 1);
1153
1154 if (doneSending())
1155 buf.append("S", 1);
1156}
1157
1158bool ICAPModXact::gotEncapsulated(const char *section) const
1159{
a9925b40 1160 return icapReply->header.getByNameListMember("Encapsulated",
1161 section, ',').size() > 0;
774c051c 1162}
1163
1164// calculate whether there is a virgin HTTP body and
1165// whether its expected size is known
1166void ICAPModXact::estimateVirginBody()
1167{
1168 // note: defaults should be fine but will disable previews and 204s
1169
1170 Must(virgin != NULL && virgin->data->header);
1171
1172 method_t method;
1173
1174 if (virgin->data->cause)
1175 method = virgin->data->cause->method;
1176 else
a97e82a8 1177 if (HttpRequest *req = dynamic_cast<HttpRequest*>(virgin->data->
1178 header))
774c051c 1179 method = req->method;
1180 else
1181 return;
1182
1183 ssize_t size;
1184 if (virgin->data->header->expectingBody(method, size)) {
1185 virginBody.expect(size)
1186 ;
1187 debugs(93, 6, "ICAPModXact expects virgin body; size: " << size);
1188 } else {
1189 debugs(93, 6, "ICAPModXact does not expect virgin body");
1190 }
1191}
1192
1193
// TODO: Move SizedEstimate, MemBufClaim, and ICAPPreview elsewhere
1195
1196SizedEstimate::SizedEstimate()
1197 : theData(dtUnexpected)
1198{}
1199
1200void SizedEstimate::expect(ssize_t aSize)
1201{
1202 theData = (aSize >= 0) ? aSize : (ssize_t)dtUnknown;
1203}
1204
1205bool SizedEstimate::expected() const
1206{
1207 return theData != dtUnexpected;
1208}
1209
1210bool SizedEstimate::knownSize() const
1211{
1212 Must(expected());
1213 return theData != dtUnknown;
1214}
1215
1216size_t SizedEstimate::size() const
1217{
1218 Must(knownSize());
1219 return static_cast<size_t>(theData);
1220}
1221
1222
1223
1224MemBufClaim::MemBufClaim(): theStart(-1), theGoal(-1)
1225{}
1226
1227void MemBufClaim::protectAll()
1228{
1229 if (theStart < 0)
1230 theStart = 0;
1231
1232 theGoal = -1; // no specific goal
1233}
1234
1235void MemBufClaim::protectUpTo(size_t aGoal)
1236{
1237 if (theStart < 0)
1238 theStart = 0;
1239
1240 Must(aGoal >= 0);
1241
1242 theGoal = (theGoal < 0) ? static_cast<ssize_t>(aGoal) :
1243 XMIN(static_cast<ssize_t>(aGoal), theGoal);
1244}
1245
1246void MemBufClaim::disable()
1247{
1248 theStart = -1;
1249}
1250
1251void MemBufClaim::release(size_t size)
1252{
1253 Must(active());
1254 Must(size >= 0);
1255 theStart += static_cast<ssize_t>(size);
1256
1257 if (limited() && theStart >= theGoal)
1258 disable();
1259}
1260
1261size_t MemBufClaim::offset() const
1262{
1263 Must(active());
1264 return static_cast<size_t>(theStart);
1265}
1266
1267bool MemBufClaim::limited() const
1268{
1269 Must(active());
1270 return theGoal >= 0;
1271}
1272
1273
1274ICAPPreview::ICAPPreview(): theWritten(0), theAd(0), theState(stDisabled)
1275{}
1276
1277void ICAPPreview::enable(size_t anAd)
1278{
1279 // TODO: check for anAd not exceeding preview size limit
1280 Must(anAd >= 0);
1281 Must(!enabled());
1282 theAd = anAd;
1283 theState = stWriting;
1284}
1285
1286bool ICAPPreview::enabled() const
1287{
1288 return theState != stDisabled;
1289}
1290
1291size_t ICAPPreview::ad() const
1292{
1293 Must(enabled());
1294 return theAd;
1295}
1296
1297bool ICAPPreview::done() const
1298{
1299 Must(enabled());
1300 return theState >= stIeof;
1301}
1302
1303bool ICAPPreview::ieof() const
1304{
1305 Must(enabled());
1306 return theState == stIeof;
1307}
1308
1309size_t ICAPPreview::debt() const
1310{
1311 Must(enabled());
1312 return done() ? 0 : (theAd - theWritten);
1313}
1314
c99de607 1315void ICAPPreview::wrote(size_t size, bool wroteEof)
774c051c 1316{
1317 Must(enabled());
1318 theWritten += size;
1319
1320 if (theWritten >= theAd)
c99de607 1321 theState = stDone; // wroteEof is irrelevant
774c051c 1322 else
c99de607 1323 if (wroteEof)
774c051c 1324 theState = stIeof;
1325}
1326
3cfc19b3 1327bool ICAPModXact::fillVirginHttpHeader(MemBuf &mb) const
1328{
1329 if (virgin == NULL)
1330 return false;
1331
1332 if (virgin->data == NULL)
1333 return false;
1334
1335 if (virgin->data->header == NULL)
1336 return false;
1337
1338 virgin->data->header->firstLineBuf(mb);
1339
1340 return true;
1341}