]> git.ipfire.org Git - thirdparty/squid.git/blob - src/Server.cc
Boilerplate: update copyright blurbs on src/
[thirdparty/squid.git] / src / Server.cc
1 /*
2 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 #include "squid.h"
10 #include "acl/FilledChecklist.h"
11 #include "acl/Gadgets.h"
12 #include "base/TextException.h"
13 #include "comm/Connection.h"
14 #include "comm/forward.h"
15 #include "comm/Write.h"
16 #include "err_detail_type.h"
17 #include "errorpage.h"
18 #include "fd.h"
19 #include "HttpHdrContRange.h"
20 #include "HttpReply.h"
21 #include "HttpRequest.h"
22 #include "Server.h"
23 #include "SquidConfig.h"
24 #include "SquidTime.h"
25 #include "StatCounters.h"
26 #include "Store.h"
27 #include "tools.h"
28 #include "URL.h"
29
30 #if USE_ADAPTATION
31 #include "adaptation/AccessCheck.h"
32 #include "adaptation/Answer.h"
33 #include "adaptation/Iterator.h"
34 #include "base/AsyncCall.h"
35 #endif
36
37 // implemented in client_side_reply.cc until sides have a common parent
38 void purgeEntriesByUrl(HttpRequest * req, const char *url);
39
40 ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"),
41 requestSender(NULL),
42 #if USE_ADAPTATION
43 adaptedHeadSource(NULL),
44 adaptationAccessCheckPending(false),
45 startedAdaptation(false),
46 #endif
47 receivedWholeRequestBody(false),
48 theVirginReply(NULL),
49 theFinalReply(NULL)
50 {
51 fwd = theFwdState;
52 entry = fwd->entry;
53
54 entry->lock("ServerStateData");
55
56 request = fwd->request;
57 HTTPMSGLOCK(request);
58 }
59
60 ServerStateData::~ServerStateData()
61 {
62 // paranoid: check that swanSong has been called
63 assert(!requestBodySource);
64 #if USE_ADAPTATION
65 assert(!virginBodyDestination);
66 assert(!adaptedBodySource);
67 #endif
68
69 entry->unlock("ServerStateData");
70
71 HTTPMSGUNLOCK(request);
72 HTTPMSGUNLOCK(theVirginReply);
73 HTTPMSGUNLOCK(theFinalReply);
74
75 fwd = NULL; // refcounted
76
77 if (responseBodyBuffer != NULL) {
78 delete responseBodyBuffer;
79 responseBodyBuffer = NULL;
80 }
81 }
82
83 void
84 ServerStateData::swanSong()
85 {
86 // get rid of our piping obligations
87 if (requestBodySource != NULL)
88 stopConsumingFrom(requestBodySource);
89
90 #if USE_ADAPTATION
91 cleanAdaptation();
92 #endif
93
94 BodyConsumer::swanSong();
95 #if USE_ADAPTATION
96 Initiator::swanSong();
97 BodyProducer::swanSong();
98 #endif
99
100 // paranoid: check that swanSong has been called
101 // extra paranoid: yeah, I really mean it. they MUST pass here.
102 assert(!requestBodySource);
103 #if USE_ADAPTATION
104 assert(!virginBodyDestination);
105 assert(!adaptedBodySource);
106 #endif
107 }
108
109 HttpReply *
110 ServerStateData::virginReply()
111 {
112 assert(theVirginReply);
113 return theVirginReply;
114 }
115
116 const HttpReply *
117 ServerStateData::virginReply() const
118 {
119 assert(theVirginReply);
120 return theVirginReply;
121 }
122
123 HttpReply *
124 ServerStateData::setVirginReply(HttpReply *rep)
125 {
126 debugs(11,5, HERE << this << " setting virgin reply to " << rep);
127 assert(!theVirginReply);
128 assert(rep);
129 theVirginReply = rep;
130 HTTPMSGLOCK(theVirginReply);
131 return theVirginReply;
132 }
133
134 HttpReply *
135 ServerStateData::finalReply()
136 {
137 assert(theFinalReply);
138 return theFinalReply;
139 }
140
141 HttpReply *
142 ServerStateData::setFinalReply(HttpReply *rep)
143 {
144 debugs(11,5, HERE << this << " setting final reply to " << rep);
145
146 assert(!theFinalReply);
147 assert(rep);
148 theFinalReply = rep;
149 HTTPMSGLOCK(theFinalReply);
150
151 // give entry the reply because haveParsedReplyHeaders() expects it there
152 entry->replaceHttpReply(theFinalReply, false); // but do not write yet
153 haveParsedReplyHeaders(); // update the entry/reply (e.g., set timestamps)
154 if (!EBIT_TEST(entry->flags, RELEASE_REQUEST) && blockCaching())
155 entry->release();
156 entry->startWriting(); // write the updated entry to store
157
158 return theFinalReply;
159 }
160
161 // called when no more server communication is expected; may quit
162 void
163 ServerStateData::serverComplete()
164 {
165 debugs(11,5,HERE << "serverComplete " << this);
166
167 if (!doneWithServer()) {
168 closeServer();
169 assert(doneWithServer());
170 }
171
172 completed = true;
173
174 HttpRequest *r = originalRequest();
175 r->hier.stopPeerClock(true);
176
177 if (requestBodySource != NULL)
178 stopConsumingFrom(requestBodySource);
179
180 if (responseBodyBuffer != NULL)
181 return;
182
183 serverComplete2();
184 }
185
186 void
187 ServerStateData::serverComplete2()
188 {
189 debugs(11,5,HERE << "serverComplete2 " << this);
190
191 #if USE_ADAPTATION
192 if (virginBodyDestination != NULL)
193 stopProducingFor(virginBodyDestination, true);
194
195 if (!doneWithAdaptation())
196 return;
197 #endif
198
199 completeForwarding();
200 }
201
202 bool ServerStateData::doneAll() const
203 {
204 return doneWithServer() &&
205 #if USE_ADAPTATION
206 doneWithAdaptation() &&
207 Adaptation::Initiator::doneAll() &&
208 BodyProducer::doneAll() &&
209 #endif
210 BodyConsumer::doneAll();
211 }
212
213 // FTP side overloads this to work around multiple calls to fwd->complete
214 void
215 ServerStateData::completeForwarding()
216 {
217 debugs(11,5, HERE << "completing forwarding for " << fwd);
218 assert(fwd != NULL);
219 fwd->complete();
220 }
221
222 // Register to receive request body
223 bool ServerStateData::startRequestBodyFlow()
224 {
225 HttpRequest *r = originalRequest();
226 assert(r->body_pipe != NULL);
227 requestBodySource = r->body_pipe;
228 if (requestBodySource->setConsumerIfNotLate(this)) {
229 debugs(11,3, HERE << "expecting request body from " <<
230 requestBodySource->status());
231 return true;
232 }
233
234 debugs(11,3, HERE << "aborting on partially consumed request body: " <<
235 requestBodySource->status());
236 requestBodySource = NULL;
237 return false;
238 }
239
240 // Entry-dependent callbacks use this check to quit if the entry went bad
241 bool
242 ServerStateData::abortOnBadEntry(const char *abortReason)
243 {
244 if (entry->isAccepting())
245 return false;
246
247 debugs(11,5, HERE << "entry is not Accepting!");
248 abortTransaction(abortReason);
249 return true;
250 }
251
252 // more request or adapted response body is available
253 void
254 ServerStateData::noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
255 {
256 #if USE_ADAPTATION
257 if (adaptedBodySource == bp) {
258 handleMoreAdaptedBodyAvailable();
259 return;
260 }
261 #endif
262 if (requestBodySource == bp)
263 handleMoreRequestBodyAvailable();
264 }
265
266 // the entire request or adapted response body was provided, successfully
267 void
268 ServerStateData::noteBodyProductionEnded(BodyPipe::Pointer bp)
269 {
270 #if USE_ADAPTATION
271 if (adaptedBodySource == bp) {
272 handleAdaptedBodyProductionEnded();
273 return;
274 }
275 #endif
276 if (requestBodySource == bp)
277 handleRequestBodyProductionEnded();
278 }
279
280 // premature end of the request or adapted response body production
281 void
282 ServerStateData::noteBodyProducerAborted(BodyPipe::Pointer bp)
283 {
284 #if USE_ADAPTATION
285 if (adaptedBodySource == bp) {
286 handleAdaptedBodyProducerAborted();
287 return;
288 }
289 #endif
290 if (requestBodySource == bp)
291 handleRequestBodyProducerAborted();
292 }
293
294 // more origin request body data is available
295 void
296 ServerStateData::handleMoreRequestBodyAvailable()
297 {
298 if (!requestSender)
299 sendMoreRequestBody();
300 else
301 debugs(9,3, HERE << "waiting for request body write to complete");
302 }
303
304 // there will be no more handleMoreRequestBodyAvailable calls
305 void
306 ServerStateData::handleRequestBodyProductionEnded()
307 {
308 receivedWholeRequestBody = true;
309 if (!requestSender)
310 doneSendingRequestBody();
311 else
312 debugs(9,3, HERE << "waiting for request body write to complete");
313 }
314
315 // called when we are done sending request body; kids extend this
316 void
317 ServerStateData::doneSendingRequestBody()
318 {
319 debugs(9,3, HERE << "done sending request body");
320 assert(requestBodySource != NULL);
321 stopConsumingFrom(requestBodySource);
322
323 // kids extend this
324 }
325
326 // called when body producers aborts; kids extend this
327 void
328 ServerStateData::handleRequestBodyProducerAborted()
329 {
330 if (requestSender != NULL)
331 debugs(9,3, HERE << "fyi: request body aborted while we were sending");
332
333 fwd->dontRetry(true); // the problem is not with the server
334 stopConsumingFrom(requestBodySource); // requestSender, if any, will notice
335
336 // kids extend this
337 }
338
339 // called when we wrote request headers(!) or a part of the body
340 void
341 ServerStateData::sentRequestBody(const CommIoCbParams &io)
342 {
343 debugs(11, 5, "sentRequestBody: FD " << io.fd << ": size " << io.size << ": errflag " << io.flag << ".");
344 debugs(32,3,HERE << "sentRequestBody called");
345
346 requestSender = NULL;
347
348 if (io.size > 0) {
349 fd_bytes(io.fd, io.size, FD_WRITE);
350 kb_incr(&(statCounter.server.all.kbytes_out), io.size);
351 // kids should increment their counters
352 }
353
354 if (io.flag == Comm::ERR_CLOSING)
355 return;
356
357 if (!requestBodySource) {
358 debugs(9,3, HERE << "detected while-we-were-sending abort");
359 return; // do nothing;
360 }
361
362 if (io.flag) {
363 debugs(11, DBG_IMPORTANT, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(io.xerrno));
364 ErrorState *err;
365 err = new ErrorState(ERR_WRITE_ERROR, Http::scBadGateway, fwd->request);
366 err->xerrno = io.xerrno;
367 fwd->fail(err);
368 abortTransaction("I/O error while sending request body");
369 return;
370 }
371
372 if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
373 abortTransaction("store entry aborted while sending request body");
374 return;
375 }
376
377 if (!requestBodySource->exhausted())
378 sendMoreRequestBody();
379 else if (receivedWholeRequestBody)
380 doneSendingRequestBody();
381 else
382 debugs(9,3, HERE << "waiting for body production end or abort");
383 }
384
385 void
386 ServerStateData::sendMoreRequestBody()
387 {
388 assert(requestBodySource != NULL);
389 assert(!requestSender);
390
391 const Comm::ConnectionPointer conn = dataConnection();
392
393 if (!Comm::IsConnOpen(conn)) {
394 debugs(9,3, HERE << "cannot send request body to closing " << conn);
395 return; // wait for the kid's close handler; TODO: assert(closer);
396 }
397
398 MemBuf buf;
399 if (getMoreRequestBody(buf) && buf.contentSize() > 0) {
400 debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes");
401 typedef CommCbMemFunT<ServerStateData, CommIoCbParams> Dialer;
402 requestSender = JobCallback(93,3, Dialer, this, ServerStateData::sentRequestBody);
403 Comm::Write(conn, &buf, requestSender);
404 } else {
405 debugs(9,3, HERE << "will wait for more request body bytes or eof");
406 requestSender = NULL;
407 }
408 }
409
410 /// either fill buf with available [encoded] request body bytes or return false
411 bool
412 ServerStateData::getMoreRequestBody(MemBuf &buf)
413 {
414 // default implementation does not encode request body content
415 Must(requestBodySource != NULL);
416 return requestBodySource->getMoreData(buf);
417 }
418
419 // Compares hosts in urls, returns false if different, no sheme, or no host.
420 static bool
421 sameUrlHosts(const char *url1, const char *url2)
422 {
423 // XXX: Want urlHostname() here, but it uses static storage and copying
424 const char *host1 = strchr(url1, ':');
425 const char *host2 = strchr(url2, ':');
426
427 if (host1 && host2) {
428 // skip scheme slashes
429 do {
430 ++host1;
431 ++host2;
432 } while (*host1 == '/' && *host2 == '/');
433
434 if (!*host1)
435 return false; // no host
436
437 // increment while the same until we reach the end of the URL/host
438 while (*host1 && *host1 != '/' && *host1 == *host2) {
439 ++host1;
440 ++host2;
441 }
442 return *host1 == *host2;
443 }
444
445 return false; // no URL scheme
446 }
447
448 // purges entries that match the value of a given HTTP [response] header
449 static void
450 purgeEntriesByHeader(HttpRequest *req, const char *reqUrl, HttpMsg *rep, http_hdr_type hdr)
451 {
452 const char *hdrUrl, *absUrl;
453
454 absUrl = NULL;
455 hdrUrl = rep->header.getStr(hdr);
456 if (hdrUrl == NULL) {
457 return;
458 }
459
460 /*
461 * If the URL is relative, make it absolute so we can find it.
462 * If it's absolute, make sure the host parts match to avoid DOS attacks
463 * as per RFC 2616 13.10.
464 */
465 if (urlIsRelative(hdrUrl)) {
466 absUrl = urlMakeAbsolute(req, hdrUrl);
467 if (absUrl != NULL) {
468 hdrUrl = absUrl;
469 }
470 } else if (!sameUrlHosts(reqUrl, hdrUrl)) {
471 return;
472 }
473
474 purgeEntriesByUrl(req, hdrUrl);
475
476 if (absUrl != NULL) {
477 safe_free(absUrl);
478 }
479 }
480
481 // some HTTP methods should purge matching cache entries
482 void
483 ServerStateData::maybePurgeOthers()
484 {
485 // only some HTTP methods should purge matching cache entries
486 if (!request->method.purgesOthers())
487 return;
488
489 // and probably only if the response was successful
490 if (theFinalReply->sline.status() >= 400)
491 return;
492
493 // XXX: should we use originalRequest() here?
494 const char *reqUrl = urlCanonical(request);
495 debugs(88, 5, "maybe purging due to " << request->method << ' ' << reqUrl);
496 purgeEntriesByUrl(request, reqUrl);
497 purgeEntriesByHeader(request, reqUrl, theFinalReply, HDR_LOCATION);
498 purgeEntriesByHeader(request, reqUrl, theFinalReply, HDR_CONTENT_LOCATION);
499 }
500
501 /// called when we have final (possibly adapted) reply headers; kids extend
502 void
503 ServerStateData::haveParsedReplyHeaders()
504 {
505 Must(theFinalReply);
506 maybePurgeOthers();
507
508 // adaptation may overwrite old offset computed using the virgin response
509 const bool partial = theFinalReply->content_range &&
510 theFinalReply->sline.status() == Http::scPartialContent;
511 currentOffset = partial ? theFinalReply->content_range->spec.offset : 0;
512 }
513
514 /// whether to prevent caching of an otherwise cachable response
515 bool
516 ServerStateData::blockCaching()
517 {
518 if (const Acl::Tree *acl = Config.accessList.storeMiss) {
519 // This relatively expensive check is not in StoreEntry::checkCachable:
520 // That method lacks HttpRequest and may be called too many times.
521 ACLFilledChecklist ch(acl, originalRequest(), NULL);
522 ch.reply = const_cast<HttpReply*>(entry->getReply()); // ACLFilledChecklist API bug
523 HTTPMSGLOCK(ch.reply);
524 if (ch.fastCheck() != ACCESS_ALLOWED) { // when in doubt, block
525 debugs(20, 3, "store_miss prohibits caching");
526 return true;
527 }
528 }
529 return false;
530 }
531
532 HttpRequest *
533 ServerStateData::originalRequest()
534 {
535 return request;
536 }
537
538 #if USE_ADAPTATION
539 /// Initiate an asynchronous adaptation transaction which will call us back.
540 void
541 ServerStateData::startAdaptation(const Adaptation::ServiceGroupPointer &group, HttpRequest *cause)
542 {
543 debugs(11, 5, "ServerStateData::startAdaptation() called");
544 // check whether we should be sending a body as well
545 // start body pipe to feed ICAP transaction if needed
546 assert(!virginBodyDestination);
547 HttpReply *vrep = virginReply();
548 assert(!vrep->body_pipe);
549 int64_t size = 0;
550 if (vrep->expectingBody(cause->method, size) && size) {
551 virginBodyDestination = new BodyPipe(this);
552 vrep->body_pipe = virginBodyDestination;
553 debugs(93, 6, HERE << "will send virgin reply body to " <<
554 virginBodyDestination << "; size: " << size);
555 if (size > 0)
556 virginBodyDestination->setBodySize(size);
557 }
558
559 adaptedHeadSource = initiateAdaptation(
560 new Adaptation::Iterator(vrep, cause, fwd->al, group));
561 startedAdaptation = initiated(adaptedHeadSource);
562 Must(startedAdaptation);
563 }
564
565 // properly cleans up ICAP-related state
566 // may be called multiple times
567 void ServerStateData::cleanAdaptation()
568 {
569 debugs(11,5, HERE << "cleaning ICAP; ACL: " << adaptationAccessCheckPending);
570
571 if (virginBodyDestination != NULL)
572 stopProducingFor(virginBodyDestination, false);
573
574 announceInitiatorAbort(adaptedHeadSource);
575
576 if (adaptedBodySource != NULL)
577 stopConsumingFrom(adaptedBodySource);
578
579 if (!adaptationAccessCheckPending) // we cannot cancel a pending callback
580 assert(doneWithAdaptation()); // make sure the two methods are in sync
581 }
582
583 bool
584 ServerStateData::doneWithAdaptation() const
585 {
586 return !adaptationAccessCheckPending &&
587 !virginBodyDestination && !adaptedHeadSource && !adaptedBodySource;
588 }
589
590 // sends virgin reply body to ICAP, buffering excesses if needed
591 void
592 ServerStateData::adaptVirginReplyBody(const char *data, ssize_t len)
593 {
594 assert(startedAdaptation);
595
596 if (!virginBodyDestination) {
597 debugs(11,3, HERE << "ICAP does not want more virgin body");
598 return;
599 }
600
601 // grow overflow area if already overflowed
602 if (responseBodyBuffer) {
603 responseBodyBuffer->append(data, len);
604 data = responseBodyBuffer->content();
605 len = responseBodyBuffer->contentSize();
606 }
607
608 const ssize_t putSize = virginBodyDestination->putMoreData(data, len);
609 data += putSize;
610 len -= putSize;
611
612 // if we had overflow area, shrink it as necessary
613 if (responseBodyBuffer) {
614 if (putSize == responseBodyBuffer->contentSize()) {
615 delete responseBodyBuffer;
616 responseBodyBuffer = NULL;
617 } else {
618 responseBodyBuffer->consume(putSize);
619 }
620 return;
621 }
622
623 // if we did not have an overflow area, create it as needed
624 if (len > 0) {
625 assert(!responseBodyBuffer);
626 responseBodyBuffer = new MemBuf;
627 responseBodyBuffer->init(4096, SQUID_TCP_SO_RCVBUF * 10);
628 responseBodyBuffer->append(data, len);
629 }
630 }
631
632 // can supply more virgin response body data
633 void
634 ServerStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer)
635 {
636 if (responseBodyBuffer) {
637 addVirginReplyBody(NULL, 0); // kick the buffered fragment alive again
638 if (completed && !responseBodyBuffer) {
639 serverComplete2();
640 return;
641 }
642 }
643 maybeReadVirginBody();
644 }
645
646 // the consumer of our virgin response body aborted
647 void
648 ServerStateData::noteBodyConsumerAborted(BodyPipe::Pointer)
649 {
650 stopProducingFor(virginBodyDestination, false);
651
652 // do not force closeServer here in case we need to bypass AdaptationQueryAbort
653
654 if (doneWithAdaptation()) // we may still be receiving adapted response
655 handleAdaptationCompleted();
656 }
657
658 // received adapted response headers (body may follow)
659 void
660 ServerStateData::noteAdaptationAnswer(const Adaptation::Answer &answer)
661 {
662 clearAdaptation(adaptedHeadSource); // we do not expect more messages
663
664 switch (answer.kind) {
665 case Adaptation::Answer::akForward:
666 handleAdaptedHeader(const_cast<HttpMsg*>(answer.message.getRaw()));
667 break;
668
669 case Adaptation::Answer::akBlock:
670 handleAdaptationBlocked(answer);
671 break;
672
673 case Adaptation::Answer::akError:
674 handleAdaptationAborted(!answer.final);
675 break;
676 }
677 }
678
679 void
680 ServerStateData::handleAdaptedHeader(HttpMsg *msg)
681 {
682 if (abortOnBadEntry("entry went bad while waiting for adapted headers")) {
683 // If the adapted response has a body, the ICAP side needs to know
684 // that nobody will consume that body. We will be destroyed upon
685 // return. Tell the ICAP side that it is on its own.
686 HttpReply *rep = dynamic_cast<HttpReply*>(msg);
687 assert(rep);
688 if (rep->body_pipe != NULL)
689 rep->body_pipe->expectNoConsumption();
690
691 return;
692 }
693
694 HttpReply *rep = dynamic_cast<HttpReply*>(msg);
695 assert(rep);
696 debugs(11,5, HERE << this << " setting adapted reply to " << rep);
697 setFinalReply(rep);
698
699 assert(!adaptedBodySource);
700 if (rep->body_pipe != NULL) {
701 // subscribe to receive adapted body
702 adaptedBodySource = rep->body_pipe;
703 // assume that ICAP does not auto-consume on failures
704 const bool result = adaptedBodySource->setConsumerIfNotLate(this);
705 assert(result);
706 } else {
707 // no body
708 if (doneWithAdaptation()) // we may still be sending virgin response
709 handleAdaptationCompleted();
710 }
711 }
712
713 void
714 ServerStateData::resumeBodyStorage()
715 {
716 if (abortOnBadEntry("store entry aborted while kick producer callback"))
717 return;
718
719 if (!adaptedBodySource)
720 return;
721
722 handleMoreAdaptedBodyAvailable();
723
724 if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
725 endAdaptedBodyConsumption();
726 }
727
728 // more adapted response body is available
729 void
730 ServerStateData::handleMoreAdaptedBodyAvailable()
731 {
732 if (abortOnBadEntry("entry refuses adapted body"))
733 return;
734
735 assert(entry);
736
737 size_t contentSize = adaptedBodySource->buf().contentSize();
738
739 if (!contentSize)
740 return; // XXX: bytesWanted asserts on zero-size ranges
741
742 const size_t spaceAvailable = entry->bytesWanted(Range<size_t>(0, contentSize), true);
743
744 if (spaceAvailable < contentSize ) {
745 // No or partial body data consuming
746 typedef NullaryMemFunT<ServerStateData> Dialer;
747 AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage",
748 Dialer(this, &ServerStateData::resumeBodyStorage));
749 entry->deferProducer(call);
750 }
751
752 if (!spaceAvailable) {
753 debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
754 "response body at offset " << adaptedBodySource->consumedSize());
755 return;
756 }
757
758 if (spaceAvailable < contentSize ) {
759 debugs(11, 5, HERE << "postponing storage of " <<
760 (contentSize - spaceAvailable) << " body bytes");
761 contentSize = spaceAvailable;
762 }
763
764 debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
765 "response body at offset " << adaptedBodySource->consumedSize());
766
767 BodyPipeCheckout bpc(*adaptedBodySource);
768 const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
769 currentOffset += ioBuf.length;
770 entry->write(ioBuf);
771 bpc.buf.consume(contentSize);
772 bpc.checkIn();
773 }
774
775 // the entire adapted response body was produced, successfully
776 void
777 ServerStateData::handleAdaptedBodyProductionEnded()
778 {
779 if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
780 return;
781
782 // end consumption if we consumed everything
783 if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
784 endAdaptedBodyConsumption();
785 // else resumeBodyStorage() will eventually consume the rest
786 }
787
788 void
789 ServerStateData::endAdaptedBodyConsumption()
790 {
791 stopConsumingFrom(adaptedBodySource);
792 handleAdaptationCompleted();
793 }
794
795 // premature end of the adapted response body
796 void ServerStateData::handleAdaptedBodyProducerAborted()
797 {
798 stopConsumingFrom(adaptedBodySource);
799 handleAdaptationAborted();
800 }
801
802 // common part of noteAdaptationAnswer and handleAdaptedBodyProductionEnded
803 void
804 ServerStateData::handleAdaptationCompleted()
805 {
806 debugs(11,5, HERE << "handleAdaptationCompleted");
807 cleanAdaptation();
808
809 // We stop reading origin response because we have no place to put it(*) and
810 // cannot use it. If some origin servers do not like that or if we want to
811 // reuse more pconns, we can add code to discard unneeded origin responses.
812 // (*) TODO: Is it possible that the adaptation xaction is still running?
813 if (mayReadVirginReplyBody()) {
814 debugs(11,3, HERE << "closing origin conn due to ICAP completion");
815 closeServer();
816 }
817
818 completeForwarding();
819 }
820
821 // common part of noteAdaptation*Aborted and noteBodyConsumerAborted methods
822 void
823 ServerStateData::handleAdaptationAborted(bool bypassable)
824 {
825 debugs(11,5, HERE << "handleAdaptationAborted; bypassable: " << bypassable <<
826 ", entry empty: " << entry->isEmpty());
827
828 if (abortOnBadEntry("entry went bad while ICAP aborted"))
829 return;
830
831 // TODO: bypass if possible
832
833 if (entry->isEmpty()) {
834 debugs(11,9, HERE << "creating ICAP error entry after ICAP failure");
835 ErrorState *err = new ErrorState(ERR_ICAP_FAILURE, Http::scInternalServerError, request);
836 err->detailError(ERR_DETAIL_ICAP_RESPMOD_EARLY);
837 fwd->fail(err);
838 fwd->dontRetry(true);
839 } else if (request) { // update logged info directly
840 request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_ICAP_RESPMOD_LATE);
841 }
842
843 abortTransaction("ICAP failure");
844 }
845
846 // adaptation service wants us to deny HTTP client access to this response
847 void
848 ServerStateData::handleAdaptationBlocked(const Adaptation::Answer &answer)
849 {
850 debugs(11,5, HERE << answer.ruleId);
851
852 if (abortOnBadEntry("entry went bad while ICAP aborted"))
853 return;
854
855 if (!entry->isEmpty()) { // too late to block (should not really happen)
856 if (request)
857 request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_RESPMOD_BLOCK_LATE);
858 abortTransaction("late adaptation block");
859 return;
860 }
861
862 debugs(11,7, HERE << "creating adaptation block response");
863
864 err_type page_id =
865 aclGetDenyInfoPage(&Config.denyInfoList, answer.ruleId.termedBuf(), 1);
866 if (page_id == ERR_NONE)
867 page_id = ERR_ACCESS_DENIED;
868
869 ErrorState *err = new ErrorState(page_id, Http::scForbidden, request);
870 err->detailError(ERR_DETAIL_RESPMOD_BLOCK_EARLY);
871 fwd->fail(err);
872 fwd->dontRetry(true);
873
874 abortTransaction("timely adaptation block");
875 }
876
877 void
878 ServerStateData::noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
879 {
880 adaptationAccessCheckPending = false;
881
882 if (abortOnBadEntry("entry went bad while waiting for ICAP ACL check"))
883 return;
884
885 // TODO: Should nonICAP and postICAP path check this on the server-side?
886 // That check now only happens on client-side, in processReplyAccess().
887 if (virginReply()->expectedBodyTooLarge(*request)) {
888 sendBodyIsTooLargeError();
889 return;
890 }
891 // TODO: Should we check receivedBodyTooLarge on the server-side as well?
892
893 if (!group) {
894 debugs(11,3, HERE << "no adapation needed");
895 setFinalReply(virginReply());
896 processReplyBody();
897 return;
898 }
899
900 startAdaptation(group, originalRequest());
901 processReplyBody();
902 }
903 #endif
904
905 void
906 ServerStateData::sendBodyIsTooLargeError()
907 {
908 ErrorState *err = new ErrorState(ERR_TOO_BIG, Http::scForbidden, request);
909 fwd->fail(err);
910 fwd->dontRetry(true);
911 abortTransaction("Virgin body too large.");
912 }
913
914 // TODO: when HttpStateData sends all errors to ICAP,
915 // we should be able to move this at the end of setVirginReply().
916 void
917 ServerStateData::adaptOrFinalizeReply()
918 {
919 #if USE_ADAPTATION
920 // TODO: merge with client side and return void to hide the on/off logic?
921 // The callback can be called with a NULL service if adaptation is off.
922 adaptationAccessCheckPending = Adaptation::AccessCheck::Start(
923 Adaptation::methodRespmod, Adaptation::pointPreCache,
924 originalRequest(), virginReply(), fwd->al, this);
925 debugs(11,5, HERE << "adaptationAccessCheckPending=" << adaptationAccessCheckPending);
926 if (adaptationAccessCheckPending)
927 return;
928 #endif
929
930 setFinalReply(virginReply());
931 }
932
933 /// initializes bodyBytesRead stats if needed and applies delta
934 void
935 ServerStateData::adjustBodyBytesRead(const int64_t delta)
936 {
937 int64_t &bodyBytesRead = originalRequest()->hier.bodyBytesRead;
938
939 // if we got here, do not log a dash even if we got nothing from the server
940 if (bodyBytesRead < 0)
941 bodyBytesRead = 0;
942
943 bodyBytesRead += delta; // supports negative and zero deltas
944
945 // check for overflows ("infinite" response?) and undeflows (a bug)
946 Must(bodyBytesRead >= 0);
947 }
948
949 void
950 ServerStateData::addVirginReplyBody(const char *data, ssize_t len)
951 {
952 adjustBodyBytesRead(len);
953
954 #if USE_ADAPTATION
955 assert(!adaptationAccessCheckPending); // or would need to buffer while waiting
956 if (startedAdaptation) {
957 adaptVirginReplyBody(data, len);
958 return;
959 }
960 #endif
961 storeReplyBody(data, len);
962 }
963
964 // writes virgin or adapted reply body to store
965 void
966 ServerStateData::storeReplyBody(const char *data, ssize_t len)
967 {
968 // write even if len is zero to push headers towards the client side
969 entry->write (StoreIOBuffer(len, currentOffset, (char*)data));
970
971 currentOffset += len;
972 }
973
974 size_t ServerStateData::replyBodySpace(const MemBuf &readBuf,
975 const size_t minSpace) const
976 {
977 size_t space = readBuf.spaceSize(); // available space w/o heroic measures
978 if (space < minSpace) {
979 const size_t maxSpace = readBuf.potentialSpaceSize(); // absolute best
980 space = min(minSpace, maxSpace); // do not promise more than asked
981 }
982
983 #if USE_ADAPTATION
984 if (responseBodyBuffer) {
985 return 0; // Stop reading if already overflowed waiting for ICAP to catch up
986 }
987
988 if (virginBodyDestination != NULL) {
989 /*
990 * BodyPipe buffer has a finite size limit. We
991 * should not read more data from the network than will fit
992 * into the pipe buffer or we _lose_ what did not fit if
993 * the response ends sooner that BodyPipe frees up space:
994 * There is no code to keep pumping data into the pipe once
995 * response ends and serverComplete() is called.
996 *
997 * If the pipe is totally full, don't register the read handler.
998 * The BodyPipe will call our noteMoreBodySpaceAvailable() method
999 * when it has free space again.
1000 */
1001 size_t adaptation_space =
1002 virginBodyDestination->buf().potentialSpaceSize();
1003
1004 debugs(11,9, "ServerStateData may read up to min(" <<
1005 adaptation_space << ", " << space << ") bytes");
1006
1007 if (adaptation_space < space)
1008 space = adaptation_space;
1009 }
1010 #endif
1011
1012 return space;
1013 }