]> git.ipfire.org Git - thirdparty/squid.git/blob - src/clients/Client.cc
Maintenance: Removed most NULLs using modernize-use-nullptr (#1075)
[thirdparty/squid.git] / src / clients / Client.cc
1 /*
2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 #include "squid.h"
10 #include "acl/FilledChecklist.h"
11 #include "acl/Gadgets.h"
12 #include "base/TextException.h"
13 #include "clients/Client.h"
14 #include "comm/Connection.h"
15 #include "comm/forward.h"
16 #include "comm/Write.h"
17 #include "error/Detail.h"
18 #include "errorpage.h"
19 #include "fd.h"
20 #include "HttpHdrContRange.h"
21 #include "HttpReply.h"
22 #include "HttpRequest.h"
23 #include "SquidConfig.h"
24 #include "StatCounters.h"
25 #include "Store.h"
26 #include "tools.h"
27
28 #if USE_ADAPTATION
29 #include "adaptation/AccessCheck.h"
30 #include "adaptation/Answer.h"
31 #include "adaptation/Iterator.h"
32 #include "base/AsyncCall.h"
33 #endif
34
35 // implemented in client_side_reply.cc until sides have a common parent
36 void purgeEntriesByUrl(HttpRequest * req, const char *url);
37
38 Client::Client(FwdState *theFwdState) :
39 AsyncJob("Client"),
40 fwd(theFwdState),
41 request(fwd->request)
42 {
43 entry = fwd->entry;
44 entry->lock("Client");
45 }
46
47 Client::~Client()
48 {
49 // paranoid: check that swanSong has been called
50 assert(!requestBodySource);
51 #if USE_ADAPTATION
52 assert(!virginBodyDestination);
53 assert(!adaptedBodySource);
54 #endif
55
56 entry->unlock("Client");
57
58 HTTPMSGUNLOCK(theVirginReply);
59 HTTPMSGUNLOCK(theFinalReply);
60
61 if (responseBodyBuffer != nullptr) {
62 delete responseBodyBuffer;
63 responseBodyBuffer = nullptr;
64 }
65 }
66
67 void
68 Client::swanSong()
69 {
70 // get rid of our piping obligations
71 if (requestBodySource != nullptr)
72 stopConsumingFrom(requestBodySource);
73
74 #if USE_ADAPTATION
75 cleanAdaptation();
76 #endif
77
78 if (!doneWithServer())
79 closeServer();
80
81 if (!doneWithFwd) {
82 doneWithFwd = "swanSong()";
83 fwd->handleUnregisteredServerEnd();
84 }
85
86 BodyConsumer::swanSong();
87 #if USE_ADAPTATION
88 Initiator::swanSong();
89 BodyProducer::swanSong();
90 #endif
91
92 // paranoid: check that swanSong has been called
93 // extra paranoid: yeah, I really mean it. they MUST pass here.
94 assert(!requestBodySource);
95 #if USE_ADAPTATION
96 assert(!virginBodyDestination);
97 assert(!adaptedBodySource);
98 #endif
99 }
100
101 HttpReply *
102 Client::virginReply()
103 {
104 assert(theVirginReply);
105 return theVirginReply;
106 }
107
108 const HttpReply *
109 Client::virginReply() const
110 {
111 assert(theVirginReply);
112 return theVirginReply;
113 }
114
115 HttpReply *
116 Client::setVirginReply(HttpReply *rep)
117 {
118 debugs(11,5, this << " setting virgin reply to " << rep);
119 assert(!theVirginReply);
120 assert(rep);
121 theVirginReply = rep;
122 HTTPMSGLOCK(theVirginReply);
123 if (fwd->al)
124 fwd->al->reply = theVirginReply;
125 return theVirginReply;
126 }
127
128 HttpReply *
129 Client::finalReply()
130 {
131 assert(theFinalReply);
132 return theFinalReply;
133 }
134
135 HttpReply *
136 Client::setFinalReply(HttpReply *rep)
137 {
138 debugs(11,5, this << " setting final reply to " << rep);
139
140 assert(!theFinalReply);
141 assert(rep);
142 theFinalReply = rep;
143 HTTPMSGLOCK(theFinalReply);
144 if (fwd->al)
145 fwd->al->reply = theFinalReply;
146
147 // give entry the reply because haveParsedReplyHeaders() expects it there
148 entry->replaceHttpReply(theFinalReply, false); // but do not write yet
149 haveParsedReplyHeaders(); // update the entry/reply (e.g., set timestamps)
150 if (!EBIT_TEST(entry->flags, RELEASE_REQUEST) && blockCaching())
151 entry->release();
152 entry->startWriting(); // write the updated entry to store
153
154 return theFinalReply;
155 }
156
157 void
158 Client::markParsedVirginReplyAsWhole(const char *reasonWeAreSure)
159 {
160 assert(reasonWeAreSure);
161 debugs(11, 3, reasonWeAreSure);
162
163 // The code storing adapted reply takes care of markStoredReplyAsWhole().
164 // We need to take care of the remaining regular network-to-store case.
165 #if USE_ADAPTATION
166 if (startedAdaptation) {
167 debugs(11, 5, "adaptation handles markStoredReplyAsWhole()");
168 return;
169 }
170 #endif
171
172 // Convert the "parsed whole virgin reply" event into the "stored..." event
173 // because, without adaptation, we store everything we parse: There is no
174 // buffer for parsed content; addVirginReplyBody() stores every parsed byte.
175 fwd->markStoredReplyAsWhole(reasonWeAreSure);
176 }
177
178 // called when no more server communication is expected; may quit
179 void
180 Client::serverComplete()
181 {
182 debugs(11,5, "serverComplete " << this);
183
184 if (!doneWithServer()) {
185 closeServer();
186 assert(doneWithServer());
187 }
188
189 completed = true;
190 originalRequest()->hier.stopPeerClock(true);
191
192 if (requestBodySource != nullptr)
193 stopConsumingFrom(requestBodySource);
194
195 if (responseBodyBuffer != nullptr)
196 return;
197
198 serverComplete2();
199 }
200
201 void
202 Client::serverComplete2()
203 {
204 debugs(11,5, "serverComplete2 " << this);
205
206 #if USE_ADAPTATION
207 if (virginBodyDestination != nullptr)
208 stopProducingFor(virginBodyDestination, true);
209
210 if (!doneWithAdaptation())
211 return;
212 #endif
213
214 completeForwarding();
215 }
216
217 bool Client::doneAll() const
218 {
219 return doneWithServer() &&
220 #if USE_ADAPTATION
221 doneWithAdaptation() &&
222 Adaptation::Initiator::doneAll() &&
223 BodyProducer::doneAll() &&
224 #endif
225 BodyConsumer::doneAll();
226 }
227
228 // FTP side overloads this to work around multiple calls to fwd->complete
229 void
230 Client::completeForwarding()
231 {
232 debugs(11,5, "completing forwarding for " << fwd);
233 assert(fwd != nullptr);
234 doneWithFwd = "completeForwarding()";
235 fwd->complete();
236 }
237
238 // Register to receive request body
239 bool Client::startRequestBodyFlow()
240 {
241 HttpRequestPointer r(originalRequest());
242 assert(r->body_pipe != nullptr);
243 requestBodySource = r->body_pipe;
244 if (requestBodySource->setConsumerIfNotLate(this)) {
245 debugs(11,3, "expecting request body from " <<
246 requestBodySource->status());
247 return true;
248 }
249
250 debugs(11,3, "aborting on partially consumed request body: " <<
251 requestBodySource->status());
252 requestBodySource = nullptr;
253 return false;
254 }
255
256 // Entry-dependent callbacks use this check to quit if the entry went bad
257 bool
258 Client::abortOnBadEntry(const char *abortReason)
259 {
260 if (entry->isAccepting())
261 return false;
262
263 debugs(11,5, "entry is not Accepting!");
264 abortOnData(abortReason);
265 return true;
266 }
267
268 // more request or adapted response body is available
269 void
270 Client::noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
271 {
272 #if USE_ADAPTATION
273 if (adaptedBodySource == bp) {
274 handleMoreAdaptedBodyAvailable();
275 return;
276 }
277 #endif
278 if (requestBodySource == bp)
279 handleMoreRequestBodyAvailable();
280 }
281
282 // the entire request or adapted response body was provided, successfully
283 void
284 Client::noteBodyProductionEnded(BodyPipe::Pointer bp)
285 {
286 #if USE_ADAPTATION
287 if (adaptedBodySource == bp) {
288 handleAdaptedBodyProductionEnded();
289 return;
290 }
291 #endif
292 if (requestBodySource == bp)
293 handleRequestBodyProductionEnded();
294 }
295
296 // premature end of the request or adapted response body production
297 void
298 Client::noteBodyProducerAborted(BodyPipe::Pointer bp)
299 {
300 #if USE_ADAPTATION
301 if (adaptedBodySource == bp) {
302 handleAdaptedBodyProducerAborted();
303 return;
304 }
305 #endif
306 if (requestBodySource == bp)
307 handleRequestBodyProducerAborted();
308 }
309
310 bool
311 Client::abortOnData(const char *reason)
312 {
313 abortAll(reason);
314 return true;
315 }
316
317 // more origin request body data is available
318 void
319 Client::handleMoreRequestBodyAvailable()
320 {
321 if (!requestSender)
322 sendMoreRequestBody();
323 else
324 debugs(9,3, "waiting for request body write to complete");
325 }
326
327 // there will be no more handleMoreRequestBodyAvailable calls
328 void
329 Client::handleRequestBodyProductionEnded()
330 {
331 receivedWholeRequestBody = true;
332 if (!requestSender)
333 doneSendingRequestBody();
334 else
335 debugs(9,3, "waiting for request body write to complete");
336 }
337
338 // called when we are done sending request body; kids extend this
339 void
340 Client::doneSendingRequestBody()
341 {
342 debugs(9,3, "done sending request body");
343 assert(requestBodySource != nullptr);
344 stopConsumingFrom(requestBodySource);
345
346 // kids extend this
347 }
348
349 // called when body producers aborts; kids extend this
350 void
351 Client::handleRequestBodyProducerAborted()
352 {
353 if (requestSender != nullptr)
354 debugs(9,3, "fyi: request body aborted while we were sending");
355
356 fwd->dontRetry(true); // the problem is not with the server
357 stopConsumingFrom(requestBodySource); // requestSender, if any, will notice
358
359 // kids extend this
360 }
361
362 // called when we wrote request headers(!) or a part of the body
363 void
364 Client::sentRequestBody(const CommIoCbParams &io)
365 {
366 debugs(11, 5, "sentRequestBody: FD " << io.fd << ": size " << io.size << ": errflag " << io.flag << ".");
367 debugs(32,3, "sentRequestBody called");
368
369 requestSender = nullptr;
370
371 if (io.size > 0) {
372 fd_bytes(io.fd, io.size, FD_WRITE);
373 statCounter.server.all.kbytes_out += io.size;
374 // kids should increment their counters
375 }
376
377 if (io.flag == Comm::ERR_CLOSING)
378 return;
379
380 if (!requestBodySource) {
381 debugs(9,3, "detected while-we-were-sending abort");
382 return; // do nothing;
383 }
384
385 // both successful and failed writes affect response times
386 request->hier.notePeerWrite();
387
388 if (io.flag) {
389 debugs(11, DBG_IMPORTANT, "ERROR: sentRequestBody failure: FD " << io.fd << ": " << xstrerr(io.xerrno));
390 ErrorState *err;
391 err = new ErrorState(ERR_WRITE_ERROR, Http::scBadGateway, fwd->request, fwd->al);
392 err->xerrno = io.xerrno;
393 fwd->fail(err);
394 abortOnData("I/O error while sending request body");
395 return;
396 }
397
398 if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
399 abortOnData("store entry aborted while sending request body");
400 return;
401 }
402
403 if (!requestBodySource->exhausted())
404 sendMoreRequestBody();
405 else if (receivedWholeRequestBody)
406 doneSendingRequestBody();
407 else
408 debugs(9,3, "waiting for body production end or abort");
409 }
410
411 void
412 Client::sendMoreRequestBody()
413 {
414 assert(requestBodySource != nullptr);
415 assert(!requestSender);
416
417 const Comm::ConnectionPointer conn = dataConnection();
418
419 if (!Comm::IsConnOpen(conn)) {
420 debugs(9,3, "cannot send request body to closing " << conn);
421 return; // wait for the kid's close handler; TODO: assert(closer);
422 }
423
424 MemBuf buf;
425 if (getMoreRequestBody(buf) && buf.contentSize() > 0) {
426 debugs(9,3, "will write " << buf.contentSize() << " request body bytes");
427 typedef CommCbMemFunT<Client, CommIoCbParams> Dialer;
428 requestSender = JobCallback(93,3, Dialer, this, Client::sentRequestBody);
429 Comm::Write(conn, &buf, requestSender);
430 } else {
431 debugs(9,3, "will wait for more request body bytes or eof");
432 requestSender = nullptr;
433 }
434 }
435
436 /// either fill buf with available [encoded] request body bytes or return false
437 bool
438 Client::getMoreRequestBody(MemBuf &buf)
439 {
440 // default implementation does not encode request body content
441 Must(requestBodySource != nullptr);
442 return requestBodySource->getMoreData(buf);
443 }
444
445 // Compares hosts in urls, returns false if different, no sheme, or no host.
446 static bool
447 sameUrlHosts(const char *url1, const char *url2)
448 {
449 // XXX: Want AnyP::Uri::parse() here, but it uses static storage and copying
450 const char *host1 = strchr(url1, ':');
451 const char *host2 = strchr(url2, ':');
452
453 if (host1 && host2) {
454 // skip scheme slashes
455 do {
456 ++host1;
457 ++host2;
458 } while (*host1 == '/' && *host2 == '/');
459
460 if (!*host1)
461 return false; // no host
462
463 // increment while the same until we reach the end of the URL/host
464 while (*host1 && *host1 != '/' && *host1 == *host2) {
465 ++host1;
466 ++host2;
467 }
468 return *host1 == *host2;
469 }
470
471 return false; // no URL scheme
472 }
473
474 // purges entries that match the value of a given HTTP [response] header
475 static void
476 purgeEntriesByHeader(HttpRequest *req, const char *reqUrl, Http::Message *rep, Http::HdrType hdr)
477 {
478 const auto hdrUrl = rep->header.getStr(hdr);
479 if (!hdrUrl)
480 return;
481
482 /*
483 * If the URL is relative, make it absolute so we can find it.
484 * If it's absolute, make sure the host parts match to avoid DOS attacks
485 * as per RFC 2616 13.10.
486 */
487 SBuf absUrlMaker;
488 const char *absUrl = nullptr;
489 if (urlIsRelative(hdrUrl)) {
490 if (req->method.id() == Http::METHOD_CONNECT)
491 absUrl = hdrUrl; // TODO: merge authority-uri and hdrUrl
492 else if (req->url.getScheme() == AnyP::PROTO_URN)
493 absUrl = req->url.absolute().c_str();
494 else {
495 AnyP::Uri tmpUrl = req->url;
496 if (*hdrUrl == '/') {
497 // RFC 3986 section 4.2: absolute-path reference
498 // for this logic replace the entire request-target URI path
499 tmpUrl.path(hdrUrl);
500 } else {
501 tmpUrl.addRelativePath(reqUrl);
502 }
503 absUrlMaker = tmpUrl.absolute();
504 absUrl = absUrlMaker.c_str();
505 }
506 } else if (!sameUrlHosts(reqUrl, hdrUrl)) {
507 return;
508 } else
509 absUrl = hdrUrl;
510
511 purgeEntriesByUrl(req, absUrl);
512 }
513
514 // some HTTP methods should purge matching cache entries
515 void
516 Client::maybePurgeOthers()
517 {
518 // only some HTTP methods should purge matching cache entries
519 if (!request->method.purgesOthers())
520 return;
521
522 // and probably only if the response was successful
523 if (theFinalReply->sline.status() >= 400)
524 return;
525
526 // XXX: should we use originalRequest() here?
527 SBuf tmp(request->effectiveRequestUri());
528 const char *reqUrl = tmp.c_str();
529 debugs(88, 5, "maybe purging due to " << request->method << ' ' << tmp);
530 purgeEntriesByUrl(request.getRaw(), reqUrl);
531 purgeEntriesByHeader(request.getRaw(), reqUrl, theFinalReply, Http::HdrType::LOCATION);
532 purgeEntriesByHeader(request.getRaw(), reqUrl, theFinalReply, Http::HdrType::CONTENT_LOCATION);
533 }
534
535 /// called when we have final (possibly adapted) reply headers; kids extend
536 void
537 Client::haveParsedReplyHeaders()
538 {
539 Must(theFinalReply);
540 maybePurgeOthers();
541
542 // adaptation may overwrite old offset computed using the virgin response
543 currentOffset = 0;
544 if (const auto cr = theFinalReply->contentRange()) {
545 if (cr->spec.offset != HttpHdrRangeSpec::UnknownPosition)
546 currentOffset = cr->spec.offset;
547 }
548 }
549
550 /// whether to prevent caching of an otherwise cachable response
551 bool
552 Client::blockCaching()
553 {
554 if (const Acl::Tree *acl = Config.accessList.storeMiss) {
555 // This relatively expensive check is not in StoreEntry::checkCachable:
556 // That method lacks HttpRequest and may be called too many times.
557 ACLFilledChecklist ch(acl, originalRequest().getRaw());
558 ch.reply = const_cast<HttpReply*>(&entry->mem().freshestReply()); // ACLFilledChecklist API bug
559 HTTPMSGLOCK(ch.reply);
560 ch.al = fwd->al;
561 if (!ch.fastCheck().allowed()) { // when in doubt, block
562 debugs(20, 3, "store_miss prohibits caching");
563 return true;
564 }
565 }
566 return false;
567 }
568
569 HttpRequestPointer
570 Client::originalRequest()
571 {
572 return request;
573 }
574
575 #if USE_ADAPTATION
576 /// Initiate an asynchronous adaptation transaction which will call us back.
577 void
578 Client::startAdaptation(const Adaptation::ServiceGroupPointer &group, HttpRequest *cause)
579 {
580 debugs(11, 5, "Client::startAdaptation() called");
581 // check whether we should be sending a body as well
582 // start body pipe to feed ICAP transaction if needed
583 assert(!virginBodyDestination);
584 HttpReply *vrep = virginReply();
585 assert(!vrep->body_pipe);
586 int64_t size = 0;
587 if (vrep->expectingBody(cause->method, size) && size) {
588 virginBodyDestination = new BodyPipe(this);
589 vrep->body_pipe = virginBodyDestination;
590 debugs(93, 6, "will send virgin reply body to " <<
591 virginBodyDestination << "; size: " << size);
592 if (size > 0)
593 virginBodyDestination->setBodySize(size);
594 }
595
596 adaptedHeadSource = initiateAdaptation(
597 new Adaptation::Iterator(vrep, cause, fwd->al, group));
598 startedAdaptation = initiated(adaptedHeadSource);
599 Must(startedAdaptation);
600 }
601
602 // properly cleans up ICAP-related state
603 // may be called multiple times
604 void Client::cleanAdaptation()
605 {
606 debugs(11,5, "cleaning ICAP; ACL: " << adaptationAccessCheckPending);
607
608 if (virginBodyDestination != nullptr)
609 stopProducingFor(virginBodyDestination, false);
610
611 announceInitiatorAbort(adaptedHeadSource);
612
613 if (adaptedBodySource != nullptr)
614 stopConsumingFrom(adaptedBodySource);
615
616 if (!adaptationAccessCheckPending) // we cannot cancel a pending callback
617 assert(doneWithAdaptation()); // make sure the two methods are in sync
618 }
619
620 bool
621 Client::doneWithAdaptation() const
622 {
623 return !adaptationAccessCheckPending &&
624 !virginBodyDestination && !adaptedHeadSource && !adaptedBodySource;
625 }
626
627 // sends virgin reply body to ICAP, buffering excesses if needed
628 void
629 Client::adaptVirginReplyBody(const char *data, ssize_t len)
630 {
631 assert(startedAdaptation);
632
633 if (!virginBodyDestination) {
634 debugs(11,3, "ICAP does not want more virgin body");
635 return;
636 }
637
638 // grow overflow area if already overflowed
639 if (responseBodyBuffer) {
640 responseBodyBuffer->append(data, len);
641 data = responseBodyBuffer->content();
642 len = responseBodyBuffer->contentSize();
643 }
644
645 const ssize_t putSize = virginBodyDestination->putMoreData(data, len);
646 data += putSize;
647 len -= putSize;
648
649 // if we had overflow area, shrink it as necessary
650 if (responseBodyBuffer) {
651 if (putSize == responseBodyBuffer->contentSize()) {
652 delete responseBodyBuffer;
653 responseBodyBuffer = nullptr;
654 } else {
655 responseBodyBuffer->consume(putSize);
656 }
657 return;
658 }
659
660 // if we did not have an overflow area, create it as needed
661 if (len > 0) {
662 assert(!responseBodyBuffer);
663 responseBodyBuffer = new MemBuf;
664 responseBodyBuffer->init(4096, SQUID_TCP_SO_RCVBUF * 10);
665 responseBodyBuffer->append(data, len);
666 }
667 }
668
669 // can supply more virgin response body data
670 void
671 Client::noteMoreBodySpaceAvailable(BodyPipe::Pointer)
672 {
673 if (responseBodyBuffer) {
674 addVirginReplyBody(nullptr, 0); // kick the buffered fragment alive again
675 if (completed && !responseBodyBuffer) {
676 serverComplete2();
677 return;
678 }
679 }
680 maybeReadVirginBody();
681 }
682
683 // the consumer of our virgin response body aborted
684 void
685 Client::noteBodyConsumerAborted(BodyPipe::Pointer)
686 {
687 stopProducingFor(virginBodyDestination, false);
688
689 // do not force closeServer here in case we need to bypass AdaptationQueryAbort
690
691 if (doneWithAdaptation()) // we may still be receiving adapted response
692 handleAdaptationCompleted();
693 }
694
695 // received adapted response headers (body may follow)
696 void
697 Client::noteAdaptationAnswer(const Adaptation::Answer &answer)
698 {
699 clearAdaptation(adaptedHeadSource); // we do not expect more messages
700
701 switch (answer.kind) {
702 case Adaptation::Answer::akForward:
703 handleAdaptedHeader(const_cast<Http::Message*>(answer.message.getRaw()));
704 break;
705
706 case Adaptation::Answer::akBlock:
707 handleAdaptationBlocked(answer);
708 break;
709
710 case Adaptation::Answer::akError:
711 handleAdaptationAborted(!answer.final);
712 break;
713 }
714 }
715
716 void
717 Client::handleAdaptedHeader(Http::Message *msg)
718 {
719 if (abortOnBadEntry("entry went bad while waiting for adapted headers")) {
720 // If the adapted response has a body, the ICAP side needs to know
721 // that nobody will consume that body. We will be destroyed upon
722 // return. Tell the ICAP side that it is on its own.
723 HttpReply *rep = dynamic_cast<HttpReply*>(msg);
724 assert(rep);
725 if (rep->body_pipe != nullptr)
726 rep->body_pipe->expectNoConsumption();
727
728 return;
729 }
730
731 HttpReply *rep = dynamic_cast<HttpReply*>(msg);
732 assert(rep);
733 debugs(11,5, this << " setting adapted reply to " << rep);
734 setFinalReply(rep);
735
736 assert(!adaptedBodySource);
737 if (rep->body_pipe != nullptr) {
738 // subscribe to receive adapted body
739 adaptedBodySource = rep->body_pipe;
740 // assume that ICAP does not auto-consume on failures
741 const bool result = adaptedBodySource->setConsumerIfNotLate(this);
742 assert(result);
743 } else {
744 // no body
745 fwd->markStoredReplyAsWhole("setFinalReply() stored header-only adapted reply");
746 if (doneWithAdaptation()) // we may still be sending virgin response
747 handleAdaptationCompleted();
748 }
749 }
750
751 void
752 Client::resumeBodyStorage()
753 {
754 if (abortOnBadEntry("store entry aborted while kick producer callback"))
755 return;
756
757 if (!adaptedBodySource)
758 return;
759
760 handleMoreAdaptedBodyAvailable();
761
762 if (adaptedBodySource != nullptr && adaptedBodySource->exhausted())
763 endAdaptedBodyConsumption();
764 }
765
766 // more adapted response body is available
767 void
768 Client::handleMoreAdaptedBodyAvailable()
769 {
770 if (abortOnBadEntry("entry refuses adapted body"))
771 return;
772
773 assert(entry);
774
775 size_t contentSize = adaptedBodySource->buf().contentSize();
776
777 if (!contentSize)
778 return; // XXX: bytesWanted asserts on zero-size ranges
779
780 const size_t spaceAvailable = entry->bytesWanted(Range<size_t>(0, contentSize), true);
781
782 if (spaceAvailable < contentSize ) {
783 // No or partial body data consuming
784 typedef NullaryMemFunT<Client> Dialer;
785 AsyncCall::Pointer call = asyncCall(93, 5, "Client::resumeBodyStorage",
786 Dialer(this, &Client::resumeBodyStorage));
787 entry->deferProducer(call);
788 }
789
790 if (!spaceAvailable) {
791 debugs(11, 5, "NOT storing " << contentSize << " bytes of adapted " <<
792 "response body at offset " << adaptedBodySource->consumedSize());
793 return;
794 }
795
796 if (spaceAvailable < contentSize ) {
797 debugs(11, 5, "postponing storage of " <<
798 (contentSize - spaceAvailable) << " body bytes");
799 contentSize = spaceAvailable;
800 }
801
802 debugs(11,5, "storing " << contentSize << " bytes of adapted " <<
803 "response body at offset " << adaptedBodySource->consumedSize());
804
805 BodyPipeCheckout bpc(*adaptedBodySource);
806 const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
807 currentOffset += ioBuf.length;
808 entry->write(ioBuf);
809 bpc.buf.consume(contentSize);
810 bpc.checkIn();
811 }
812
813 // the entire adapted response body was produced, successfully
814 void
815 Client::handleAdaptedBodyProductionEnded()
816 {
817 if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
818 return;
819
820 // distinguish this code path from handleAdaptedBodyProducerAborted()
821 receivedWholeAdaptedReply = true;
822
823 // end consumption if we consumed everything
824 if (adaptedBodySource != nullptr && adaptedBodySource->exhausted())
825 endAdaptedBodyConsumption();
826 // else resumeBodyStorage() will eventually consume the rest
827 }
828
829 void
830 Client::endAdaptedBodyConsumption()
831 {
832 stopConsumingFrom(adaptedBodySource);
833
834 if (receivedWholeAdaptedReply) {
835 // We received the entire adapted reply per receivedWholeAdaptedReply.
836 // We are called when we consumed everything received (per our callers).
837 // We consume only what we store per handleMoreAdaptedBodyAvailable().
838 fwd->markStoredReplyAsWhole("received,consumed=>stored the entire RESPMOD reply");
839 }
840
841 handleAdaptationCompleted();
842 }
843
844 // premature end of the adapted response body
845 void Client::handleAdaptedBodyProducerAborted()
846 {
847 if (abortOnBadEntry("entry went bad while waiting for the now-aborted adapted body"))
848 return;
849
850 Must(adaptedBodySource != nullptr);
851 if (!adaptedBodySource->exhausted()) {
852 debugs(11,5, "waiting to consume the remainder of the aborted adapted body");
853 return; // resumeBodyStorage() should eventually consume the rest
854 }
855
856 stopConsumingFrom(adaptedBodySource);
857
858 if (handledEarlyAdaptationAbort())
859 return;
860
861 handleAdaptationCompleted(); // the user should get a truncated response
862 }
863
864 // common part of noteAdaptationAnswer and handleAdaptedBodyProductionEnded
865 void
866 Client::handleAdaptationCompleted()
867 {
868 debugs(11,5, "handleAdaptationCompleted");
869 cleanAdaptation();
870
871 // We stop reading origin response because we have no place to put it(*) and
872 // cannot use it. If some origin servers do not like that or if we want to
873 // reuse more pconns, we can add code to discard unneeded origin responses.
874 // (*) TODO: Is it possible that the adaptation xaction is still running?
875 if (mayReadVirginReplyBody()) {
876 debugs(11,3, "closing origin conn due to ICAP completion");
877 closeServer();
878 }
879
880 completeForwarding();
881 }
882
883 // common part of noteAdaptation*Aborted and noteBodyConsumerAborted methods
884 void
885 Client::handleAdaptationAborted(bool bypassable)
886 {
887 debugs(11,5, "handleAdaptationAborted; bypassable: " << bypassable <<
888 ", entry empty: " << entry->isEmpty());
889
890 if (abortOnBadEntry("entry went bad while ICAP aborted"))
891 return;
892
893 // TODO: bypass if possible
894 if (!handledEarlyAdaptationAbort())
895 abortAll("adaptation failure with a filled entry");
896 }
897
898 /// If the store entry is still empty, fully handles adaptation abort, returning
899 /// true. Otherwise just updates the request error detail and returns false.
900 bool
901 Client::handledEarlyAdaptationAbort()
902 {
903 if (entry->isEmpty()) {
904 debugs(11,8, "adaptation failure with an empty entry: " << *entry);
905 const auto err = new ErrorState(ERR_ICAP_FAILURE, Http::scInternalServerError, request.getRaw(), fwd->al);
906 static const auto d = MakeNamedErrorDetail("ICAP_RESPMOD_EARLY");
907 err->detailError(d);
908 fwd->fail(err);
909 fwd->dontRetry(true);
910 abortAll("adaptation failure with an empty entry");
911 return true; // handled
912 }
913
914 if (request) { // update logged info directly
915 static const auto d = MakeNamedErrorDetail("ICAP_RESPMOD_LATE");
916 request->detailError(ERR_ICAP_FAILURE, d);
917 }
918
919 return false; // the caller must handle
920 }
921
922 // adaptation service wants us to deny HTTP client access to this response
923 void
924 Client::handleAdaptationBlocked(const Adaptation::Answer &answer)
925 {
926 debugs(11,5, answer.ruleId);
927
928 if (abortOnBadEntry("entry went bad while ICAP aborted"))
929 return;
930
931 if (!entry->isEmpty()) { // too late to block (should not really happen)
932 if (request) {
933 static const auto d = MakeNamedErrorDetail("RESPMOD_BLOCK_LATE");
934 request->detailError(ERR_ICAP_FAILURE, d);
935 }
936 abortAll("late adaptation block");
937 return;
938 }
939
940 debugs(11,7, "creating adaptation block response");
941
942 err_type page_id =
943 aclGetDenyInfoPage(&Config.denyInfoList, answer.ruleId.termedBuf(), 1);
944 if (page_id == ERR_NONE)
945 page_id = ERR_ACCESS_DENIED;
946
947 const auto err = new ErrorState(page_id, Http::scForbidden, request.getRaw(), fwd->al);
948 static const auto d = MakeNamedErrorDetail("RESPMOD_BLOCK_EARLY");
949 err->detailError(d);
950 fwd->fail(err);
951 fwd->dontRetry(true);
952
953 abortOnData("timely adaptation block");
954 }
955
956 void
957 Client::noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
958 {
959 adaptationAccessCheckPending = false;
960
961 if (abortOnBadEntry("entry went bad while waiting for ICAP ACL check"))
962 return;
963
964 // TODO: Should non-ICAP and ICAP REPMOD pre-cache paths check this?
965 // That check now only happens on REQMOD pre-cache and REPMOD post-cache, in processReplyAccess().
966 if (virginReply()->expectedBodyTooLarge(*request)) {
967 sendBodyIsTooLargeError();
968 return;
969 }
970 // TODO: Should we check receivedBodyTooLarge as well?
971
972 if (!group) {
973 debugs(11,3, "no adapation needed");
974 setFinalReply(virginReply());
975 processReplyBody();
976 return;
977 }
978
979 startAdaptation(group, originalRequest().getRaw());
980 processReplyBody();
981 }
982 #endif
983
984 void
985 Client::sendBodyIsTooLargeError()
986 {
987 const auto err = new ErrorState(ERR_TOO_BIG, Http::scForbidden, request.getRaw(), fwd->al);
988 fwd->fail(err);
989 fwd->dontRetry(true);
990 abortOnData("Virgin body too large.");
991 }
992
993 // TODO: when HttpStateData sends all errors to ICAP,
994 // we should be able to move this at the end of setVirginReply().
995 void
996 Client::adaptOrFinalizeReply()
997 {
998 #if USE_ADAPTATION
999 // TODO: merge with client side and return void to hide the on/off logic?
1000 // The callback can be called with a NULL service if adaptation is off.
1001 adaptationAccessCheckPending = Adaptation::AccessCheck::Start(
1002 Adaptation::methodRespmod, Adaptation::pointPreCache,
1003 originalRequest().getRaw(), virginReply(), fwd->al, this);
1004 debugs(11,5, "adaptationAccessCheckPending=" << adaptationAccessCheckPending);
1005 if (adaptationAccessCheckPending)
1006 return;
1007 #endif
1008
1009 setFinalReply(virginReply());
1010 }
1011
1012 /// initializes bodyBytesRead stats if needed and applies delta
1013 void
1014 Client::adjustBodyBytesRead(const int64_t delta)
1015 {
1016 int64_t &bodyBytesRead = originalRequest()->hier.bodyBytesRead;
1017
1018 // if we got here, do not log a dash even if we got nothing from the server
1019 if (bodyBytesRead < 0)
1020 bodyBytesRead = 0;
1021
1022 bodyBytesRead += delta; // supports negative and zero deltas
1023
1024 // check for overflows ("infinite" response?) and underflows (a bug)
1025 Must(bodyBytesRead >= 0);
1026 }
1027
1028 void
1029 Client::delayRead()
1030 {
1031 using DeferredReadDialer = NullaryMemFunT<Client>;
1032 AsyncCall::Pointer call = asyncCall(11, 5, "Client::noteDelayAwareReadChance",
1033 DeferredReadDialer(this, &Client::noteDelayAwareReadChance));
1034 entry->mem().delayRead(call);
1035 }
1036
1037 void
1038 Client::addVirginReplyBody(const char *data, ssize_t len)
1039 {
1040 adjustBodyBytesRead(len);
1041
1042 #if USE_ADAPTATION
1043 assert(!adaptationAccessCheckPending); // or would need to buffer while waiting
1044 if (startedAdaptation) {
1045 adaptVirginReplyBody(data, len);
1046 return;
1047 }
1048 #endif
1049 storeReplyBody(data, len);
1050 }
1051
1052 // writes virgin or adapted reply body to store
1053 void
1054 Client::storeReplyBody(const char *data, ssize_t len)
1055 {
1056 // write even if len is zero to push headers towards the client side
1057 entry->write (StoreIOBuffer(len, currentOffset, (char*)data));
1058
1059 currentOffset += len;
1060 }
1061
1062 size_t
1063 Client::calcBufferSpaceToReserve(size_t space, const size_t wantSpace) const
1064 {
1065 if (space < wantSpace) {
1066 const size_t maxSpace = SBuf::maxSize; // absolute best
1067 space = min(wantSpace, maxSpace); // do not promise more than asked
1068 }
1069
1070 #if USE_ADAPTATION
1071 if (responseBodyBuffer) {
1072 return 0; // Stop reading if already overflowed waiting for ICAP to catch up
1073 }
1074
1075 if (virginBodyDestination != nullptr) {
1076 /*
1077 * BodyPipe buffer has a finite size limit. We
1078 * should not read more data from the network than will fit
1079 * into the pipe buffer or we _lose_ what did not fit if
1080 * the response ends sooner that BodyPipe frees up space:
1081 * There is no code to keep pumping data into the pipe once
1082 * response ends and serverComplete() is called.
1083 */
1084 const size_t adaptor_space = virginBodyDestination->buf().potentialSpaceSize();
1085
1086 debugs(11,9, "Client may read up to min(" <<
1087 adaptor_space << ", " << space << ") bytes");
1088
1089 if (adaptor_space < space)
1090 space = adaptor_space;
1091 }
1092 #endif
1093
1094 return space;
1095 }
1096
1097 size_t
1098 Client::replyBodySpace(const MemBuf &readBuf, const size_t minSpace) const
1099 {
1100 size_t space = readBuf.spaceSize(); // available space w/o heroic measures
1101 if (space < minSpace) {
1102 const size_t maxSpace = readBuf.potentialSpaceSize(); // absolute best
1103 space = min(minSpace, maxSpace); // do not promise more than asked
1104 }
1105
1106 #if USE_ADAPTATION
1107 if (responseBodyBuffer) {
1108 return 0; // Stop reading if already overflowed waiting for ICAP to catch up
1109 }
1110
1111 if (virginBodyDestination != nullptr) {
1112 /*
1113 * BodyPipe buffer has a finite size limit. We
1114 * should not read more data from the network than will fit
1115 * into the pipe buffer or we _lose_ what did not fit if
1116 * the response ends sooner that BodyPipe frees up space:
1117 * There is no code to keep pumping data into the pipe once
1118 * response ends and serverComplete() is called.
1119 *
1120 * If the pipe is totally full, don't register the read handler.
1121 * The BodyPipe will call our noteMoreBodySpaceAvailable() method
1122 * when it has free space again.
1123 */
1124 size_t adaptation_space =
1125 virginBodyDestination->buf().potentialSpaceSize();
1126
1127 debugs(11,9, "Client may read up to min(" <<
1128 adaptation_space << ", " << space << ") bytes");
1129
1130 if (adaptation_space < space)
1131 space = adaptation_space;
1132 }
1133 #endif
1134
1135 return space;
1136 }
1137