]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ICAP/ICAPModXact.cc
This patch adds sorting to cachemgr:mem output to make it easier to find
[thirdparty/squid.git] / src / ICAP / ICAPModXact.cc
CommitLineData
774c051c 1/*
2 * DEBUG: section 93 ICAP (RFC 3507) Client
3 */
4
5#include "squid.h"
6#include "comm.h"
7#include "MsgPipe.h"
8#include "MsgPipeData.h"
9#include "HttpRequest.h"
10#include "HttpReply.h"
11#include "ICAPServiceRep.h"
12#include "ICAPModXact.h"
13#include "ICAPClient.h"
14#include "ChunkedCodingParser.h"
15#include "TextException.h"
a97e82a8 16#include "AuthUserRequest.h"
12b91c99 17#include "ICAPConfig.h"
774c051c 18
19// flow and terminology:
20// HTTP| --> receive --> encode --> write --> |network
21// end | <-- send <-- parse <-- read <-- |end
22
23// TODO: doneSending()/doneReceiving() data members should probably be in sync
24// with this->adapted/virgin pointers. Make adapted/virgin methods?
25
26// TODO: replace gotEncapsulated() with something faster; we call it often
27
28CBDATA_CLASS_INIT(ICAPModXact);
29
30static const size_t TheBackupLimit = ICAP::MsgPipeBufSizeMax;
31
12b91c99 32extern ICAPConfig TheICAPConfig;
33
774c051c 34
35ICAPModXact::State::State()
36{
37 memset(this, sizeof(*this), 0);
38}
39
40ICAPModXact::ICAPModXact(): ICAPXaction("ICAPModXact"),
41 self(NULL), virgin(NULL), adapted(NULL),
42 icapReply(NULL), virginConsumed(0),
43 bodyParser(NULL)
44{}
45
46void ICAPModXact::init(ICAPServiceRep::Pointer &aService, MsgPipe::Pointer &aVirgin, MsgPipe::Pointer &anAdapted, Pointer &aSelf)
47{
48 assert(!self.getRaw() && !virgin.getRaw() && !adapted.getRaw());
49 assert(aSelf.getRaw() && aVirgin.getRaw() && anAdapted.getRaw());
50
51 self = aSelf;
52 service(aService);
53
54 virgin = aVirgin;
55 adapted = anAdapted;
56
57 // receiving end
58 virgin->sink = this; // should be 'self' and refcounted
59 // virgin pipe data is initiated by the source
60
61 // sending end
62 adapted->source = this; // should be 'self' and refcounted
63 adapted->data = new MsgPipeData;
64
65 adapted->data->body = new MemBuf; // XXX: make body a non-pointer?
66 adapted->data->body->init(ICAP::MsgPipeBufSizeMin, ICAP::MsgPipeBufSizeMax);
67 // headers are initialized when we parse them
68
69 // writing and reading ends are handled by ICAPXaction
70
71 // encoding
72 // nothing to do because we are using temporary buffers
73
74 // parsing
75 icapReply = new HttpReply;
76 icapReply->protoPrefix = "ICAP/"; // TODO: make an IcapReply class?
77
78 // XXX: make sure stop() cleans all buffers
79}
80
81// HTTP side starts sending virgin data
82void ICAPModXact::noteSourceStart(MsgPipe *p)
83{
84 ICAPXaction_Enter(noteSourceStart);
85
86 // make sure TheBackupLimit is in-sync with the buffer size
87 Must(TheBackupLimit <= static_cast<size_t>(virgin->data->body->max_capacity));
88
89 estimateVirginBody(); // before virgin disappears!
90
91 // it is an ICAP violation to send request to a service w/o known OPTIONS
92
93 if (service().up())
94 startWriting();
95 else
96 waitForService();
97
98 // XXX: but this has to be here to catch other errors. Thus, if
99 // commConnectStart in startWriting fails, we may get here
100 //_after_ the object got destroyed. Somebody please fix commConnectStart!
101 ICAPXaction_Exit();
102}
103
104static
105void ICAPModXact_noteServiceReady(void *data, ICAPServiceRep::Pointer &)
106{
107 ICAPModXact *x = static_cast<ICAPModXact*>(data);
108 assert(x);
109 x->noteServiceReady();
110}
111
112void ICAPModXact::waitForService()
113{
114 Must(!state.serviceWaiting);
115 debugs(93, 7, "ICAPModXact will wait for the ICAP service " << status());
116 state.serviceWaiting = true;
117 service().callWhenReady(&ICAPModXact_noteServiceReady, this);
118}
119
120void ICAPModXact::noteServiceReady()
121{
122 ICAPXaction_Enter(noteServiceReady);
123
124 Must(state.serviceWaiting);
125 state.serviceWaiting = false;
126 startWriting(); // will throw if service is not up
127
128 ICAPXaction_Exit();
129}
130
131void ICAPModXact::startWriting()
132{
133 Must(service().up());
134
135 state.writing = State::writingConnect;
136 openConnection();
137 // put nothing here as openConnection calls commConnectStart
138 // and that may call us back without waiting for the next select loop
139}
140
141// connection with the ICAP service established
142void ICAPModXact::handleCommConnected()
143{
144 Must(state.writing == State::writingConnect);
145
146 startReading(); // wait for early errors from the ICAP server
147
148 MemBuf requestBuf;
149 requestBuf.init();
150
151 makeRequestHeaders(requestBuf);
152 debugs(93, 9, "ICAPModXact ICAP request prefix " << status() << ":\n" <<
153 (requestBuf.terminate(), requestBuf.content()));
154
155 // write headers
156 state.writing = State::writingHeaders;
157 scheduleWrite(requestBuf);
158}
159
160void ICAPModXact::handleCommWrote(size_t)
161{
162 if (state.writing == State::writingHeaders)
163 handleCommWroteHeaders();
164 else
165 handleCommWroteBody();
166}
167
168void ICAPModXact::handleCommWroteHeaders()
169{
170 Must(state.writing == State::writingHeaders);
171
172 if (virginBody.expected()) {
173 state.writing = preview.enabled() ?
174 State::writingPreview : State::writingPrime;
175 virginWriteClaim.protectAll();
176 writeMore();
177 } else {
178 stopWriting();
179 }
180}
181
182void ICAPModXact::writeMore()
183{
184 if (writer) // already writing something
185 return;
186
187 switch (state.writing) {
188
189 case State::writingInit: // waiting for service OPTIONS
190 Must(state.serviceWaiting);
191
192 case State::writingConnect: // waiting for the connection to establish
193
194 case State::writingHeaders: // waiting for the headers to be written
195
196 case State::writingPaused: // waiting for the ICAP server response
197
198 case State::writingDone: // nothing more to write
199 return;
200
201 case State::writingPreview:
202 writePriviewBody();
203 return;
204
205 case State::writingPrime:
206 writePrimeBody();
207 return;
208
209 default:
210 throw TexcHere("ICAPModXact in bad writing state");
211 }
212}
213
214void ICAPModXact::writePriviewBody()
215{
216 debugs(93, 8, "ICAPModXact will write Preview body " << status());
217 Must(state.writing == State::writingPreview);
218
219 MsgPipeData::Body *body = virgin->data->body;
220 const size_t size = XMIN(preview.debt(), (size_t)body->contentSize());
221 writeSomeBody("preview body", size);
222
223 // change state once preview is written
224
225 if (preview.done()) {
226 debugs(93, 7, "ICAPModXact wrote entire Preview body " << status());
227
228 if (preview.ieof())
229 stopWriting();
230 else
231 state.writing = State::writingPaused;
232 }
233}
234
235void ICAPModXact::writePrimeBody()
236{
237 Must(state.writing == State::writingPrime);
238 Must(virginWriteClaim.active());
239
240 MsgPipeData::Body *body = virgin->data->body;
241 const size_t size = body->contentSize();
242 writeSomeBody("prime virgin body", size);
243
244 if (state.doneReceiving)
245 stopWriting();
246}
247
248void ICAPModXact::writeSomeBody(const char *label, size_t size)
249{
250 Must(!writer && !state.doneWriting());
12f4b710 251 debugs(93, 8, HERE << "will write up to " << size << " bytes of " <<
774c051c 252 label);
253
254 MemBuf writeBuf; // TODO: suggest a min size based on size and lastChunk
255
256 writeBuf.init(); // note: we assume that last-chunk will fit
257
258 const size_t writeableSize = claimSize(virginWriteClaim);
259 const size_t chunkSize = XMIN(writeableSize, size);
260
261 if (chunkSize) {
12f4b710 262 debugs(93, 7, HERE << "will write " << chunkSize <<
774c051c 263 "-byte chunk of " << label);
264 } else {
265 debugs(93, 7, "ICAPModXact has no writeable " << label << " content");
266 }
267
268 moveRequestChunk(writeBuf, chunkSize);
269
270 const bool lastChunk =
271 (state.writing == State::writingPreview && preview.done()) ||
272 (state.doneReceiving && claimSize(virginWriteClaim) <= 0);
273
274 if (lastChunk && virginBody.expected()) {
12f4b710 275 debugs(93, 8, HERE << "will write last-chunk of " << label);
774c051c 276 addLastRequestChunk(writeBuf);
277 }
278
12f4b710 279 debugs(93, 7, HERE << "will write " << writeBuf.contentSize()
774c051c 280 << " raw bytes of " << label);
281
282 if (writeBuf.hasContent()) {
283 scheduleWrite(writeBuf); // comm will free the chunk
284 } else {
285 writeBuf.clean();
286 }
287}
288
289void ICAPModXact::moveRequestChunk(MemBuf &buf, size_t chunkSize)
290{
291 if (chunkSize > 0) {
292 openChunk(buf, chunkSize);
293 buf.append(claimContent(virginWriteClaim), chunkSize);
294 closeChunk(buf, false);
295
296 virginWriteClaim.release(chunkSize);
297 virginConsume();
298 }
299
300 if (state.writing == State::writingPreview)
301 preview.wrote(chunkSize, state.doneReceiving); // even if wrote nothing
302}
303
304void ICAPModXact::addLastRequestChunk(MemBuf &buf)
305{
306 openChunk(buf, 0);
307 closeChunk(buf, state.writing == State::writingPreview && preview.ieof());
308}
309
310void ICAPModXact::openChunk(MemBuf &buf, size_t chunkSize)
311{
1dd6edf2 312 buf.Printf("%x\r\n", (int) chunkSize);
774c051c 313}
314
315void ICAPModXact::closeChunk(MemBuf &buf, bool ieof)
316{
317 if (ieof)
318 buf.append("; ieof", 6);
319
320 buf.append(ICAP::crlf, 2); // chunk-terminating CRLF
321}
322
323size_t ICAPModXact::claimSize(const MemBufClaim &claim) const
324{
325 Must(claim.active());
326 const size_t start = claim.offset();
327 const size_t end = virginConsumed + virgin->data->body->contentSize();
328 Must(virginConsumed <= start && start <= end);
329 return end - start;
330}
331
332const char *ICAPModXact::claimContent(const MemBufClaim &claim) const
333{
334 Must(claim.active());
335 const size_t start = claim.offset();
336 Must(virginConsumed <= start);
337 return virgin->data->body->content() + (start - virginConsumed);
338}
339
340void ICAPModXact::virginConsume()
341{
342 MemBuf &buf = *virgin->data->body;
343 const size_t have = static_cast<size_t>(buf.contentSize());
344 const size_t end = virginConsumed + have;
345 size_t offset = end;
346
347 if (virginWriteClaim.active())
348 offset = XMIN(virginWriteClaim.offset(), offset);
349
350 if (virginSendClaim.active())
351 offset = XMIN(virginSendClaim.offset(), offset);
352
353 Must(virginConsumed <= offset && offset <= end);
354
355 if (const size_t size = offset - virginConsumed) {
356 debugs(93, 8, "ICAPModXact consumes " << size << " out of " << have <<
357 " virgin body bytes");
358 buf.consume(size);
359 virginConsumed += size;
360
361 if (!state.doneReceiving)
362 virgin->sendSinkNeed();
363 }
364}
365
366void ICAPModXact::handleCommWroteBody()
367{
368 writeMore();
369}
370
371void ICAPModXact::stopWriting()
372{
373 if (state.writing == State::writingDone)
374 return;
375
376 debugs(93, 7, "ICAPModXact will no longer write " << status());
377
378 state.writing = State::writingDone;
379
380 virginWriteClaim.disable();
381
382 virginConsume();
383
384 // Comm does not have an interface to clear the writer, but
385 // writeMore() will not write if our write callback is called
386 // when state.writing == State::writingDone;
387}
388
389void ICAPModXact::stopBackup()
390{
391 if (!virginSendClaim.active())
392 return;
393
394 debugs(93, 7, "ICAPModXact will no longer backup " << status());
395
396 virginSendClaim.disable();
397
398 virginConsume();
399}
400
401bool ICAPModXact::doneAll() const
402{
403 return ICAPXaction::doneAll() && !state.serviceWaiting &&
404 state.doneReceiving && doneSending() &&
405 doneReading() && state.doneWriting();
406}
407
408void ICAPModXact::startReading()
409{
410 Must(connection >= 0);
411 Must(!reader);
412 Must(adapted.getRaw());
413 Must(adapted->data);
414 Must(adapted->data->body);
415
416 // we use the same buffer for headers and body and then consume headers
417 readMore();
418}
419
420void ICAPModXact::readMore()
421{
422 if (reader || doneReading())
423 return;
424
425 // do not fill readBuf if we have no space to store the result
426 if (!adapted->data->body->hasPotentialSpace())
427 return;
428
429 if (readBuf.hasSpace())
430 scheduleRead();
431}
432
433// comm module read a portion of the ICAP response for us
434void ICAPModXact::handleCommRead(size_t)
435{
436 Must(!state.doneParsing());
437 parseMore();
438 readMore();
439}
440
441void ICAPModXact::echoMore()
442{
443 Must(state.sending == State::sendingVirgin);
444 Must(virginSendClaim.active());
445
446 MemBuf &from = *virgin->data->body;
447 MemBuf &to = *adapted->data->body;
448
449 const size_t sizeMax = claimSize(virginSendClaim);
450 const size_t size = XMIN(static_cast<size_t>(to.potentialSpaceSize()),
451 sizeMax);
452 debugs(93, 5, "ICAPModXact echos " << size << " out of " << sizeMax <<
453 " bytes");
454
455 if (size > 0) {
456 to.append(claimContent(virginSendClaim), size);
457 virginSendClaim.release(size);
458 virginConsume();
459 adapted->sendSourceProgress();
460 }
461
462 if (!from.hasContent() && state.doneReceiving) {
463 debugs(93, 5, "ICAPModXact echoed all " << status());
464 stopSending(true);
465 } else {
466 debugs(93, 5, "ICAPModXact has " << from.contentSize() << " bytes " <<
467 "and expects more to echo " << status());
468 virgin->sendSinkNeed(); // TODO: timeout if sink is broken
469 }
470}
471
472bool ICAPModXact::doneSending() const
473{
474 Must((state.sending == State::sendingDone) == (!adapted));
475 return state.sending == State::sendingDone;
476}
477
478void ICAPModXact::stopSending(bool nicely)
479{
480 if (doneSending())
481 return;
482
483 if (state.sending != State::sendingUndecided) {
484 debugs(93, 7, "ICAPModXact will no longer send " << status());
485
486 if (nicely)
487 adapted->sendSourceFinish();
488 else
489 adapted->sendSourceAbort();
490 } else {
491 debugs(93, 7, "ICAPModXact will not start sending " << status());
492 adapted->sendSourceAbort(); // or the sink may wait forever
493 }
494
495 state.sending = State::sendingDone;
496
497 /*
9d9f2bc0 498 * adapted->data->header should be a link_count'ed HttpRequest
499 */
500
501 if (adapted->data->header) {
502 requestUnlink(dynamic_cast<HttpRequest*>(adapted->data->header));
503 adapted->data->header = NULL;
504 }
505
774c051c 506 adapted = NULL; // refcounted
507}
508
509void ICAPModXact::stopReceiving()
510{
511 // stopSending NULLifies adapted but we do not NULLify virgin.
512 // This is assymetric because we want to keep virgin->data even
513 // though we are not expecting any more virgin->data->body.
514 // TODO: can we cache just the needed headers info instead?
515
516 // If they closed first, there is not point (or means) to notify them.
517
518 if (state.doneReceiving)
519 return;
520
521 // There is no sendSinkFinished() to notify the other side.
522 debugs(93, 7, "ICAPModXact will not receive " << status());
523
524 state.doneReceiving = true;
525}
526
527void ICAPModXact::parseMore()
528{
aa761e5f 529 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " bytes to parse" <<
774c051c 530 status());
d5cfacfb 531 debugs(93, 5, HERE << "\n" << readBuf.content());
774c051c 532
533 if (state.parsingHeaders())
534 parseHeaders();
535
536 if (state.parsing == State::psBody)
537 parseBody();
538}
539
540// note that allocation for echoing is done in handle204NoContent()
541void ICAPModXact::maybeAllocateHttpMsg()
542{
543 if (adapted->data->header) // already allocated
544 return;
545
546 if (gotEncapsulated("res-hdr")) {
547 adapted->data->header = new HttpReply;
548 } else if (gotEncapsulated("req-hdr")) {
9d9f2bc0 549 adapted->data->header = requestLink(new HttpRequest);
774c051c 550 } else
551 throw TexcHere("Neither res-hdr nor req-hdr in maybeAllocateHttpMsg()");
552}
553
554void ICAPModXact::parseHeaders()
555{
556 Must(state.parsingHeaders());
557
558 if (state.parsing == State::psIcapHeader)
559 parseIcapHead();
560
561 if (state.parsing == State::psHttpHeader)
562 parseHttpHead();
563
564 if (state.parsingHeaders()) { // need more data
565 Must(mayReadMore());
566 return;
567 }
568
569 adapted->sendSourceStart();
570
571 if (state.sending == State::sendingVirgin)
572 echoMore();
573}
574
575void ICAPModXact::parseIcapHead()
576{
577 Must(state.sending == State::sendingUndecided);
578
579 if (!parseHead(icapReply))
580 return;
581
fc764d26 582 if (httpHeaderHasConnDir(&icapReply->header, "close")) {
583 debugs(93, 5, HERE << "found connection close");
584 reuseConnection = false;
585 }
586
774c051c 587 switch (icapReply->sline.status) {
588
589 case 100:
590 handle100Continue();
591 break;
592
593 case 200:
b559db5d 594
595 if (!validate200Ok()) {
596 throw TexcHere("Invalid ICAP Response");
597 } else {
598 handle200Ok();
599 }
600
774c051c 601 break;
602
603 case 204:
604 handle204NoContent();
605 break;
606
607 default:
b559db5d 608 debugs(93, 5, HERE << "ICAP status " << icapReply->sline.status);
774c051c 609 handleUnknownScode();
610 break;
611 }
612
613 // handle100Continue() manages state.writing on its own.
614 // Non-100 status means the server needs no postPreview data from us.
615 if (state.writing == State::writingPaused)
616 stopWriting();
617
618 // TODO: Consider applying a Squid 2.5 patch to recognize 201 responses
619}
620
b559db5d 621bool ICAPModXact::validate200Ok()
622{
623 if (ICAP::methodRespmod == service().method) {
624 if (!gotEncapsulated("res-hdr"))
625 return false;
626
627 return true;
628 }
629
630 if (ICAP::methodReqmod == service().method) {
631 if (!gotEncapsulated("res-hdr") && !gotEncapsulated("req-hdr"))
632 return false;
633
634 return true;
635 }
636
637 return false;
638}
639
774c051c 640void ICAPModXact::handle100Continue()
641{
642 Must(state.writing == State::writingPaused);
643 Must(preview.enabled() && preview.done() && !preview.ieof());
644 Must(virginSendClaim.active());
645
646 if (virginSendClaim.limited()) // preview only
647 stopBackup();
648
649 state.parsing = State::psHttpHeader; // eventually
650
651 state.writing = State::writingPrime;
652
653 writeMore();
654}
655
656void ICAPModXact::handle200Ok()
657{
658 state.parsing = State::psHttpHeader;
659 state.sending = State::sendingAdapted;
660 stopBackup();
661}
662
663void ICAPModXact::handle204NoContent()
664{
665 stopParsing();
666 Must(virginSendClaim.active());
667 virginSendClaim.protectAll(); // extends protection if needed
668 state.sending = State::sendingVirgin;
669
670 // We want to clone the HTTP message, but we do not want
671 // to copy non-HTTP state parts that HttpMsg kids carry in them.
672 // Thus, we cannot use a smart pointer, copy constructor, or equivalent.
673 // Instead, we simply write the HTTP message and "clone" it by parsing.
674
675 HttpMsg *oldHead = virgin->data->header;
676 debugs(93, 7, "ICAPModXact cloning virgin message " << oldHead);
677
678 MemBuf httpBuf;
679
680 // write the virgin message into a memory buffer
681 httpBuf.init();
682 packHead(httpBuf, oldHead);
683
684 // allocate the adapted message
685 HttpMsg *&newHead = adapted->data->header;
686 Must(!newHead);
687
688 if (dynamic_cast<const HttpRequest*>(oldHead))
689 newHead = new HttpRequest;
690 else
691 if (dynamic_cast<const HttpReply*>(oldHead))
692 newHead = new HttpReply;
693
694 Must(newHead);
695
696 // parse the buffer back
697 http_status error = HTTP_STATUS_NONE;
698
699 Must(newHead->parse(&httpBuf, true, &error));
700
701 Must(newHead->hdr_sz == httpBuf.contentSize()); // no leftovers
702
703 httpBuf.clean();
704
705 debugs(93, 7, "ICAPModXact cloned virgin message " << oldHead << " to " << newHead);
706}
707
708void ICAPModXact::handleUnknownScode()
709{
710 stopParsing();
711 stopBackup();
712 // TODO: mark connection as "bad"
713
714 // Terminate the transaction; we do not know how to handle this response.
715 throw TexcHere("Unsupported ICAP status code");
716}
717
718void ICAPModXact::parseHttpHead()
719{
720 if (gotEncapsulated("res-hdr") || gotEncapsulated("req-hdr")) {
721 maybeAllocateHttpMsg();
722
723 if (!parseHead(adapted->data->header))
200ac359 724 return; // need more header data
774c051c 725 }
726
727 state.parsing = State::psBody;
728}
729
fc764d26 730/*
731 * Common routine used to parse both HTTP and ICAP headers
732 */
774c051c 733bool ICAPModXact::parseHead(HttpMsg *head)
734{
735 assert(head);
def17b6a 736 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " head bytes to parse" <<
774c051c 737 "; state: " << state.parsing);
738
739 http_status error = HTTP_STATUS_NONE;
740 const bool parsed = head->parse(&readBuf, commEof, &error);
741 Must(parsed || !error); // success or need more data
742
743 if (!parsed) { // need more data
d5cfacfb 744 debugs(93, 5, HERE << "parse failed, need more data");
774c051c 745 head->reset();
746 return false;
747 }
748
749 readBuf.consume(head->hdr_sz);
750 return true;
751}
752
753void ICAPModXact::parseBody()
754{
755 Must(state.parsing == State::psBody);
756
aa761e5f 757 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes to parse");
774c051c 758
200ac359 759 if (gotEncapsulated("res-body") || gotEncapsulated("req-body")) {
774c051c 760 if (!parsePresentBody()) // need more body data
761 return;
762 } else {
b559db5d 763 debugs(93, 5, HERE << "not expecting a body");
774c051c 764 }
765
766 stopParsing();
767 stopSending(true);
768}
769
770// returns true iff complete body was parsed
771bool ICAPModXact::parsePresentBody()
772{
773 if (!bodyParser)
774 bodyParser = new ChunkedCodingParser;
775
776 // the parser will throw on errors
777 const bool parsed = bodyParser->parse(&readBuf, adapted->data->body);
778
779 adapted->sendSourceProgress(); // TODO: do not send if parsed nothing
780
aa761e5f 781 debugs(93, 5, HERE << "have " << readBuf.contentSize() << " body bytes after " <<
774c051c 782 "parse; parsed all: " << parsed);
783
784 if (parsed)
785 return true;
786
787 if (bodyParser->needsMoreData())
788 Must(mayReadMore());
789
790 if (bodyParser->needsMoreSpace()) {
791 Must(!doneSending()); // can hope for more space
792 Must(adapted->data->body->hasContent()); // paranoid
793 // TODO: there should be a timeout in case the sink is broken.
794 }
795
796 return false;
797}
798
799void ICAPModXact::stopParsing()
800{
801 if (state.parsing == State::psDone)
802 return;
803
804 debugs(93, 7, "ICAPModXact will no longer parse " << status());
805
806 delete bodyParser;
807
808 bodyParser = NULL;
809
810 state.parsing = State::psDone;
811}
812
813// HTTP side added virgin body data
814void ICAPModXact::noteSourceProgress(MsgPipe *p)
815{
816 ICAPXaction_Enter(noteSourceProgress);
817
818 Must(!state.doneReceiving);
819 writeMore();
820
821 if (state.sending == State::sendingVirgin)
822 echoMore();
823
824 ICAPXaction_Exit();
825}
826
827// HTTP side sent us all virgin info
828void ICAPModXact::noteSourceFinish(MsgPipe *p)
829{
830 ICAPXaction_Enter(noteSourceFinish);
831
832 Must(!state.doneReceiving);
833 stopReceiving();
834
835 // push writer and sender in case we were waiting for the last-chunk
836 writeMore();
837
838 if (state.sending == State::sendingVirgin)
839 echoMore();
840
841 ICAPXaction_Exit();
842}
843
844// HTTP side is aborting
845void ICAPModXact::noteSourceAbort(MsgPipe *p)
846{
847 ICAPXaction_Enter(noteSourceAbort);
848
849 Must(!state.doneReceiving);
850 stopReceiving();
851 mustStop("HTTP source quit");
852
853 ICAPXaction_Exit();
854}
855
856// HTTP side wants more adapted data and possibly freed some buffer space
857void ICAPModXact::noteSinkNeed(MsgPipe *p)
858{
859 ICAPXaction_Enter(noteSinkNeed);
860
861 if (state.sending == State::sendingVirgin)
862 echoMore();
863 else
864 if (state.sending == State::sendingAdapted)
865 parseMore();
866 else
867 Must(state.sending == State::sendingUndecided);
868
869 ICAPXaction_Exit();
870}
871
872// HTTP side aborted
873void ICAPModXact::noteSinkAbort(MsgPipe *p)
874{
875 ICAPXaction_Enter(noteSinkAbort);
876
877 mustStop("HTTP sink quit");
878
879 ICAPXaction_Exit();
880}
881
882// internal cleanup
883void ICAPModXact::doStop()
884{
885 ICAPXaction::doStop();
886
887 stopWriting();
888 stopBackup();
889
890 if (icapReply) {
891 delete icapReply;
892 icapReply = NULL;
893 }
894
895 stopSending(false);
896
897 // see stopReceiving() for reasons it cannot NULLify virgin there
898
899 if (virgin != NULL) {
900 if (!state.doneReceiving)
901 virgin->sendSinkAbort();
902 else
903 virgin->sink = NULL;
904
905 virgin = NULL; // refcounted
906 }
907
908 if (self != NULL) {
909 Pointer s = self;
910 self = NULL;
911 ICAPNoteXactionDone(s);
912 /* this object may be destroyed when 's' is cleared */
913 }
914}
915
916void ICAPModXact::makeRequestHeaders(MemBuf &buf)
917{
12b91c99 918 /*
919 * XXX These should use HttpHdr interfaces instead of Printfs
920 */
774c051c 921 const ICAPServiceRep &s = service();
922 buf.Printf("%s %s ICAP/1.0\r\n", s.methodStr(), s.uri.buf());
923 buf.Printf("Host: %s:%d\r\n", s.host.buf(), s.port);
12b91c99 924 buf.Printf("Date: %s\r\n", mkrfc1123(squid_curtime));
925
926 if (!TheICAPConfig.reuse_connections)
927 buf.Printf("Connection: close\r\n");
928
774c051c 929 buf.Printf("Encapsulated: ");
930
931 MemBuf httpBuf;
12b91c99 932
774c051c 933 httpBuf.init();
934
935 // build HTTP request header, if any
936 ICAP::Method m = s.method;
937
938 if (ICAP::methodRespmod == m && virgin->data->cause)
939 encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->cause);
940 else if (ICAP::methodReqmod == m)
941 encapsulateHead(buf, "req-hdr", httpBuf, virgin->data->header);
942
943 if (ICAP::methodRespmod == m)
944 if (const MsgPipeData::Header *prime = virgin->data->header)
945 encapsulateHead(buf, "res-hdr", httpBuf, prime);
946
947 if (!virginBody.expected())
1dd6edf2 948 buf.Printf("null-body=%d", (int) httpBuf.contentSize());
774c051c 949 else if (ICAP::methodReqmod == m)
1dd6edf2 950 buf.Printf("req-body=%d", (int) httpBuf.contentSize());
774c051c 951 else
1dd6edf2 952 buf.Printf("res-body=%d", (int) httpBuf.contentSize());
774c051c 953
954 buf.append(ICAP::crlf, 2); // terminate Encapsulated line
955
956 if (shouldPreview()) {
957 buf.Printf("Preview: %d\r\n", (int)preview.ad());
958 virginSendClaim.protectUpTo(preview.ad());
959 }
960
961 if (shouldAllow204()) {
962 buf.Printf("Allow: 204\r\n");
963 // be robust: do not rely on the expected body size
964 virginSendClaim.protectAll();
965 }
966
a97e82a8 967 const HttpRequest *request = virgin->data->cause ?
968 virgin->data->cause :
969 dynamic_cast<const HttpRequest*>(virgin->data->header);
970
12b91c99 971 if (TheICAPConfig.send_client_ip)
972 if (request->client_addr.s_addr != any_addr.s_addr)
973 buf.Printf("X-Client-IP: %s\r\n", inet_ntoa(request->client_addr));
a97e82a8 974
12b91c99 975 if (TheICAPConfig.send_client_username)
976 if (request->auth_user_request)
977 if (request->auth_user_request->username())
978 buf.Printf("X-Client-Username: %s\r\n", request->auth_user_request->username());
a97e82a8 979
2dfede9e 980 // fprintf(stderr, "%s\n", buf.content());
a97e82a8 981
774c051c 982 buf.append(ICAP::crlf, 2); // terminate ICAP header
983
984 // start ICAP request body with encapsulated HTTP headers
985 buf.append(httpBuf.content(), httpBuf.contentSize());
986
987 httpBuf.clean();
988}
989
990void ICAPModXact::encapsulateHead(MemBuf &icapBuf, const char *section, MemBuf &httpBuf, const HttpMsg *head)
991{
992 // update ICAP header
7cab7e9f 993 icapBuf.Printf("%s=%d, ", section, (int) httpBuf.contentSize());
774c051c 994
995 // pack HTTP head
996 packHead(httpBuf, head);
997}
998
999void ICAPModXact::packHead(MemBuf &httpBuf, const HttpMsg *head)
1000{
1001 Packer p;
1002 packerToMemInit(&p, &httpBuf);
1003 head->packInto(&p, true);
1004 packerClean(&p);
1005}
1006
1007// decides whether to offer a preview and calculates its size
1008bool ICAPModXact::shouldPreview()
1009{
1010 size_t wantedSize;
1011
7cdbbd47 1012 if (!TheICAPConfig.preview_enable) {
1013 debugs(93, 5, HERE << "preview disabled by squid.conf");
1014 return false;
1015 }
1016
774c051c 1017 if (!service().wantsPreview(wantedSize)) {
1018 debugs(93, 5, "ICAPModXact should not offer preview");
1019 return false;
1020 }
1021
1022 Must(wantedSize >= 0);
1023
1024 // cannot preview more than we can backup
1025 size_t ad = XMIN(wantedSize, TheBackupLimit);
1026
1027 if (virginBody.expected() && virginBody.knownSize())
1028 ad = XMIN(ad, virginBody.size()); // not more than we have
1029 else
1030 ad = 0; // questionable optimization?
1031
1032 debugs(93, 5, "ICAPModXact should offer " << ad << "-byte preview " <<
1033 "(service wanted " << wantedSize << ")");
1034
1035 preview.enable(ad);
1036
1037 return preview.enabled();
1038}
1039
1040// decides whether to allow 204 responses
1041bool ICAPModXact::shouldAllow204()
1042{
1043 if (!service().allows204())
1044 return false;
1045
1046 if (!virginBody.expected())
1047 return true; // no body means no problems with supporting 204s.
1048
1049 // if there is a body, make sure we can backup it all
1050
1051 if (!virginBody.knownSize())
1052 return false;
1053
1054 // or should we have a different backup limit?
1055 // note that '<' allows for 0-termination of the "full" backup buffer
1056 return virginBody.size() < TheBackupLimit;
1057}
1058
1059// returns a temporary string depicting transaction status, for debugging
1060void ICAPModXact::fillPendingStatus(MemBuf &buf) const
1061{
1062 if (state.serviceWaiting)
1063 buf.append("U", 1);
1064
1065 if (!state.doneWriting() && state.writing != State::writingInit)
1066 buf.Printf("w(%d)", state.writing);
1067
1068 if (preview.enabled()) {
1069 if (!preview.done())
1dd6edf2 1070 buf.Printf("P(%d)", (int) preview.debt());
774c051c 1071 }
1072
1073 if (virginSendClaim.active())
1074 buf.append("B", 1);
1075
1076 if (!state.doneParsing() && state.parsing != State::psIcapHeader)
1077 buf.Printf("p(%d)", state.parsing);
1078
1079 if (!doneSending() && state.sending != State::sendingUndecided)
1080 buf.Printf("S(%d)", state.sending);
1081}
1082
1083void ICAPModXact::fillDoneStatus(MemBuf &buf) const
1084{
1085 if (state.doneReceiving)
1086 buf.append("R", 1);
1087
1088 if (state.doneWriting())
1089 buf.append("w", 1);
1090
1091 if (preview.enabled()) {
1092 if (preview.done())
1093 buf.Printf("P%s", preview.ieof() ? "(ieof)" : "");
1094 }
1095
1096 if (doneReading())
1097 buf.append("r", 1);
1098
1099 if (state.doneParsing())
1100 buf.append("p", 1);
1101
1102 if (doneSending())
1103 buf.append("S", 1);
1104}
1105
1106bool ICAPModXact::gotEncapsulated(const char *section) const
1107{
1108 return httpHeaderGetByNameListMember(&icapReply->header, "Encapsulated",
1109 section, ',').size() > 0;
1110}
1111
1112// calculate whether there is a virgin HTTP body and
1113// whether its expected size is known
1114void ICAPModXact::estimateVirginBody()
1115{
1116 // note: defaults should be fine but will disable previews and 204s
1117
1118 Must(virgin != NULL && virgin->data->header);
1119
1120 method_t method;
1121
1122 if (virgin->data->cause)
1123 method = virgin->data->cause->method;
1124 else
a97e82a8 1125 if (HttpRequest *req = dynamic_cast<HttpRequest*>(virgin->data->
1126 header))
774c051c 1127 method = req->method;
1128 else
1129 return;
1130
1131 ssize_t size;
1132 if (virgin->data->header->expectingBody(method, size)) {
1133 virginBody.expect(size)
1134 ;
1135 debugs(93, 6, "ICAPModXact expects virgin body; size: " << size);
1136 } else {
1137 debugs(93, 6, "ICAPModXact does not expect virgin body");
1138 }
1139}
1140
1141
1142// TODO: Move SizedEstimate, MemBufBackup, and ICAPPreview elsewhere
1143
1144SizedEstimate::SizedEstimate()
1145 : theData(dtUnexpected)
1146{}
1147
1148void SizedEstimate::expect(ssize_t aSize)
1149{
1150 theData = (aSize >= 0) ? aSize : (ssize_t)dtUnknown;
1151}
1152
1153bool SizedEstimate::expected() const
1154{
1155 return theData != dtUnexpected;
1156}
1157
1158bool SizedEstimate::knownSize() const
1159{
1160 Must(expected());
1161 return theData != dtUnknown;
1162}
1163
1164size_t SizedEstimate::size() const
1165{
1166 Must(knownSize());
1167 return static_cast<size_t>(theData);
1168}
1169
1170
1171
1172MemBufClaim::MemBufClaim(): theStart(-1), theGoal(-1)
1173{}
1174
1175void MemBufClaim::protectAll()
1176{
1177 if (theStart < 0)
1178 theStart = 0;
1179
1180 theGoal = -1; // no specific goal
1181}
1182
1183void MemBufClaim::protectUpTo(size_t aGoal)
1184{
1185 if (theStart < 0)
1186 theStart = 0;
1187
1188 Must(aGoal >= 0);
1189
1190 theGoal = (theGoal < 0) ? static_cast<ssize_t>(aGoal) :
1191 XMIN(static_cast<ssize_t>(aGoal), theGoal);
1192}
1193
1194void MemBufClaim::disable()
1195{
1196 theStart = -1;
1197}
1198
1199void MemBufClaim::release(size_t size)
1200{
1201 Must(active());
1202 Must(size >= 0);
1203 theStart += static_cast<ssize_t>(size);
1204
1205 if (limited() && theStart >= theGoal)
1206 disable();
1207}
1208
1209size_t MemBufClaim::offset() const
1210{
1211 Must(active());
1212 return static_cast<size_t>(theStart);
1213}
1214
1215bool MemBufClaim::limited() const
1216{
1217 Must(active());
1218 return theGoal >= 0;
1219}
1220
1221
1222ICAPPreview::ICAPPreview(): theWritten(0), theAd(0), theState(stDisabled)
1223{}
1224
1225void ICAPPreview::enable(size_t anAd)
1226{
1227 // TODO: check for anAd not exceeding preview size limit
1228 Must(anAd >= 0);
1229 Must(!enabled());
1230 theAd = anAd;
1231 theState = stWriting;
1232}
1233
1234bool ICAPPreview::enabled() const
1235{
1236 return theState != stDisabled;
1237}
1238
1239size_t ICAPPreview::ad() const
1240{
1241 Must(enabled());
1242 return theAd;
1243}
1244
1245bool ICAPPreview::done() const
1246{
1247 Must(enabled());
1248 return theState >= stIeof;
1249}
1250
1251bool ICAPPreview::ieof() const
1252{
1253 Must(enabled());
1254 return theState == stIeof;
1255}
1256
1257size_t ICAPPreview::debt() const
1258{
1259 Must(enabled());
1260 return done() ? 0 : (theAd - theWritten);
1261}
1262
1263void ICAPPreview::wrote(size_t size, bool sawEof)
1264{
1265 Must(enabled());
1266 theWritten += size;
1267
1268 if (theWritten >= theAd)
1269 theState = stDone; // sawEof is irrelevant
1270 else
1271 if (sawEof)
1272 theState = stIeof;
1273}
1274