]> git.ipfire.org Git - thirdparty/squid.git/blob - src/clients/Client.cc
220ec275ad4d8eb60865608786607a4ed11f3046
[thirdparty/squid.git] / src / clients / Client.cc
1 /*
2 * Copyright (C) 1996-2019 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 #include "squid.h"
10 #include "acl/FilledChecklist.h"
11 #include "acl/Gadgets.h"
12 #include "base/TextException.h"
13 #include "clients/Client.h"
14 #include "comm/Connection.h"
15 #include "comm/forward.h"
16 #include "comm/Write.h"
17 #include "err_detail_type.h"
18 #include "errorpage.h"
19 #include "fd.h"
20 #include "HttpHdrContRange.h"
21 #include "HttpReply.h"
22 #include "HttpRequest.h"
23 #include "SquidConfig.h"
24 #include "SquidTime.h"
25 #include "StatCounters.h"
26 #include "Store.h"
27 #include "tools.h"
28
29 #if USE_ADAPTATION
30 #include "adaptation/AccessCheck.h"
31 #include "adaptation/Answer.h"
32 #include "adaptation/Iterator.h"
33 #include "base/AsyncCall.h"
34 #endif
35
36 // implemented in client_side_reply.cc until sides have a common parent
37 void purgeEntriesByUrl(HttpRequest * req, const char *url);
38
39 Client::Client(FwdState *theFwdState) :
40 AsyncJob("Client"),
41 fwd(theFwdState),
42 request(fwd->request)
43 {
44 entry = fwd->entry;
45 entry->lock("Client");
46 }
47
48 Client::~Client()
49 {
50 // paranoid: check that swanSong has been called
51 assert(!requestBodySource);
52 #if USE_ADAPTATION
53 assert(!virginBodyDestination);
54 assert(!adaptedBodySource);
55 #endif
56
57 entry->unlock("Client");
58
59 HTTPMSGUNLOCK(theVirginReply);
60 HTTPMSGUNLOCK(theFinalReply);
61
62 if (responseBodyBuffer != NULL) {
63 delete responseBodyBuffer;
64 responseBodyBuffer = NULL;
65 }
66 }
67
68 void
69 Client::swanSong()
70 {
71 // get rid of our piping obligations
72 if (requestBodySource != NULL)
73 stopConsumingFrom(requestBodySource);
74
75 #if USE_ADAPTATION
76 cleanAdaptation();
77 #endif
78
79 if (!doneWithServer())
80 closeServer();
81
82 if (!doneWithFwd) {
83 doneWithFwd = "swanSong()";
84 fwd->handleUnregisteredServerEnd();
85 }
86
87 BodyConsumer::swanSong();
88 #if USE_ADAPTATION
89 Initiator::swanSong();
90 BodyProducer::swanSong();
91 #endif
92
93 // paranoid: check that swanSong has been called
94 // extra paranoid: yeah, I really mean it. they MUST pass here.
95 assert(!requestBodySource);
96 #if USE_ADAPTATION
97 assert(!virginBodyDestination);
98 assert(!adaptedBodySource);
99 #endif
100 }
101
102 HttpReply *
103 Client::virginReply()
104 {
105 assert(theVirginReply);
106 return theVirginReply;
107 }
108
109 const HttpReply *
110 Client::virginReply() const
111 {
112 assert(theVirginReply);
113 return theVirginReply;
114 }
115
116 HttpReply *
117 Client::setVirginReply(HttpReply *rep)
118 {
119 debugs(11,5, HERE << this << " setting virgin reply to " << rep);
120 assert(!theVirginReply);
121 assert(rep);
122 theVirginReply = rep;
123 HTTPMSGLOCK(theVirginReply);
124 if (fwd->al)
125 fwd->al->reply = theVirginReply;
126 return theVirginReply;
127 }
128
129 HttpReply *
130 Client::finalReply()
131 {
132 assert(theFinalReply);
133 return theFinalReply;
134 }
135
136 HttpReply *
137 Client::setFinalReply(HttpReply *rep)
138 {
139 debugs(11,5, HERE << this << " setting final reply to " << rep);
140
141 assert(!theFinalReply);
142 assert(rep);
143 theFinalReply = rep;
144 HTTPMSGLOCK(theFinalReply);
145 if (fwd->al)
146 fwd->al->reply = theFinalReply;
147
148 // give entry the reply because haveParsedReplyHeaders() expects it there
149 entry->replaceHttpReply(theFinalReply, false); // but do not write yet
150 haveParsedReplyHeaders(); // update the entry/reply (e.g., set timestamps)
151 if (!EBIT_TEST(entry->flags, RELEASE_REQUEST) && blockCaching())
152 entry->release();
153 entry->startWriting(); // write the updated entry to store
154
155 return theFinalReply;
156 }
157
158 // called when no more server communication is expected; may quit
159 void
160 Client::serverComplete()
161 {
162 debugs(11,5,HERE << "serverComplete " << this);
163
164 if (!doneWithServer()) {
165 closeServer();
166 assert(doneWithServer());
167 }
168
169 completed = true;
170 originalRequest()->hier.stopPeerClock(true);
171
172 if (requestBodySource != NULL)
173 stopConsumingFrom(requestBodySource);
174
175 if (responseBodyBuffer != NULL)
176 return;
177
178 serverComplete2();
179 }
180
181 void
182 Client::serverComplete2()
183 {
184 debugs(11,5,HERE << "serverComplete2 " << this);
185
186 #if USE_ADAPTATION
187 if (virginBodyDestination != NULL)
188 stopProducingFor(virginBodyDestination, true);
189
190 if (!doneWithAdaptation())
191 return;
192 #endif
193
194 completeForwarding();
195 }
196
197 bool Client::doneAll() const
198 {
199 return doneWithServer() &&
200 #if USE_ADAPTATION
201 doneWithAdaptation() &&
202 Adaptation::Initiator::doneAll() &&
203 BodyProducer::doneAll() &&
204 #endif
205 BodyConsumer::doneAll();
206 }
207
208 // FTP side overloads this to work around multiple calls to fwd->complete
209 void
210 Client::completeForwarding()
211 {
212 debugs(11,5, HERE << "completing forwarding for " << fwd);
213 assert(fwd != NULL);
214 doneWithFwd = "completeForwarding()";
215 fwd->complete();
216 }
217
218 // Register to receive request body
219 bool Client::startRequestBodyFlow()
220 {
221 HttpRequestPointer r(originalRequest());
222 assert(r->body_pipe != NULL);
223 requestBodySource = r->body_pipe;
224 if (requestBodySource->setConsumerIfNotLate(this)) {
225 debugs(11,3, HERE << "expecting request body from " <<
226 requestBodySource->status());
227 return true;
228 }
229
230 debugs(11,3, HERE << "aborting on partially consumed request body: " <<
231 requestBodySource->status());
232 requestBodySource = NULL;
233 return false;
234 }
235
236 // Entry-dependent callbacks use this check to quit if the entry went bad
237 bool
238 Client::abortOnBadEntry(const char *abortReason)
239 {
240 if (entry->isAccepting())
241 return false;
242
243 debugs(11,5, HERE << "entry is not Accepting!");
244 abortOnData(abortReason);
245 return true;
246 }
247
248 // more request or adapted response body is available
249 void
250 Client::noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
251 {
252 #if USE_ADAPTATION
253 if (adaptedBodySource == bp) {
254 handleMoreAdaptedBodyAvailable();
255 return;
256 }
257 #endif
258 if (requestBodySource == bp)
259 handleMoreRequestBodyAvailable();
260 }
261
262 // the entire request or adapted response body was provided, successfully
263 void
264 Client::noteBodyProductionEnded(BodyPipe::Pointer bp)
265 {
266 #if USE_ADAPTATION
267 if (adaptedBodySource == bp) {
268 handleAdaptedBodyProductionEnded();
269 return;
270 }
271 #endif
272 if (requestBodySource == bp)
273 handleRequestBodyProductionEnded();
274 }
275
276 // premature end of the request or adapted response body production
277 void
278 Client::noteBodyProducerAborted(BodyPipe::Pointer bp)
279 {
280 #if USE_ADAPTATION
281 if (adaptedBodySource == bp) {
282 handleAdaptedBodyProducerAborted();
283 return;
284 }
285 #endif
286 if (requestBodySource == bp)
287 handleRequestBodyProducerAborted();
288 }
289
290 bool
291 Client::abortOnData(const char *reason)
292 {
293 abortAll(reason);
294 return true;
295 }
296
297 // more origin request body data is available
298 void
299 Client::handleMoreRequestBodyAvailable()
300 {
301 if (!requestSender)
302 sendMoreRequestBody();
303 else
304 debugs(9,3, HERE << "waiting for request body write to complete");
305 }
306
307 // there will be no more handleMoreRequestBodyAvailable calls
308 void
309 Client::handleRequestBodyProductionEnded()
310 {
311 receivedWholeRequestBody = true;
312 if (!requestSender)
313 doneSendingRequestBody();
314 else
315 debugs(9,3, HERE << "waiting for request body write to complete");
316 }
317
318 // called when we are done sending request body; kids extend this
319 void
320 Client::doneSendingRequestBody()
321 {
322 debugs(9,3, HERE << "done sending request body");
323 assert(requestBodySource != NULL);
324 stopConsumingFrom(requestBodySource);
325
326 // kids extend this
327 }
328
329 // called when body producers aborts; kids extend this
330 void
331 Client::handleRequestBodyProducerAborted()
332 {
333 if (requestSender != NULL)
334 debugs(9,3, HERE << "fyi: request body aborted while we were sending");
335
336 fwd->dontRetry(true); // the problem is not with the server
337 stopConsumingFrom(requestBodySource); // requestSender, if any, will notice
338
339 // kids extend this
340 }
341
342 // called when we wrote request headers(!) or a part of the body
343 void
344 Client::sentRequestBody(const CommIoCbParams &io)
345 {
346 debugs(11, 5, "sentRequestBody: FD " << io.fd << ": size " << io.size << ": errflag " << io.flag << ".");
347 debugs(32,3,HERE << "sentRequestBody called");
348
349 requestSender = NULL;
350
351 if (io.size > 0) {
352 fd_bytes(io.fd, io.size, FD_WRITE);
353 statCounter.server.all.kbytes_out += io.size;
354 // kids should increment their counters
355 }
356
357 if (io.flag == Comm::ERR_CLOSING)
358 return;
359
360 if (!requestBodySource) {
361 debugs(9,3, HERE << "detected while-we-were-sending abort");
362 return; // do nothing;
363 }
364
365 // both successful and failed writes affect response times
366 request->hier.notePeerWrite();
367
368 if (io.flag) {
369 debugs(11, DBG_IMPORTANT, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(io.xerrno));
370 ErrorState *err;
371 err = new ErrorState(ERR_WRITE_ERROR, Http::scBadGateway, fwd->request, fwd->al);
372 err->xerrno = io.xerrno;
373 fwd->fail(err);
374 abortOnData("I/O error while sending request body");
375 return;
376 }
377
378 if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
379 abortOnData("store entry aborted while sending request body");
380 return;
381 }
382
383 if (!requestBodySource->exhausted())
384 sendMoreRequestBody();
385 else if (receivedWholeRequestBody)
386 doneSendingRequestBody();
387 else
388 debugs(9,3, HERE << "waiting for body production end or abort");
389 }
390
391 void
392 Client::sendMoreRequestBody()
393 {
394 assert(requestBodySource != NULL);
395 assert(!requestSender);
396
397 const Comm::ConnectionPointer conn = dataConnection();
398
399 if (!Comm::IsConnOpen(conn)) {
400 debugs(9,3, HERE << "cannot send request body to closing " << conn);
401 return; // wait for the kid's close handler; TODO: assert(closer);
402 }
403
404 MemBuf buf;
405 if (getMoreRequestBody(buf) && buf.contentSize() > 0) {
406 debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes");
407 typedef CommCbMemFunT<Client, CommIoCbParams> Dialer;
408 requestSender = JobCallback(93,3, Dialer, this, Client::sentRequestBody);
409 Comm::Write(conn, &buf, requestSender);
410 } else {
411 debugs(9,3, HERE << "will wait for more request body bytes or eof");
412 requestSender = NULL;
413 }
414 }
415
416 /// either fill buf with available [encoded] request body bytes or return false
417 bool
418 Client::getMoreRequestBody(MemBuf &buf)
419 {
420 // default implementation does not encode request body content
421 Must(requestBodySource != NULL);
422 return requestBodySource->getMoreData(buf);
423 }
424
/// Compares the host parts of two URLs.
///
/// The host is taken to start after the first ':' and the slashes following
/// it, and to end at the next '/' or at the end of the string. A port, if
/// any, is compared as a part of the host. Schemes are not compared.
///
/// \param url1 NUL-terminated URL; must not be nil
/// \param url2 NUL-terminated URL; must not be nil
/// \returns true if both URLs have a scheme separator and the same non-empty
/// host; false if the hosts differ or either URL lacks a scheme or a host
static bool
sameUrlHosts(const char *url1, const char *url2)
{
    // XXX: Want urlHostname() here, but it uses static storage and copying
    const char *host1 = strchr(url1, ':');
    const char *host2 = strchr(url2, ':');

    if (!host1 || !host2)
        return false; // no URL scheme

    // skip the colon and then the scheme slashes, in lock step
    do {
        ++host1;
        ++host2;
    } while (*host1 == '/' && *host2 == '/');

    if (!*host1)
        return false; // no host

    // advance while the same until we reach the end of the URL/host
    const char *const hostStart = host1;
    while (*host1 && *host1 != '/' && *host1 == *host2) {
        ++host1;
        ++host2;
    }

    // A host ends either where the path starts or where the URL ends. Both
    // terminators are equivalent for host comparison purposes, so that
    // "http://h" and "http://h/path" are recognized as having the same host.
    const bool ended1 = (*host1 == '\0' || *host1 == '/');
    const bool ended2 = (*host2 == '\0' || *host2 == '/');

    // require at least one matched character so that mis-aligned inputs
    // (e.g., a different number of scheme slashes) never compare equal
    return host1 != hostStart && ended1 && ended2;
}
453
454 // purges entries that match the value of a given HTTP [response] header
455 static void
456 purgeEntriesByHeader(HttpRequest *req, const char *reqUrl, Http::Message *rep, Http::HdrType hdr)
457 {
458 const char *hdrUrl, *absUrl;
459
460 absUrl = NULL;
461 hdrUrl = rep->header.getStr(hdr);
462 if (hdrUrl == NULL) {
463 return;
464 }
465
466 /*
467 * If the URL is relative, make it absolute so we can find it.
468 * If it's absolute, make sure the host parts match to avoid DOS attacks
469 * as per RFC 2616 13.10.
470 */
471 if (urlIsRelative(hdrUrl)) {
472 absUrl = urlMakeAbsolute(req, hdrUrl);
473 if (absUrl != NULL) {
474 hdrUrl = absUrl;
475 }
476 } else if (!sameUrlHosts(reqUrl, hdrUrl)) {
477 return;
478 }
479
480 purgeEntriesByUrl(req, hdrUrl);
481
482 if (absUrl != NULL) {
483 safe_free(absUrl);
484 }
485 }
486
487 // some HTTP methods should purge matching cache entries
488 void
489 Client::maybePurgeOthers()
490 {
491 // only some HTTP methods should purge matching cache entries
492 if (!request->method.purgesOthers())
493 return;
494
495 // and probably only if the response was successful
496 if (theFinalReply->sline.status() >= 400)
497 return;
498
499 // XXX: should we use originalRequest() here?
500 SBuf tmp(request->effectiveRequestUri());
501 const char *reqUrl = tmp.c_str();
502 debugs(88, 5, "maybe purging due to " << request->method << ' ' << tmp);
503 purgeEntriesByUrl(request.getRaw(), reqUrl);
504 purgeEntriesByHeader(request.getRaw(), reqUrl, theFinalReply, Http::HdrType::LOCATION);
505 purgeEntriesByHeader(request.getRaw(), reqUrl, theFinalReply, Http::HdrType::CONTENT_LOCATION);
506 }
507
508 /// called when we have final (possibly adapted) reply headers; kids extend
509 void
510 Client::haveParsedReplyHeaders()
511 {
512 Must(theFinalReply);
513 maybePurgeOthers();
514
515 // adaptation may overwrite old offset computed using the virgin response
516 const bool partial = theFinalReply->contentRange();
517 currentOffset = partial ? theFinalReply->contentRange()->spec.offset : 0;
518 }
519
520 /// whether to prevent caching of an otherwise cachable response
521 bool
522 Client::blockCaching()
523 {
524 if (const Acl::Tree *acl = Config.accessList.storeMiss) {
525 // This relatively expensive check is not in StoreEntry::checkCachable:
526 // That method lacks HttpRequest and may be called too many times.
527 ACLFilledChecklist ch(acl, originalRequest().getRaw());
528 ch.reply = const_cast<HttpReply*>(&entry->mem().freshestReply()); // ACLFilledChecklist API bug
529 HTTPMSGLOCK(ch.reply);
530 ch.al = fwd->al;
531 if (!ch.fastCheck().allowed()) { // when in doubt, block
532 debugs(20, 3, "store_miss prohibits caching");
533 return true;
534 }
535 }
536 return false;
537 }
538
539 HttpRequestPointer
540 Client::originalRequest()
541 {
542 return request;
543 }
544
545 #if USE_ADAPTATION
546 /// Initiate an asynchronous adaptation transaction which will call us back.
547 void
548 Client::startAdaptation(const Adaptation::ServiceGroupPointer &group, HttpRequest *cause)
549 {
550 debugs(11, 5, "Client::startAdaptation() called");
551 // check whether we should be sending a body as well
552 // start body pipe to feed ICAP transaction if needed
553 assert(!virginBodyDestination);
554 HttpReply *vrep = virginReply();
555 assert(!vrep->body_pipe);
556 int64_t size = 0;
557 if (vrep->expectingBody(cause->method, size) && size) {
558 virginBodyDestination = new BodyPipe(this);
559 vrep->body_pipe = virginBodyDestination;
560 debugs(93, 6, HERE << "will send virgin reply body to " <<
561 virginBodyDestination << "; size: " << size);
562 if (size > 0)
563 virginBodyDestination->setBodySize(size);
564 }
565
566 adaptedHeadSource = initiateAdaptation(
567 new Adaptation::Iterator(vrep, cause, fwd->al, group));
568 startedAdaptation = initiated(adaptedHeadSource);
569 Must(startedAdaptation);
570 }
571
572 // properly cleans up ICAP-related state
573 // may be called multiple times
574 void Client::cleanAdaptation()
575 {
576 debugs(11,5, HERE << "cleaning ICAP; ACL: " << adaptationAccessCheckPending);
577
578 if (virginBodyDestination != NULL)
579 stopProducingFor(virginBodyDestination, false);
580
581 announceInitiatorAbort(adaptedHeadSource);
582
583 if (adaptedBodySource != NULL)
584 stopConsumingFrom(adaptedBodySource);
585
586 if (!adaptationAccessCheckPending) // we cannot cancel a pending callback
587 assert(doneWithAdaptation()); // make sure the two methods are in sync
588 }
589
590 bool
591 Client::doneWithAdaptation() const
592 {
593 return !adaptationAccessCheckPending &&
594 !virginBodyDestination && !adaptedHeadSource && !adaptedBodySource;
595 }
596
597 // sends virgin reply body to ICAP, buffering excesses if needed
598 void
599 Client::adaptVirginReplyBody(const char *data, ssize_t len)
600 {
601 assert(startedAdaptation);
602
603 if (!virginBodyDestination) {
604 debugs(11,3, HERE << "ICAP does not want more virgin body");
605 return;
606 }
607
608 // grow overflow area if already overflowed
609 if (responseBodyBuffer) {
610 responseBodyBuffer->append(data, len);
611 data = responseBodyBuffer->content();
612 len = responseBodyBuffer->contentSize();
613 }
614
615 const ssize_t putSize = virginBodyDestination->putMoreData(data, len);
616 data += putSize;
617 len -= putSize;
618
619 // if we had overflow area, shrink it as necessary
620 if (responseBodyBuffer) {
621 if (putSize == responseBodyBuffer->contentSize()) {
622 delete responseBodyBuffer;
623 responseBodyBuffer = NULL;
624 } else {
625 responseBodyBuffer->consume(putSize);
626 }
627 return;
628 }
629
630 // if we did not have an overflow area, create it as needed
631 if (len > 0) {
632 assert(!responseBodyBuffer);
633 responseBodyBuffer = new MemBuf;
634 responseBodyBuffer->init(4096, SQUID_TCP_SO_RCVBUF * 10);
635 responseBodyBuffer->append(data, len);
636 }
637 }
638
639 // can supply more virgin response body data
640 void
641 Client::noteMoreBodySpaceAvailable(BodyPipe::Pointer)
642 {
643 if (responseBodyBuffer) {
644 addVirginReplyBody(NULL, 0); // kick the buffered fragment alive again
645 if (completed && !responseBodyBuffer) {
646 serverComplete2();
647 return;
648 }
649 }
650 maybeReadVirginBody();
651 }
652
653 // the consumer of our virgin response body aborted
654 void
655 Client::noteBodyConsumerAborted(BodyPipe::Pointer)
656 {
657 stopProducingFor(virginBodyDestination, false);
658
659 // do not force closeServer here in case we need to bypass AdaptationQueryAbort
660
661 if (doneWithAdaptation()) // we may still be receiving adapted response
662 handleAdaptationCompleted();
663 }
664
665 // received adapted response headers (body may follow)
666 void
667 Client::noteAdaptationAnswer(const Adaptation::Answer &answer)
668 {
669 clearAdaptation(adaptedHeadSource); // we do not expect more messages
670
671 switch (answer.kind) {
672 case Adaptation::Answer::akForward:
673 handleAdaptedHeader(const_cast<Http::Message*>(answer.message.getRaw()));
674 break;
675
676 case Adaptation::Answer::akBlock:
677 handleAdaptationBlocked(answer);
678 break;
679
680 case Adaptation::Answer::akError:
681 handleAdaptationAborted(!answer.final);
682 break;
683 }
684 }
685
686 void
687 Client::handleAdaptedHeader(Http::Message *msg)
688 {
689 if (abortOnBadEntry("entry went bad while waiting for adapted headers")) {
690 // If the adapted response has a body, the ICAP side needs to know
691 // that nobody will consume that body. We will be destroyed upon
692 // return. Tell the ICAP side that it is on its own.
693 HttpReply *rep = dynamic_cast<HttpReply*>(msg);
694 assert(rep);
695 if (rep->body_pipe != NULL)
696 rep->body_pipe->expectNoConsumption();
697
698 return;
699 }
700
701 HttpReply *rep = dynamic_cast<HttpReply*>(msg);
702 assert(rep);
703 debugs(11,5, HERE << this << " setting adapted reply to " << rep);
704 setFinalReply(rep);
705
706 assert(!adaptedBodySource);
707 if (rep->body_pipe != NULL) {
708 // subscribe to receive adapted body
709 adaptedBodySource = rep->body_pipe;
710 // assume that ICAP does not auto-consume on failures
711 const bool result = adaptedBodySource->setConsumerIfNotLate(this);
712 assert(result);
713 } else {
714 // no body
715 if (doneWithAdaptation()) // we may still be sending virgin response
716 handleAdaptationCompleted();
717 }
718 }
719
720 void
721 Client::resumeBodyStorage()
722 {
723 if (abortOnBadEntry("store entry aborted while kick producer callback"))
724 return;
725
726 if (!adaptedBodySource)
727 return;
728
729 handleMoreAdaptedBodyAvailable();
730
731 if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
732 endAdaptedBodyConsumption();
733 }
734
735 // more adapted response body is available
736 void
737 Client::handleMoreAdaptedBodyAvailable()
738 {
739 if (abortOnBadEntry("entry refuses adapted body"))
740 return;
741
742 assert(entry);
743
744 size_t contentSize = adaptedBodySource->buf().contentSize();
745
746 if (!contentSize)
747 return; // XXX: bytesWanted asserts on zero-size ranges
748
749 const size_t spaceAvailable = entry->bytesWanted(Range<size_t>(0, contentSize), true);
750
751 if (spaceAvailable < contentSize ) {
752 // No or partial body data consuming
753 typedef NullaryMemFunT<Client> Dialer;
754 AsyncCall::Pointer call = asyncCall(93, 5, "Client::resumeBodyStorage",
755 Dialer(this, &Client::resumeBodyStorage));
756 entry->deferProducer(call);
757 }
758
759 if (!spaceAvailable) {
760 debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
761 "response body at offset " << adaptedBodySource->consumedSize());
762 return;
763 }
764
765 if (spaceAvailable < contentSize ) {
766 debugs(11, 5, HERE << "postponing storage of " <<
767 (contentSize - spaceAvailable) << " body bytes");
768 contentSize = spaceAvailable;
769 }
770
771 debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
772 "response body at offset " << adaptedBodySource->consumedSize());
773
774 BodyPipeCheckout bpc(*adaptedBodySource);
775 const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
776 currentOffset += ioBuf.length;
777 entry->write(ioBuf);
778 bpc.buf.consume(contentSize);
779 bpc.checkIn();
780 }
781
782 // the entire adapted response body was produced, successfully
783 void
784 Client::handleAdaptedBodyProductionEnded()
785 {
786 if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
787 return;
788
789 // end consumption if we consumed everything
790 if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
791 endAdaptedBodyConsumption();
792 // else resumeBodyStorage() will eventually consume the rest
793 }
794
795 void
796 Client::endAdaptedBodyConsumption()
797 {
798 stopConsumingFrom(adaptedBodySource);
799 handleAdaptationCompleted();
800 }
801
802 // premature end of the adapted response body
803 void Client::handleAdaptedBodyProducerAborted()
804 {
805 if (abortOnBadEntry("entry went bad while waiting for the now-aborted adapted body"))
806 return;
807
808 Must(adaptedBodySource != nullptr);
809 if (!adaptedBodySource->exhausted()) {
810 debugs(11,5, "waiting to consume the remainder of the aborted adapted body");
811 return; // resumeBodyStorage() should eventually consume the rest
812 }
813
814 stopConsumingFrom(adaptedBodySource);
815
816 if (handledEarlyAdaptationAbort())
817 return;
818
819 entry->lengthWentBad("body adaptation aborted");
820 handleAdaptationCompleted(); // the user should get a truncated response
821 }
822
823 // common part of noteAdaptationAnswer and handleAdaptedBodyProductionEnded
824 void
825 Client::handleAdaptationCompleted()
826 {
827 debugs(11,5, HERE << "handleAdaptationCompleted");
828 cleanAdaptation();
829
830 // We stop reading origin response because we have no place to put it(*) and
831 // cannot use it. If some origin servers do not like that or if we want to
832 // reuse more pconns, we can add code to discard unneeded origin responses.
833 // (*) TODO: Is it possible that the adaptation xaction is still running?
834 if (mayReadVirginReplyBody()) {
835 debugs(11,3, HERE << "closing origin conn due to ICAP completion");
836 closeServer();
837 }
838
839 completeForwarding();
840 }
841
842 // common part of noteAdaptation*Aborted and noteBodyConsumerAborted methods
843 void
844 Client::handleAdaptationAborted(bool bypassable)
845 {
846 debugs(11,5, HERE << "handleAdaptationAborted; bypassable: " << bypassable <<
847 ", entry empty: " << entry->isEmpty());
848
849 if (abortOnBadEntry("entry went bad while ICAP aborted"))
850 return;
851
852 // TODO: bypass if possible
853 if (!handledEarlyAdaptationAbort())
854 abortAll("adaptation failure with a filled entry");
855 }
856
857 /// If the store entry is still empty, fully handles adaptation abort, returning
858 /// true. Otherwise just updates the request error detail and returns false.
859 bool
860 Client::handledEarlyAdaptationAbort()
861 {
862 if (entry->isEmpty()) {
863 debugs(11,8, "adaptation failure with an empty entry: " << *entry);
864 const auto err = new ErrorState(ERR_ICAP_FAILURE, Http::scInternalServerError, request.getRaw(), fwd->al);
865 err->detailError(ERR_DETAIL_ICAP_RESPMOD_EARLY);
866 fwd->fail(err);
867 fwd->dontRetry(true);
868 abortAll("adaptation failure with an empty entry");
869 return true; // handled
870 }
871
872 if (request) // update logged info directly
873 request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_ICAP_RESPMOD_LATE);
874
875 return false; // the caller must handle
876 }
877
878 // adaptation service wants us to deny HTTP client access to this response
879 void
880 Client::handleAdaptationBlocked(const Adaptation::Answer &answer)
881 {
882 debugs(11,5, HERE << answer.ruleId);
883
884 if (abortOnBadEntry("entry went bad while ICAP aborted"))
885 return;
886
887 if (!entry->isEmpty()) { // too late to block (should not really happen)
888 if (request)
889 request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_RESPMOD_BLOCK_LATE);
890 abortAll("late adaptation block");
891 return;
892 }
893
894 debugs(11,7, HERE << "creating adaptation block response");
895
896 err_type page_id =
897 aclGetDenyInfoPage(&Config.denyInfoList, answer.ruleId.termedBuf(), 1);
898 if (page_id == ERR_NONE)
899 page_id = ERR_ACCESS_DENIED;
900
901 const auto err = new ErrorState(page_id, Http::scForbidden, request.getRaw(), fwd->al);
902 err->detailError(ERR_DETAIL_RESPMOD_BLOCK_EARLY);
903 fwd->fail(err);
904 fwd->dontRetry(true);
905
906 abortOnData("timely adaptation block");
907 }
908
909 void
910 Client::noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
911 {
912 adaptationAccessCheckPending = false;
913
914 if (abortOnBadEntry("entry went bad while waiting for ICAP ACL check"))
915 return;
916
917 // TODO: Should non-ICAP and ICAP REPMOD pre-cache paths check this?
918 // That check now only happens on REQMOD pre-cache and REPMOD post-cache, in processReplyAccess().
919 if (virginReply()->expectedBodyTooLarge(*request)) {
920 sendBodyIsTooLargeError();
921 return;
922 }
923 // TODO: Should we check receivedBodyTooLarge as well?
924
925 if (!group) {
926 debugs(11,3, HERE << "no adapation needed");
927 setFinalReply(virginReply());
928 processReplyBody();
929 return;
930 }
931
932 startAdaptation(group, originalRequest().getRaw());
933 processReplyBody();
934 }
935 #endif
936
937 void
938 Client::sendBodyIsTooLargeError()
939 {
940 const auto err = new ErrorState(ERR_TOO_BIG, Http::scForbidden, request.getRaw(), fwd->al);
941 fwd->fail(err);
942 fwd->dontRetry(true);
943 abortOnData("Virgin body too large.");
944 }
945
946 // TODO: when HttpStateData sends all errors to ICAP,
947 // we should be able to move this at the end of setVirginReply().
948 void
949 Client::adaptOrFinalizeReply()
950 {
951 #if USE_ADAPTATION
952 // TODO: merge with client side and return void to hide the on/off logic?
953 // The callback can be called with a NULL service if adaptation is off.
954 adaptationAccessCheckPending = Adaptation::AccessCheck::Start(
955 Adaptation::methodRespmod, Adaptation::pointPreCache,
956 originalRequest().getRaw(), virginReply(), fwd->al, this);
957 debugs(11,5, HERE << "adaptationAccessCheckPending=" << adaptationAccessCheckPending);
958 if (adaptationAccessCheckPending)
959 return;
960 #endif
961
962 setFinalReply(virginReply());
963 }
964
965 /// initializes bodyBytesRead stats if needed and applies delta
966 void
967 Client::adjustBodyBytesRead(const int64_t delta)
968 {
969 int64_t &bodyBytesRead = originalRequest()->hier.bodyBytesRead;
970
971 // if we got here, do not log a dash even if we got nothing from the server
972 if (bodyBytesRead < 0)
973 bodyBytesRead = 0;
974
975 bodyBytesRead += delta; // supports negative and zero deltas
976
977 // check for overflows ("infinite" response?) and undeflows (a bug)
978 Must(bodyBytesRead >= 0);
979 }
980
981 void
982 Client::addVirginReplyBody(const char *data, ssize_t len)
983 {
984 adjustBodyBytesRead(len);
985
986 #if USE_ADAPTATION
987 assert(!adaptationAccessCheckPending); // or would need to buffer while waiting
988 if (startedAdaptation) {
989 adaptVirginReplyBody(data, len);
990 return;
991 }
992 #endif
993 storeReplyBody(data, len);
994 }
995
996 // writes virgin or adapted reply body to store
997 void
998 Client::storeReplyBody(const char *data, ssize_t len)
999 {
1000 // write even if len is zero to push headers towards the client side
1001 entry->write (StoreIOBuffer(len, currentOffset, (char*)data));
1002
1003 currentOffset += len;
1004 }
1005
1006 size_t
1007 Client::calcBufferSpaceToReserve(size_t space, const size_t wantSpace) const
1008 {
1009 if (space < wantSpace) {
1010 const size_t maxSpace = SBuf::maxSize; // absolute best
1011 space = min(wantSpace, maxSpace); // do not promise more than asked
1012 }
1013
1014 #if USE_ADAPTATION
1015 if (responseBodyBuffer) {
1016 return 0; // Stop reading if already overflowed waiting for ICAP to catch up
1017 }
1018
1019 if (virginBodyDestination != NULL) {
1020 /*
1021 * BodyPipe buffer has a finite size limit. We
1022 * should not read more data from the network than will fit
1023 * into the pipe buffer or we _lose_ what did not fit if
1024 * the response ends sooner that BodyPipe frees up space:
1025 * There is no code to keep pumping data into the pipe once
1026 * response ends and serverComplete() is called.
1027 */
1028 const size_t adaptor_space = virginBodyDestination->buf().potentialSpaceSize();
1029
1030 debugs(11,9, "Client may read up to min(" <<
1031 adaptor_space << ", " << space << ") bytes");
1032
1033 if (adaptor_space < space)
1034 space = adaptor_space;
1035 }
1036 #endif
1037
1038 return space;
1039 }
1040
1041 size_t
1042 Client::replyBodySpace(const MemBuf &readBuf, const size_t minSpace) const
1043 {
1044 size_t space = readBuf.spaceSize(); // available space w/o heroic measures
1045 if (space < minSpace) {
1046 const size_t maxSpace = readBuf.potentialSpaceSize(); // absolute best
1047 space = min(minSpace, maxSpace); // do not promise more than asked
1048 }
1049
1050 #if USE_ADAPTATION
1051 if (responseBodyBuffer) {
1052 return 0; // Stop reading if already overflowed waiting for ICAP to catch up
1053 }
1054
1055 if (virginBodyDestination != NULL) {
1056 /*
1057 * BodyPipe buffer has a finite size limit. We
1058 * should not read more data from the network than will fit
1059 * into the pipe buffer or we _lose_ what did not fit if
1060 * the response ends sooner that BodyPipe frees up space:
1061 * There is no code to keep pumping data into the pipe once
1062 * response ends and serverComplete() is called.
1063 *
1064 * If the pipe is totally full, don't register the read handler.
1065 * The BodyPipe will call our noteMoreBodySpaceAvailable() method
1066 * when it has free space again.
1067 */
1068 size_t adaptation_space =
1069 virginBodyDestination->buf().potentialSpaceSize();
1070
1071 debugs(11,9, "Client may read up to min(" <<
1072 adaptation_space << ", " << space << ") bytes");
1073
1074 if (adaptation_space < space)
1075 space = adaptation_space;
1076 }
1077 #endif
1078
1079 return space;
1080 }
1081