]> git.ipfire.org Git - thirdparty/squid.git/blob - src/clients/Client.cc
Docs: Copyright updates for 2018 (#114)
[thirdparty/squid.git] / src / clients / Client.cc
1 /*
2 * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 #include "squid.h"
10 #include "acl/FilledChecklist.h"
11 #include "acl/Gadgets.h"
12 #include "base/TextException.h"
13 #include "clients/Client.h"
14 #include "comm/Connection.h"
15 #include "comm/forward.h"
16 #include "comm/Write.h"
17 #include "err_detail_type.h"
18 #include "errorpage.h"
19 #include "fd.h"
20 #include "HttpHdrContRange.h"
21 #include "HttpReply.h"
22 #include "HttpRequest.h"
23 #include "SquidConfig.h"
24 #include "SquidTime.h"
25 #include "StatCounters.h"
26 #include "Store.h"
27 #include "tools.h"
28 #include "URL.h"
29
30 #if USE_ADAPTATION
31 #include "adaptation/AccessCheck.h"
32 #include "adaptation/Answer.h"
33 #include "adaptation/Iterator.h"
34 #include "base/AsyncCall.h"
35 #endif
36
37 // implemented in client_side_reply.cc until sides have a common parent
38 void purgeEntriesByUrl(HttpRequest * req, const char *url);
39
40 Client::Client(FwdState *theFwdState) :
41 AsyncJob("Client"),
42 fwd(theFwdState),
43 request(fwd->request)
44 {
45 entry = fwd->entry;
46 entry->lock("Client");
47 }
48
49 Client::~Client()
50 {
51 // paranoid: check that swanSong has been called
52 assert(!requestBodySource);
53 #if USE_ADAPTATION
54 assert(!virginBodyDestination);
55 assert(!adaptedBodySource);
56 #endif
57
58 entry->unlock("Client");
59
60 HTTPMSGUNLOCK(theVirginReply);
61 HTTPMSGUNLOCK(theFinalReply);
62
63 if (responseBodyBuffer != NULL) {
64 delete responseBodyBuffer;
65 responseBodyBuffer = NULL;
66 }
67 }
68
69 void
70 Client::swanSong()
71 {
72 // get rid of our piping obligations
73 if (requestBodySource != NULL)
74 stopConsumingFrom(requestBodySource);
75
76 #if USE_ADAPTATION
77 cleanAdaptation();
78 #endif
79
80 if (!doneWithServer())
81 closeServer();
82
83 if (!doneWithFwd) {
84 doneWithFwd = "swanSong()";
85 fwd->handleUnregisteredServerEnd();
86 }
87
88 BodyConsumer::swanSong();
89 #if USE_ADAPTATION
90 Initiator::swanSong();
91 BodyProducer::swanSong();
92 #endif
93
94 // paranoid: check that swanSong has been called
95 // extra paranoid: yeah, I really mean it. they MUST pass here.
96 assert(!requestBodySource);
97 #if USE_ADAPTATION
98 assert(!virginBodyDestination);
99 assert(!adaptedBodySource);
100 #endif
101 }
102
103 HttpReply *
104 Client::virginReply()
105 {
106 assert(theVirginReply);
107 return theVirginReply;
108 }
109
110 const HttpReply *
111 Client::virginReply() const
112 {
113 assert(theVirginReply);
114 return theVirginReply;
115 }
116
117 HttpReply *
118 Client::setVirginReply(HttpReply *rep)
119 {
120 debugs(11,5, HERE << this << " setting virgin reply to " << rep);
121 assert(!theVirginReply);
122 assert(rep);
123 theVirginReply = rep;
124 HTTPMSGLOCK(theVirginReply);
125 return theVirginReply;
126 }
127
128 HttpReply *
129 Client::finalReply()
130 {
131 assert(theFinalReply);
132 return theFinalReply;
133 }
134
135 HttpReply *
136 Client::setFinalReply(HttpReply *rep)
137 {
138 debugs(11,5, HERE << this << " setting final reply to " << rep);
139
140 assert(!theFinalReply);
141 assert(rep);
142 theFinalReply = rep;
143 HTTPMSGLOCK(theFinalReply);
144
145 // give entry the reply because haveParsedReplyHeaders() expects it there
146 entry->replaceHttpReply(theFinalReply, false); // but do not write yet
147 haveParsedReplyHeaders(); // update the entry/reply (e.g., set timestamps)
148 if (!EBIT_TEST(entry->flags, RELEASE_REQUEST) && blockCaching())
149 entry->release();
150 entry->startWriting(); // write the updated entry to store
151
152 return theFinalReply;
153 }
154
155 // called when no more server communication is expected; may quit
156 void
157 Client::serverComplete()
158 {
159 debugs(11,5,HERE << "serverComplete " << this);
160
161 if (!doneWithServer()) {
162 closeServer();
163 assert(doneWithServer());
164 }
165
166 completed = true;
167 originalRequest()->hier.stopPeerClock(true);
168
169 if (requestBodySource != NULL)
170 stopConsumingFrom(requestBodySource);
171
172 if (responseBodyBuffer != NULL)
173 return;
174
175 serverComplete2();
176 }
177
178 void
179 Client::serverComplete2()
180 {
181 debugs(11,5,HERE << "serverComplete2 " << this);
182
183 #if USE_ADAPTATION
184 if (virginBodyDestination != NULL)
185 stopProducingFor(virginBodyDestination, true);
186
187 if (!doneWithAdaptation())
188 return;
189 #endif
190
191 completeForwarding();
192 }
193
194 bool Client::doneAll() const
195 {
196 return doneWithServer() &&
197 #if USE_ADAPTATION
198 doneWithAdaptation() &&
199 Adaptation::Initiator::doneAll() &&
200 BodyProducer::doneAll() &&
201 #endif
202 BodyConsumer::doneAll();
203 }
204
205 // FTP side overloads this to work around multiple calls to fwd->complete
206 void
207 Client::completeForwarding()
208 {
209 debugs(11,5, HERE << "completing forwarding for " << fwd);
210 assert(fwd != NULL);
211 doneWithFwd = "completeForwarding()";
212 fwd->complete();
213 }
214
215 // Register to receive request body
216 bool Client::startRequestBodyFlow()
217 {
218 HttpRequestPointer r(originalRequest());
219 assert(r->body_pipe != NULL);
220 requestBodySource = r->body_pipe;
221 if (requestBodySource->setConsumerIfNotLate(this)) {
222 debugs(11,3, HERE << "expecting request body from " <<
223 requestBodySource->status());
224 return true;
225 }
226
227 debugs(11,3, HERE << "aborting on partially consumed request body: " <<
228 requestBodySource->status());
229 requestBodySource = NULL;
230 return false;
231 }
232
233 // Entry-dependent callbacks use this check to quit if the entry went bad
234 bool
235 Client::abortOnBadEntry(const char *abortReason)
236 {
237 if (entry->isAccepting())
238 return false;
239
240 debugs(11,5, HERE << "entry is not Accepting!");
241 abortOnData(abortReason);
242 return true;
243 }
244
245 // more request or adapted response body is available
246 void
247 Client::noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
248 {
249 #if USE_ADAPTATION
250 if (adaptedBodySource == bp) {
251 handleMoreAdaptedBodyAvailable();
252 return;
253 }
254 #endif
255 if (requestBodySource == bp)
256 handleMoreRequestBodyAvailable();
257 }
258
259 // the entire request or adapted response body was provided, successfully
260 void
261 Client::noteBodyProductionEnded(BodyPipe::Pointer bp)
262 {
263 #if USE_ADAPTATION
264 if (adaptedBodySource == bp) {
265 handleAdaptedBodyProductionEnded();
266 return;
267 }
268 #endif
269 if (requestBodySource == bp)
270 handleRequestBodyProductionEnded();
271 }
272
273 // premature end of the request or adapted response body production
274 void
275 Client::noteBodyProducerAborted(BodyPipe::Pointer bp)
276 {
277 #if USE_ADAPTATION
278 if (adaptedBodySource == bp) {
279 handleAdaptedBodyProducerAborted();
280 return;
281 }
282 #endif
283 if (requestBodySource == bp)
284 handleRequestBodyProducerAborted();
285 }
286
287 bool
288 Client::abortOnData(const char *reason)
289 {
290 abortAll(reason);
291 return true;
292 }
293
294 // more origin request body data is available
295 void
296 Client::handleMoreRequestBodyAvailable()
297 {
298 if (!requestSender)
299 sendMoreRequestBody();
300 else
301 debugs(9,3, HERE << "waiting for request body write to complete");
302 }
303
304 // there will be no more handleMoreRequestBodyAvailable calls
305 void
306 Client::handleRequestBodyProductionEnded()
307 {
308 receivedWholeRequestBody = true;
309 if (!requestSender)
310 doneSendingRequestBody();
311 else
312 debugs(9,3, HERE << "waiting for request body write to complete");
313 }
314
315 // called when we are done sending request body; kids extend this
316 void
317 Client::doneSendingRequestBody()
318 {
319 debugs(9,3, HERE << "done sending request body");
320 assert(requestBodySource != NULL);
321 stopConsumingFrom(requestBodySource);
322
323 // kids extend this
324 }
325
326 // called when body producers aborts; kids extend this
327 void
328 Client::handleRequestBodyProducerAborted()
329 {
330 if (requestSender != NULL)
331 debugs(9,3, HERE << "fyi: request body aborted while we were sending");
332
333 fwd->dontRetry(true); // the problem is not with the server
334 stopConsumingFrom(requestBodySource); // requestSender, if any, will notice
335
336 // kids extend this
337 }
338
339 // called when we wrote request headers(!) or a part of the body
340 void
341 Client::sentRequestBody(const CommIoCbParams &io)
342 {
343 debugs(11, 5, "sentRequestBody: FD " << io.fd << ": size " << io.size << ": errflag " << io.flag << ".");
344 debugs(32,3,HERE << "sentRequestBody called");
345
346 requestSender = NULL;
347
348 if (io.size > 0) {
349 fd_bytes(io.fd, io.size, FD_WRITE);
350 statCounter.server.all.kbytes_out += io.size;
351 // kids should increment their counters
352 }
353
354 if (io.flag == Comm::ERR_CLOSING)
355 return;
356
357 if (!requestBodySource) {
358 debugs(9,3, HERE << "detected while-we-were-sending abort");
359 return; // do nothing;
360 }
361
362 // both successful and failed writes affect response times
363 request->hier.notePeerWrite();
364
365 if (io.flag) {
366 debugs(11, DBG_IMPORTANT, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(io.xerrno));
367 ErrorState *err;
368 err = new ErrorState(ERR_WRITE_ERROR, Http::scBadGateway, fwd->request);
369 err->xerrno = io.xerrno;
370 fwd->fail(err);
371 abortOnData("I/O error while sending request body");
372 return;
373 }
374
375 if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
376 abortOnData("store entry aborted while sending request body");
377 return;
378 }
379
380 if (!requestBodySource->exhausted())
381 sendMoreRequestBody();
382 else if (receivedWholeRequestBody)
383 doneSendingRequestBody();
384 else
385 debugs(9,3, HERE << "waiting for body production end or abort");
386 }
387
388 void
389 Client::sendMoreRequestBody()
390 {
391 assert(requestBodySource != NULL);
392 assert(!requestSender);
393
394 const Comm::ConnectionPointer conn = dataConnection();
395
396 if (!Comm::IsConnOpen(conn)) {
397 debugs(9,3, HERE << "cannot send request body to closing " << conn);
398 return; // wait for the kid's close handler; TODO: assert(closer);
399 }
400
401 MemBuf buf;
402 if (getMoreRequestBody(buf) && buf.contentSize() > 0) {
403 debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes");
404 typedef CommCbMemFunT<Client, CommIoCbParams> Dialer;
405 requestSender = JobCallback(93,3, Dialer, this, Client::sentRequestBody);
406 Comm::Write(conn, &buf, requestSender);
407 } else {
408 debugs(9,3, HERE << "will wait for more request body bytes or eof");
409 requestSender = NULL;
410 }
411 }
412
413 /// either fill buf with available [encoded] request body bytes or return false
414 bool
415 Client::getMoreRequestBody(MemBuf &buf)
416 {
417 // default implementation does not encode request body content
418 Must(requestBodySource != NULL);
419 return requestBodySource->getMoreData(buf);
420 }
421
422 // Compares hosts in urls, returns false if different, no sheme, or no host.
423 static bool
424 sameUrlHosts(const char *url1, const char *url2)
425 {
426 // XXX: Want urlHostname() here, but it uses static storage and copying
427 const char *host1 = strchr(url1, ':');
428 const char *host2 = strchr(url2, ':');
429
430 if (host1 && host2) {
431 // skip scheme slashes
432 do {
433 ++host1;
434 ++host2;
435 } while (*host1 == '/' && *host2 == '/');
436
437 if (!*host1)
438 return false; // no host
439
440 // increment while the same until we reach the end of the URL/host
441 while (*host1 && *host1 != '/' && *host1 == *host2) {
442 ++host1;
443 ++host2;
444 }
445 return *host1 == *host2;
446 }
447
448 return false; // no URL scheme
449 }
450
451 // purges entries that match the value of a given HTTP [response] header
452 static void
453 purgeEntriesByHeader(HttpRequest *req, const char *reqUrl, Http::Message *rep, Http::HdrType hdr)
454 {
455 const char *hdrUrl, *absUrl;
456
457 absUrl = NULL;
458 hdrUrl = rep->header.getStr(hdr);
459 if (hdrUrl == NULL) {
460 return;
461 }
462
463 /*
464 * If the URL is relative, make it absolute so we can find it.
465 * If it's absolute, make sure the host parts match to avoid DOS attacks
466 * as per RFC 2616 13.10.
467 */
468 if (urlIsRelative(hdrUrl)) {
469 absUrl = urlMakeAbsolute(req, hdrUrl);
470 if (absUrl != NULL) {
471 hdrUrl = absUrl;
472 }
473 } else if (!sameUrlHosts(reqUrl, hdrUrl)) {
474 return;
475 }
476
477 purgeEntriesByUrl(req, hdrUrl);
478
479 if (absUrl != NULL) {
480 safe_free(absUrl);
481 }
482 }
483
484 // some HTTP methods should purge matching cache entries
485 void
486 Client::maybePurgeOthers()
487 {
488 // only some HTTP methods should purge matching cache entries
489 if (!request->method.purgesOthers())
490 return;
491
492 // and probably only if the response was successful
493 if (theFinalReply->sline.status() >= 400)
494 return;
495
496 // XXX: should we use originalRequest() here?
497 SBuf tmp(request->effectiveRequestUri());
498 const char *reqUrl = tmp.c_str();
499 debugs(88, 5, "maybe purging due to " << request->method << ' ' << tmp);
500 purgeEntriesByUrl(request.getRaw(), reqUrl);
501 purgeEntriesByHeader(request.getRaw(), reqUrl, theFinalReply, Http::HdrType::LOCATION);
502 purgeEntriesByHeader(request.getRaw(), reqUrl, theFinalReply, Http::HdrType::CONTENT_LOCATION);
503 }
504
505 /// called when we have final (possibly adapted) reply headers; kids extend
506 void
507 Client::haveParsedReplyHeaders()
508 {
509 Must(theFinalReply);
510 maybePurgeOthers();
511
512 // adaptation may overwrite old offset computed using the virgin response
513 const bool partial = theFinalReply->contentRange();
514 currentOffset = partial ? theFinalReply->contentRange()->spec.offset : 0;
515 }
516
517 /// whether to prevent caching of an otherwise cachable response
518 bool
519 Client::blockCaching()
520 {
521 if (const Acl::Tree *acl = Config.accessList.storeMiss) {
522 // This relatively expensive check is not in StoreEntry::checkCachable:
523 // That method lacks HttpRequest and may be called too many times.
524 ACLFilledChecklist ch(acl, originalRequest().getRaw());
525 ch.reply = const_cast<HttpReply*>(entry->getReply()); // ACLFilledChecklist API bug
526 HTTPMSGLOCK(ch.reply);
527 if (!ch.fastCheck().allowed()) { // when in doubt, block
528 debugs(20, 3, "store_miss prohibits caching");
529 return true;
530 }
531 }
532 return false;
533 }
534
535 HttpRequestPointer
536 Client::originalRequest()
537 {
538 return request;
539 }
540
541 #if USE_ADAPTATION
542 /// Initiate an asynchronous adaptation transaction which will call us back.
543 void
544 Client::startAdaptation(const Adaptation::ServiceGroupPointer &group, HttpRequest *cause)
545 {
546 debugs(11, 5, "Client::startAdaptation() called");
547 // check whether we should be sending a body as well
548 // start body pipe to feed ICAP transaction if needed
549 assert(!virginBodyDestination);
550 HttpReply *vrep = virginReply();
551 assert(!vrep->body_pipe);
552 int64_t size = 0;
553 if (vrep->expectingBody(cause->method, size) && size) {
554 virginBodyDestination = new BodyPipe(this);
555 vrep->body_pipe = virginBodyDestination;
556 debugs(93, 6, HERE << "will send virgin reply body to " <<
557 virginBodyDestination << "; size: " << size);
558 if (size > 0)
559 virginBodyDestination->setBodySize(size);
560 }
561
562 adaptedHeadSource = initiateAdaptation(
563 new Adaptation::Iterator(vrep, cause, fwd->al, group));
564 startedAdaptation = initiated(adaptedHeadSource);
565 Must(startedAdaptation);
566 }
567
568 // properly cleans up ICAP-related state
569 // may be called multiple times
570 void Client::cleanAdaptation()
571 {
572 debugs(11,5, HERE << "cleaning ICAP; ACL: " << adaptationAccessCheckPending);
573
574 if (virginBodyDestination != NULL)
575 stopProducingFor(virginBodyDestination, false);
576
577 announceInitiatorAbort(adaptedHeadSource);
578
579 if (adaptedBodySource != NULL)
580 stopConsumingFrom(adaptedBodySource);
581
582 if (!adaptationAccessCheckPending) // we cannot cancel a pending callback
583 assert(doneWithAdaptation()); // make sure the two methods are in sync
584 }
585
586 bool
587 Client::doneWithAdaptation() const
588 {
589 return !adaptationAccessCheckPending &&
590 !virginBodyDestination && !adaptedHeadSource && !adaptedBodySource;
591 }
592
593 // sends virgin reply body to ICAP, buffering excesses if needed
594 void
595 Client::adaptVirginReplyBody(const char *data, ssize_t len)
596 {
597 assert(startedAdaptation);
598
599 if (!virginBodyDestination) {
600 debugs(11,3, HERE << "ICAP does not want more virgin body");
601 return;
602 }
603
604 // grow overflow area if already overflowed
605 if (responseBodyBuffer) {
606 responseBodyBuffer->append(data, len);
607 data = responseBodyBuffer->content();
608 len = responseBodyBuffer->contentSize();
609 }
610
611 const ssize_t putSize = virginBodyDestination->putMoreData(data, len);
612 data += putSize;
613 len -= putSize;
614
615 // if we had overflow area, shrink it as necessary
616 if (responseBodyBuffer) {
617 if (putSize == responseBodyBuffer->contentSize()) {
618 delete responseBodyBuffer;
619 responseBodyBuffer = NULL;
620 } else {
621 responseBodyBuffer->consume(putSize);
622 }
623 return;
624 }
625
626 // if we did not have an overflow area, create it as needed
627 if (len > 0) {
628 assert(!responseBodyBuffer);
629 responseBodyBuffer = new MemBuf;
630 responseBodyBuffer->init(4096, SQUID_TCP_SO_RCVBUF * 10);
631 responseBodyBuffer->append(data, len);
632 }
633 }
634
635 // can supply more virgin response body data
636 void
637 Client::noteMoreBodySpaceAvailable(BodyPipe::Pointer)
638 {
639 if (responseBodyBuffer) {
640 addVirginReplyBody(NULL, 0); // kick the buffered fragment alive again
641 if (completed && !responseBodyBuffer) {
642 serverComplete2();
643 return;
644 }
645 }
646 maybeReadVirginBody();
647 }
648
649 // the consumer of our virgin response body aborted
650 void
651 Client::noteBodyConsumerAborted(BodyPipe::Pointer)
652 {
653 stopProducingFor(virginBodyDestination, false);
654
655 // do not force closeServer here in case we need to bypass AdaptationQueryAbort
656
657 if (doneWithAdaptation()) // we may still be receiving adapted response
658 handleAdaptationCompleted();
659 }
660
661 // received adapted response headers (body may follow)
662 void
663 Client::noteAdaptationAnswer(const Adaptation::Answer &answer)
664 {
665 clearAdaptation(adaptedHeadSource); // we do not expect more messages
666
667 switch (answer.kind) {
668 case Adaptation::Answer::akForward:
669 handleAdaptedHeader(const_cast<Http::Message*>(answer.message.getRaw()));
670 break;
671
672 case Adaptation::Answer::akBlock:
673 handleAdaptationBlocked(answer);
674 break;
675
676 case Adaptation::Answer::akError:
677 handleAdaptationAborted(!answer.final);
678 break;
679 }
680 }
681
682 void
683 Client::handleAdaptedHeader(Http::Message *msg)
684 {
685 if (abortOnBadEntry("entry went bad while waiting for adapted headers")) {
686 // If the adapted response has a body, the ICAP side needs to know
687 // that nobody will consume that body. We will be destroyed upon
688 // return. Tell the ICAP side that it is on its own.
689 HttpReply *rep = dynamic_cast<HttpReply*>(msg);
690 assert(rep);
691 if (rep->body_pipe != NULL)
692 rep->body_pipe->expectNoConsumption();
693
694 return;
695 }
696
697 HttpReply *rep = dynamic_cast<HttpReply*>(msg);
698 assert(rep);
699 debugs(11,5, HERE << this << " setting adapted reply to " << rep);
700 setFinalReply(rep);
701
702 assert(!adaptedBodySource);
703 if (rep->body_pipe != NULL) {
704 // subscribe to receive adapted body
705 adaptedBodySource = rep->body_pipe;
706 // assume that ICAP does not auto-consume on failures
707 const bool result = adaptedBodySource->setConsumerIfNotLate(this);
708 assert(result);
709 } else {
710 // no body
711 if (doneWithAdaptation()) // we may still be sending virgin response
712 handleAdaptationCompleted();
713 }
714 }
715
716 void
717 Client::resumeBodyStorage()
718 {
719 if (abortOnBadEntry("store entry aborted while kick producer callback"))
720 return;
721
722 if (!adaptedBodySource)
723 return;
724
725 handleMoreAdaptedBodyAvailable();
726
727 if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
728 endAdaptedBodyConsumption();
729 }
730
731 // more adapted response body is available
732 void
733 Client::handleMoreAdaptedBodyAvailable()
734 {
735 if (abortOnBadEntry("entry refuses adapted body"))
736 return;
737
738 assert(entry);
739
740 size_t contentSize = adaptedBodySource->buf().contentSize();
741
742 if (!contentSize)
743 return; // XXX: bytesWanted asserts on zero-size ranges
744
745 const size_t spaceAvailable = entry->bytesWanted(Range<size_t>(0, contentSize), true);
746
747 if (spaceAvailable < contentSize ) {
748 // No or partial body data consuming
749 typedef NullaryMemFunT<Client> Dialer;
750 AsyncCall::Pointer call = asyncCall(93, 5, "Client::resumeBodyStorage",
751 Dialer(this, &Client::resumeBodyStorage));
752 entry->deferProducer(call);
753 }
754
755 if (!spaceAvailable) {
756 debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
757 "response body at offset " << adaptedBodySource->consumedSize());
758 return;
759 }
760
761 if (spaceAvailable < contentSize ) {
762 debugs(11, 5, HERE << "postponing storage of " <<
763 (contentSize - spaceAvailable) << " body bytes");
764 contentSize = spaceAvailable;
765 }
766
767 debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
768 "response body at offset " << adaptedBodySource->consumedSize());
769
770 BodyPipeCheckout bpc(*adaptedBodySource);
771 const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
772 currentOffset += ioBuf.length;
773 entry->write(ioBuf);
774 bpc.buf.consume(contentSize);
775 bpc.checkIn();
776 }
777
778 // the entire adapted response body was produced, successfully
779 void
780 Client::handleAdaptedBodyProductionEnded()
781 {
782 if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
783 return;
784
785 // end consumption if we consumed everything
786 if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
787 endAdaptedBodyConsumption();
788 // else resumeBodyStorage() will eventually consume the rest
789 }
790
791 void
792 Client::endAdaptedBodyConsumption()
793 {
794 stopConsumingFrom(adaptedBodySource);
795 handleAdaptationCompleted();
796 }
797
798 // premature end of the adapted response body
799 void Client::handleAdaptedBodyProducerAborted()
800 {
801 if (abortOnBadEntry("entry went bad while waiting for the now-aborted adapted body"))
802 return;
803
804 Must(adaptedBodySource != nullptr);
805 if (!adaptedBodySource->exhausted()) {
806 debugs(11,5, "waiting to consume the remainder of the aborted adapted body");
807 return; // resumeBodyStorage() should eventually consume the rest
808 }
809
810 stopConsumingFrom(adaptedBodySource);
811
812 if (handledEarlyAdaptationAbort())
813 return;
814
815 entry->lengthWentBad("body adaptation aborted");
816 handleAdaptationCompleted(); // the user should get a truncated response
817 }
818
819 // common part of noteAdaptationAnswer and handleAdaptedBodyProductionEnded
820 void
821 Client::handleAdaptationCompleted()
822 {
823 debugs(11,5, HERE << "handleAdaptationCompleted");
824 cleanAdaptation();
825
826 // We stop reading origin response because we have no place to put it(*) and
827 // cannot use it. If some origin servers do not like that or if we want to
828 // reuse more pconns, we can add code to discard unneeded origin responses.
829 // (*) TODO: Is it possible that the adaptation xaction is still running?
830 if (mayReadVirginReplyBody()) {
831 debugs(11,3, HERE << "closing origin conn due to ICAP completion");
832 closeServer();
833 }
834
835 completeForwarding();
836 }
837
838 // common part of noteAdaptation*Aborted and noteBodyConsumerAborted methods
839 void
840 Client::handleAdaptationAborted(bool bypassable)
841 {
842 debugs(11,5, HERE << "handleAdaptationAborted; bypassable: " << bypassable <<
843 ", entry empty: " << entry->isEmpty());
844
845 if (abortOnBadEntry("entry went bad while ICAP aborted"))
846 return;
847
848 // TODO: bypass if possible
849 if (!handledEarlyAdaptationAbort())
850 abortAll("adaptation failure with a filled entry");
851 }
852
853 /// If the store entry is still empty, fully handles adaptation abort, returning
854 /// true. Otherwise just updates the request error detail and returns false.
855 bool
856 Client::handledEarlyAdaptationAbort()
857 {
858 if (entry->isEmpty()) {
859 debugs(11,8, "adaptation failure with an empty entry: " << *entry);
860 ErrorState *err = new ErrorState(ERR_ICAP_FAILURE, Http::scInternalServerError, request.getRaw());
861 err->detailError(ERR_DETAIL_ICAP_RESPMOD_EARLY);
862 fwd->fail(err);
863 fwd->dontRetry(true);
864 abortAll("adaptation failure with an empty entry");
865 return true; // handled
866 }
867
868 if (request) // update logged info directly
869 request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_ICAP_RESPMOD_LATE);
870
871 return false; // the caller must handle
872 }
873
874 // adaptation service wants us to deny HTTP client access to this response
875 void
876 Client::handleAdaptationBlocked(const Adaptation::Answer &answer)
877 {
878 debugs(11,5, HERE << answer.ruleId);
879
880 if (abortOnBadEntry("entry went bad while ICAP aborted"))
881 return;
882
883 if (!entry->isEmpty()) { // too late to block (should not really happen)
884 if (request)
885 request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_RESPMOD_BLOCK_LATE);
886 abortAll("late adaptation block");
887 return;
888 }
889
890 debugs(11,7, HERE << "creating adaptation block response");
891
892 err_type page_id =
893 aclGetDenyInfoPage(&Config.denyInfoList, answer.ruleId.termedBuf(), 1);
894 if (page_id == ERR_NONE)
895 page_id = ERR_ACCESS_DENIED;
896
897 ErrorState *err = new ErrorState(page_id, Http::scForbidden, request.getRaw());
898 err->detailError(ERR_DETAIL_RESPMOD_BLOCK_EARLY);
899 fwd->fail(err);
900 fwd->dontRetry(true);
901
902 abortOnData("timely adaptation block");
903 }
904
905 void
906 Client::noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
907 {
908 adaptationAccessCheckPending = false;
909
910 if (abortOnBadEntry("entry went bad while waiting for ICAP ACL check"))
911 return;
912
913 // TODO: Should non-ICAP and ICAP REPMOD pre-cache paths check this?
914 // That check now only happens on REQMOD pre-cache and REPMOD post-cache, in processReplyAccess().
915 if (virginReply()->expectedBodyTooLarge(*request)) {
916 sendBodyIsTooLargeError();
917 return;
918 }
919 // TODO: Should we check receivedBodyTooLarge as well?
920
921 if (!group) {
922 debugs(11,3, HERE << "no adapation needed");
923 setFinalReply(virginReply());
924 processReplyBody();
925 return;
926 }
927
928 startAdaptation(group, originalRequest().getRaw());
929 processReplyBody();
930 }
931 #endif
932
933 void
934 Client::sendBodyIsTooLargeError()
935 {
936 ErrorState *err = new ErrorState(ERR_TOO_BIG, Http::scForbidden, request.getRaw());
937 fwd->fail(err);
938 fwd->dontRetry(true);
939 abortOnData("Virgin body too large.");
940 }
941
942 // TODO: when HttpStateData sends all errors to ICAP,
943 // we should be able to move this at the end of setVirginReply().
944 void
945 Client::adaptOrFinalizeReply()
946 {
947 #if USE_ADAPTATION
948 // TODO: merge with client side and return void to hide the on/off logic?
949 // The callback can be called with a NULL service if adaptation is off.
950 adaptationAccessCheckPending = Adaptation::AccessCheck::Start(
951 Adaptation::methodRespmod, Adaptation::pointPreCache,
952 originalRequest().getRaw(), virginReply(), fwd->al, this);
953 debugs(11,5, HERE << "adaptationAccessCheckPending=" << adaptationAccessCheckPending);
954 if (adaptationAccessCheckPending)
955 return;
956 #endif
957
958 setFinalReply(virginReply());
959 }
960
961 /// initializes bodyBytesRead stats if needed and applies delta
962 void
963 Client::adjustBodyBytesRead(const int64_t delta)
964 {
965 int64_t &bodyBytesRead = originalRequest()->hier.bodyBytesRead;
966
967 // if we got here, do not log a dash even if we got nothing from the server
968 if (bodyBytesRead < 0)
969 bodyBytesRead = 0;
970
971 bodyBytesRead += delta; // supports negative and zero deltas
972
973 // check for overflows ("infinite" response?) and undeflows (a bug)
974 Must(bodyBytesRead >= 0);
975 }
976
977 void
978 Client::addVirginReplyBody(const char *data, ssize_t len)
979 {
980 adjustBodyBytesRead(len);
981
982 #if USE_ADAPTATION
983 assert(!adaptationAccessCheckPending); // or would need to buffer while waiting
984 if (startedAdaptation) {
985 adaptVirginReplyBody(data, len);
986 return;
987 }
988 #endif
989 storeReplyBody(data, len);
990 }
991
992 // writes virgin or adapted reply body to store
993 void
994 Client::storeReplyBody(const char *data, ssize_t len)
995 {
996 // write even if len is zero to push headers towards the client side
997 entry->write (StoreIOBuffer(len, currentOffset, (char*)data));
998
999 currentOffset += len;
1000 }
1001
1002 size_t
1003 Client::calcBufferSpaceToReserve(size_t space, const size_t wantSpace) const
1004 {
1005 if (space < wantSpace) {
1006 const size_t maxSpace = SBuf::maxSize; // absolute best
1007 space = min(wantSpace, maxSpace); // do not promise more than asked
1008 }
1009
1010 #if USE_ADAPTATION
1011 if (responseBodyBuffer) {
1012 return 0; // Stop reading if already overflowed waiting for ICAP to catch up
1013 }
1014
1015 if (virginBodyDestination != NULL) {
1016 /*
1017 * BodyPipe buffer has a finite size limit. We
1018 * should not read more data from the network than will fit
1019 * into the pipe buffer or we _lose_ what did not fit if
1020 * the response ends sooner that BodyPipe frees up space:
1021 * There is no code to keep pumping data into the pipe once
1022 * response ends and serverComplete() is called.
1023 */
1024 const size_t adaptor_space = virginBodyDestination->buf().potentialSpaceSize();
1025
1026 debugs(11,9, "Client may read up to min(" <<
1027 adaptor_space << ", " << space << ") bytes");
1028
1029 if (adaptor_space < space)
1030 space = adaptor_space;
1031 }
1032 #endif
1033
1034 return space;
1035 }
1036
1037 size_t
1038 Client::replyBodySpace(const MemBuf &readBuf, const size_t minSpace) const
1039 {
1040 size_t space = readBuf.spaceSize(); // available space w/o heroic measures
1041 if (space < minSpace) {
1042 const size_t maxSpace = readBuf.potentialSpaceSize(); // absolute best
1043 space = min(minSpace, maxSpace); // do not promise more than asked
1044 }
1045
1046 #if USE_ADAPTATION
1047 if (responseBodyBuffer) {
1048 return 0; // Stop reading if already overflowed waiting for ICAP to catch up
1049 }
1050
1051 if (virginBodyDestination != NULL) {
1052 /*
1053 * BodyPipe buffer has a finite size limit. We
1054 * should not read more data from the network than will fit
1055 * into the pipe buffer or we _lose_ what did not fit if
1056 * the response ends sooner that BodyPipe frees up space:
1057 * There is no code to keep pumping data into the pipe once
1058 * response ends and serverComplete() is called.
1059 *
1060 * If the pipe is totally full, don't register the read handler.
1061 * The BodyPipe will call our noteMoreBodySpaceAvailable() method
1062 * when it has free space again.
1063 */
1064 size_t adaptation_space =
1065 virginBodyDestination->buf().potentialSpaceSize();
1066
1067 debugs(11,9, "Client may read up to min(" <<
1068 adaptation_space << ", " << space << ") bytes");
1069
1070 if (adaptation_space < space)
1071 space = adaptation_space;
1072 }
1073 #endif
1074
1075 return space;
1076 }
1077