]> git.ipfire.org Git - thirdparty/squid.git/blob - src/Server.cc
13e1d76adcb57a2a5ccf975f3421e5f1d2ddb603
[thirdparty/squid.git] / src / Server.cc
1 /*
2 * $Id$
3 *
4 * DEBUG:
5 * AUTHOR: Duane Wessels
6 *
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
9 *
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
23 *
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 */
34
35 #include "squid.h"
36 #include "acl/Gadgets.h"
37 #include "base/TextException.h"
38 #include "comm/Connection.h"
39 #include "comm/forward.h"
40 #include "comm/Write.h"
41 #include "Server.h"
42 #include "Store.h"
43 #include "HttpRequest.h"
44 #include "HttpReply.h"
45 #include "errorpage.h"
46 #include "err_detail_type.h"
47 #include "StatCounters.h"
48 #include "SquidTime.h"
49
50 #if USE_ADAPTATION
51 #include "adaptation/AccessCheck.h"
52 #include "adaptation/Answer.h"
53 #include "adaptation/Iterator.h"
54 #include "base/AsyncCall.h"
55 #endif
56
57 // implemented in client_side_reply.cc until sides have a common parent
58 extern void purgeEntriesByUrl(HttpRequest * req, const char *url);
59
60
61 ServerStateData::ServerStateData(FwdState *theFwdState): AsyncJob("ServerStateData"),
62 requestSender(NULL),
63 #if USE_ADAPTATION
64 adaptedHeadSource(NULL),
65 adaptationAccessCheckPending(false),
66 startedAdaptation(false),
67 #endif
68 receivedWholeRequestBody(false),
69 theVirginReply(NULL),
70 theFinalReply(NULL)
71 {
72 fwd = theFwdState;
73 entry = fwd->entry;
74
75 entry->lock();
76
77 request = HTTPMSGLOCK(fwd->request);
78 }
79
80 ServerStateData::~ServerStateData()
81 {
82 // paranoid: check that swanSong has been called
83 assert(!requestBodySource);
84 #if USE_ADAPTATION
85 assert(!virginBodyDestination);
86 assert(!adaptedBodySource);
87 #endif
88
89 entry->unlock();
90
91 HTTPMSGUNLOCK(request);
92 HTTPMSGUNLOCK(theVirginReply);
93 HTTPMSGUNLOCK(theFinalReply);
94
95 fwd = NULL; // refcounted
96
97 if (responseBodyBuffer != NULL) {
98 delete responseBodyBuffer;
99 responseBodyBuffer = NULL;
100 }
101 }
102
103 void
104 ServerStateData::swanSong()
105 {
106 // get rid of our piping obligations
107 if (requestBodySource != NULL)
108 stopConsumingFrom(requestBodySource);
109
110 #if USE_ADAPTATION
111 cleanAdaptation();
112 #endif
113
114 BodyConsumer::swanSong();
115 #if USE_ADAPTATION
116 Initiator::swanSong();
117 BodyProducer::swanSong();
118 #endif
119
120 // paranoid: check that swanSong has been called
121 // extra paranoid: yeah, I really mean it. they MUST pass here.
122 assert(!requestBodySource);
123 #if USE_ADAPTATION
124 assert(!virginBodyDestination);
125 assert(!adaptedBodySource);
126 #endif
127 }
128
129
130 HttpReply *
131 ServerStateData::virginReply()
132 {
133 assert(theVirginReply);
134 return theVirginReply;
135 }
136
137 const HttpReply *
138 ServerStateData::virginReply() const
139 {
140 assert(theVirginReply);
141 return theVirginReply;
142 }
143
144 HttpReply *
145 ServerStateData::setVirginReply(HttpReply *rep)
146 {
147 debugs(11,5, HERE << this << " setting virgin reply to " << rep);
148 assert(!theVirginReply);
149 assert(rep);
150 theVirginReply = HTTPMSGLOCK(rep);
151 return theVirginReply;
152 }
153
154 HttpReply *
155 ServerStateData::finalReply()
156 {
157 assert(theFinalReply);
158 return theFinalReply;
159 }
160
161 HttpReply *
162 ServerStateData::setFinalReply(HttpReply *rep)
163 {
164 debugs(11,5, HERE << this << " setting final reply to " << rep);
165
166 assert(!theFinalReply);
167 assert(rep);
168 theFinalReply = HTTPMSGLOCK(rep);
169
170 // give entry the reply because haveParsedReplyHeaders() expects it there
171 entry->replaceHttpReply(theFinalReply, false); // but do not write yet
172 haveParsedReplyHeaders(); // update the entry/reply (e.g., set timestamps)
173 entry->startWriting(); // write the updated entry to store
174
175 return theFinalReply;
176 }
177
178 // called when no more server communication is expected; may quit
179 void
180 ServerStateData::serverComplete()
181 {
182 debugs(11,5,HERE << "serverComplete " << this);
183
184 if (!doneWithServer()) {
185 closeServer();
186 assert(doneWithServer());
187 }
188
189 completed = true;
190
191 HttpRequest *r = originalRequest();
192 r->hier.total_response_time = r->hier.first_conn_start.tv_sec ?
193 tvSubMsec(r->hier.first_conn_start, current_time) : -1;
194
195 if (requestBodySource != NULL)
196 stopConsumingFrom(requestBodySource);
197
198 if (responseBodyBuffer != NULL)
199 return;
200
201 serverComplete2();
202 }
203
204 void
205 ServerStateData::serverComplete2()
206 {
207 debugs(11,5,HERE << "serverComplete2 " << this);
208
209 #if USE_ADAPTATION
210 if (virginBodyDestination != NULL)
211 stopProducingFor(virginBodyDestination, true);
212
213 if (!doneWithAdaptation())
214 return;
215 #endif
216
217 completeForwarding();
218 }
219
220 bool ServerStateData::doneAll() const
221 {
222 return doneWithServer() &&
223 #if USE_ADAPTATION
224 doneWithAdaptation() &&
225 Adaptation::Initiator::doneAll() &&
226 BodyProducer::doneAll() &&
227 #endif
228 BodyConsumer::doneAll();
229 }
230
231 // FTP side overloads this to work around multiple calls to fwd->complete
232 void
233 ServerStateData::completeForwarding()
234 {
235 debugs(11,5, HERE << "completing forwarding for " << fwd);
236 assert(fwd != NULL);
237 fwd->complete();
238 }
239
240 // Register to receive request body
241 bool ServerStateData::startRequestBodyFlow()
242 {
243 HttpRequest *r = originalRequest();
244 assert(r->body_pipe != NULL);
245 requestBodySource = r->body_pipe;
246 if (requestBodySource->setConsumerIfNotLate(this)) {
247 debugs(11,3, HERE << "expecting request body from " <<
248 requestBodySource->status());
249 return true;
250 }
251
252 debugs(11,3, HERE << "aborting on partially consumed request body: " <<
253 requestBodySource->status());
254 requestBodySource = NULL;
255 return false;
256 }
257
258 // Entry-dependent callbacks use this check to quit if the entry went bad
259 bool
260 ServerStateData::abortOnBadEntry(const char *abortReason)
261 {
262 if (entry->isAccepting())
263 return false;
264
265 debugs(11,5, HERE << "entry is not Accepting!");
266 abortTransaction(abortReason);
267 return true;
268 }
269
270 // more request or adapted response body is available
271 void
272 ServerStateData::noteMoreBodyDataAvailable(BodyPipe::Pointer bp)
273 {
274 #if USE_ADAPTATION
275 if (adaptedBodySource == bp) {
276 handleMoreAdaptedBodyAvailable();
277 return;
278 }
279 #endif
280 if (requestBodySource == bp)
281 handleMoreRequestBodyAvailable();
282 }
283
284 // the entire request or adapted response body was provided, successfully
285 void
286 ServerStateData::noteBodyProductionEnded(BodyPipe::Pointer bp)
287 {
288 #if USE_ADAPTATION
289 if (adaptedBodySource == bp) {
290 handleAdaptedBodyProductionEnded();
291 return;
292 }
293 #endif
294 if (requestBodySource == bp)
295 handleRequestBodyProductionEnded();
296 }
297
298 // premature end of the request or adapted response body production
299 void
300 ServerStateData::noteBodyProducerAborted(BodyPipe::Pointer bp)
301 {
302 #if USE_ADAPTATION
303 if (adaptedBodySource == bp) {
304 handleAdaptedBodyProducerAborted();
305 return;
306 }
307 #endif
308 if (requestBodySource == bp)
309 handleRequestBodyProducerAborted();
310 }
311
312
313 // more origin request body data is available
314 void
315 ServerStateData::handleMoreRequestBodyAvailable()
316 {
317 if (!requestSender)
318 sendMoreRequestBody();
319 else
320 debugs(9,3, HERE << "waiting for request body write to complete");
321 }
322
323 // there will be no more handleMoreRequestBodyAvailable calls
324 void
325 ServerStateData::handleRequestBodyProductionEnded()
326 {
327 receivedWholeRequestBody = true;
328 if (!requestSender)
329 doneSendingRequestBody();
330 else
331 debugs(9,3, HERE << "waiting for request body write to complete");
332 }
333
334 // called when we are done sending request body; kids extend this
335 void
336 ServerStateData::doneSendingRequestBody()
337 {
338 debugs(9,3, HERE << "done sending request body");
339 assert(requestBodySource != NULL);
340 stopConsumingFrom(requestBodySource);
341
342 // kids extend this
343 }
344
345 // called when body producers aborts; kids extend this
346 void
347 ServerStateData::handleRequestBodyProducerAborted()
348 {
349 if (requestSender != NULL)
350 debugs(9,3, HERE << "fyi: request body aborted while we were sending");
351
352 fwd->dontRetry(true); // the problem is not with the server
353 stopConsumingFrom(requestBodySource); // requestSender, if any, will notice
354
355 // kids extend this
356 }
357
358 // called when we wrote request headers(!) or a part of the body
359 void
360 ServerStateData::sentRequestBody(const CommIoCbParams &io)
361 {
362 debugs(11, 5, "sentRequestBody: FD " << io.fd << ": size " << io.size << ": errflag " << io.flag << ".");
363 debugs(32,3,HERE << "sentRequestBody called");
364
365 requestSender = NULL;
366
367 if (io.size > 0) {
368 fd_bytes(io.fd, io.size, FD_WRITE);
369 kb_incr(&(statCounter.server.all.kbytes_out), io.size);
370 // kids should increment their counters
371 }
372
373 if (io.flag == COMM_ERR_CLOSING)
374 return;
375
376 if (!requestBodySource) {
377 debugs(9,3, HERE << "detected while-we-were-sending abort");
378 return; // do nothing;
379 }
380
381 if (io.flag) {
382 debugs(11, 1, "sentRequestBody error: FD " << io.fd << ": " << xstrerr(io.xerrno));
383 ErrorState *err;
384 err = new ErrorState(ERR_WRITE_ERROR, HTTP_BAD_GATEWAY, fwd->request);
385 err->xerrno = io.xerrno;
386 fwd->fail(err);
387 abortTransaction("I/O error while sending request body");
388 return;
389 }
390
391 if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
392 abortTransaction("store entry aborted while sending request body");
393 return;
394 }
395
396 if (!requestBodySource->exhausted())
397 sendMoreRequestBody();
398 else if (receivedWholeRequestBody)
399 doneSendingRequestBody();
400 else
401 debugs(9,3, HERE << "waiting for body production end or abort");
402 }
403
404 void
405 ServerStateData::sendMoreRequestBody()
406 {
407 assert(requestBodySource != NULL);
408 assert(!requestSender);
409
410 const Comm::ConnectionPointer conn = dataConnection();
411
412 if (!Comm::IsConnOpen(conn)) {
413 debugs(9,3, HERE << "cannot send request body to closing " << conn);
414 return; // wait for the kid's close handler; TODO: assert(closer);
415 }
416
417 MemBuf buf;
418 if (getMoreRequestBody(buf) && buf.contentSize() > 0) {
419 debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes");
420 typedef CommCbMemFunT<ServerStateData, CommIoCbParams> Dialer;
421 requestSender = JobCallback(93,3, Dialer, this, ServerStateData::sentRequestBody);
422 Comm::Write(conn, &buf, requestSender);
423 } else {
424 debugs(9,3, HERE << "will wait for more request body bytes or eof");
425 requestSender = NULL;
426 }
427 }
428
429 /// either fill buf with available [encoded] request body bytes or return false
430 bool
431 ServerStateData::getMoreRequestBody(MemBuf &buf)
432 {
433 // default implementation does not encode request body content
434 Must(requestBodySource != NULL);
435 return requestBodySource->getMoreData(buf);
436 }
437
438 // Compares hosts in urls, returns false if different, no sheme, or no host.
439 static bool
440 sameUrlHosts(const char *url1, const char *url2)
441 {
442 // XXX: Want urlHostname() here, but it uses static storage and copying
443 const char *host1 = strchr(url1, ':');
444 const char *host2 = strchr(url2, ':');
445
446 if (host1 && host2) {
447 // skip scheme slashes
448 do {
449 ++host1;
450 ++host2;
451 } while (*host1 == '/' && *host2 == '/');
452
453 if (!*host1)
454 return false; // no host
455
456 // increment while the same until we reach the end of the URL/host
457 while (*host1 && *host1 != '/' && *host1 == *host2) {
458 ++host1;
459 ++host2;
460 }
461 return *host1 == *host2;
462 }
463
464 return false; // no URL scheme
465 }
466
467 // purges entries that match the value of a given HTTP [response] header
468 static void
469 purgeEntriesByHeader(HttpRequest *req, const char *reqUrl, HttpMsg *rep, http_hdr_type hdr)
470 {
471 const char *hdrUrl, *absUrl;
472
473 absUrl = NULL;
474 hdrUrl = rep->header.getStr(hdr);
475 if (hdrUrl == NULL) {
476 return;
477 }
478
479 /*
480 * If the URL is relative, make it absolute so we can find it.
481 * If it's absolute, make sure the host parts match to avoid DOS attacks
482 * as per RFC 2616 13.10.
483 */
484 if (urlIsRelative(hdrUrl)) {
485 absUrl = urlMakeAbsolute(req, hdrUrl);
486 if (absUrl != NULL) {
487 hdrUrl = absUrl;
488 }
489 } else if (!sameUrlHosts(reqUrl, hdrUrl)) {
490 return;
491 }
492
493 purgeEntriesByUrl(req, hdrUrl);
494
495 if (absUrl != NULL) {
496 safe_free(absUrl);
497 }
498 }
499
500 // some HTTP methods should purge matching cache entries
501 void
502 ServerStateData::maybePurgeOthers()
503 {
504 // only some HTTP methods should purge matching cache entries
505 if (!request->method.purgesOthers())
506 return;
507
508 // and probably only if the response was successful
509 if (theFinalReply->sline.status >= 400)
510 return;
511
512 // XXX: should we use originalRequest() here?
513 const char *reqUrl = urlCanonical(request);
514 debugs(88, 5, "maybe purging due to " << RequestMethodStr(request->method) << ' ' << reqUrl);
515 purgeEntriesByUrl(request, reqUrl);
516 purgeEntriesByHeader(request, reqUrl, theFinalReply, HDR_LOCATION);
517 purgeEntriesByHeader(request, reqUrl, theFinalReply, HDR_CONTENT_LOCATION);
518 }
519
520 /// called when we have final (possibly adapted) reply headers; kids extend
521 void
522 ServerStateData::haveParsedReplyHeaders()
523 {
524 Must(theFinalReply);
525 maybePurgeOthers();
526 }
527
528 HttpRequest *
529 ServerStateData::originalRequest()
530 {
531 return request;
532 }
533
534 #if USE_ADAPTATION
535 /// Initiate an asynchronous adaptation transaction which will call us back.
536 void
537 ServerStateData::startAdaptation(const Adaptation::ServiceGroupPointer &group, HttpRequest *cause)
538 {
539 debugs(11, 5, "ServerStateData::startAdaptation() called");
540 // check whether we should be sending a body as well
541 // start body pipe to feed ICAP transaction if needed
542 assert(!virginBodyDestination);
543 HttpReply *vrep = virginReply();
544 assert(!vrep->body_pipe);
545 int64_t size = 0;
546 if (vrep->expectingBody(cause->method, size) && size) {
547 virginBodyDestination = new BodyPipe(this);
548 vrep->body_pipe = virginBodyDestination;
549 debugs(93, 6, HERE << "will send virgin reply body to " <<
550 virginBodyDestination << "; size: " << size);
551 if (size > 0)
552 virginBodyDestination->setBodySize(size);
553 }
554
555 adaptedHeadSource = initiateAdaptation(
556 new Adaptation::Iterator(vrep, cause, group));
557 startedAdaptation = initiated(adaptedHeadSource);
558 Must(startedAdaptation);
559 }
560
561 // properly cleans up ICAP-related state
562 // may be called multiple times
563 void ServerStateData::cleanAdaptation()
564 {
565 debugs(11,5, HERE << "cleaning ICAP; ACL: " << adaptationAccessCheckPending);
566
567 if (virginBodyDestination != NULL)
568 stopProducingFor(virginBodyDestination, false);
569
570 announceInitiatorAbort(adaptedHeadSource);
571
572 if (adaptedBodySource != NULL)
573 stopConsumingFrom(adaptedBodySource);
574
575 if (!adaptationAccessCheckPending) // we cannot cancel a pending callback
576 assert(doneWithAdaptation()); // make sure the two methods are in sync
577 }
578
579 bool
580 ServerStateData::doneWithAdaptation() const
581 {
582 return !adaptationAccessCheckPending &&
583 !virginBodyDestination && !adaptedHeadSource && !adaptedBodySource;
584 }
585
586 // sends virgin reply body to ICAP, buffering excesses if needed
587 void
588 ServerStateData::adaptVirginReplyBody(const char *data, ssize_t len)
589 {
590 assert(startedAdaptation);
591
592 if (!virginBodyDestination) {
593 debugs(11,3, HERE << "ICAP does not want more virgin body");
594 return;
595 }
596
597 // grow overflow area if already overflowed
598 if (responseBodyBuffer) {
599 responseBodyBuffer->append(data, len);
600 data = responseBodyBuffer->content();
601 len = responseBodyBuffer->contentSize();
602 }
603
604 const ssize_t putSize = virginBodyDestination->putMoreData(data, len);
605 data += putSize;
606 len -= putSize;
607
608 // if we had overflow area, shrink it as necessary
609 if (responseBodyBuffer) {
610 if (putSize == responseBodyBuffer->contentSize()) {
611 delete responseBodyBuffer;
612 responseBodyBuffer = NULL;
613 } else {
614 responseBodyBuffer->consume(putSize);
615 }
616 return;
617 }
618
619 // if we did not have an overflow area, create it as needed
620 if (len > 0) {
621 assert(!responseBodyBuffer);
622 responseBodyBuffer = new MemBuf;
623 responseBodyBuffer->init(4096, SQUID_TCP_SO_RCVBUF * 10);
624 responseBodyBuffer->append(data, len);
625 }
626 }
627
628 // can supply more virgin response body data
629 void
630 ServerStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer)
631 {
632 if (responseBodyBuffer) {
633 addVirginReplyBody(NULL, 0); // kick the buffered fragment alive again
634 if (completed && !responseBodyBuffer) {
635 serverComplete2();
636 return;
637 }
638 }
639 maybeReadVirginBody();
640 }
641
642 // the consumer of our virgin response body aborted
643 void
644 ServerStateData::noteBodyConsumerAborted(BodyPipe::Pointer)
645 {
646 stopProducingFor(virginBodyDestination, false);
647
648 // do not force closeServer here in case we need to bypass AdaptationQueryAbort
649
650 if (doneWithAdaptation()) // we may still be receiving adapted response
651 handleAdaptationCompleted();
652 }
653
654 // received adapted response headers (body may follow)
655 void
656 ServerStateData::noteAdaptationAnswer(const Adaptation::Answer &answer)
657 {
658 clearAdaptation(adaptedHeadSource); // we do not expect more messages
659
660 switch (answer.kind) {
661 case Adaptation::Answer::akForward:
662 handleAdaptedHeader(answer.message);
663 break;
664
665 case Adaptation::Answer::akBlock:
666 handleAdaptationBlocked(answer);
667 break;
668
669 case Adaptation::Answer::akError:
670 handleAdaptationAborted(!answer.final);
671 break;
672 }
673 }
674
675 void
676 ServerStateData::handleAdaptedHeader(HttpMsg *msg)
677 {
678 if (abortOnBadEntry("entry went bad while waiting for adapted headers")) {
679 // If the adapted response has a body, the ICAP side needs to know
680 // that nobody will consume that body. We will be destroyed upon
681 // return. Tell the ICAP side that it is on its own.
682 HttpReply *rep = dynamic_cast<HttpReply*>(msg);
683 assert(rep);
684 if (rep->body_pipe != NULL)
685 rep->body_pipe->expectNoConsumption();
686
687 return;
688 }
689
690 HttpReply *rep = dynamic_cast<HttpReply*>(msg);
691 assert(rep);
692 debugs(11,5, HERE << this << " setting adapted reply to " << rep);
693 setFinalReply(rep);
694
695 assert(!adaptedBodySource);
696 if (rep->body_pipe != NULL) {
697 // subscribe to receive adapted body
698 adaptedBodySource = rep->body_pipe;
699 // assume that ICAP does not auto-consume on failures
700 assert(adaptedBodySource->setConsumerIfNotLate(this));
701 } else {
702 // no body
703 if (doneWithAdaptation()) // we may still be sending virgin response
704 handleAdaptationCompleted();
705 }
706 }
707
708 void
709 ServerStateData::resumeBodyStorage()
710 {
711 if (abortOnBadEntry("store entry aborted while kick producer callback"))
712 return;
713
714 if (!adaptedBodySource)
715 return;
716
717 handleMoreAdaptedBodyAvailable();
718
719 if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
720 endAdaptedBodyConsumption();
721 }
722
723 // more adapted response body is available
724 void
725 ServerStateData::handleMoreAdaptedBodyAvailable()
726 {
727 if (abortOnBadEntry("entry refuses adapted body"))
728 return;
729
730 assert(entry);
731
732 size_t contentSize = adaptedBodySource->buf().contentSize();
733
734 if (!contentSize)
735 return; // XXX: bytesWanted asserts on zero-size ranges
736
737 // XXX: entry->bytesWanted returns contentSize-1 if entry can accept data.
738 // We have to add 1 to avoid suspending forever.
739 const size_t bytesWanted = entry->bytesWanted(Range<size_t>(0, contentSize));
740 const size_t spaceAvailable = bytesWanted > 0 ? (bytesWanted + 1) : 0;
741
742 if (spaceAvailable < contentSize ) {
743 // No or partial body data consuming
744 typedef NullaryMemFunT<ServerStateData> Dialer;
745 AsyncCall::Pointer call = asyncCall(93, 5, "ServerStateData::resumeBodyStorage",
746 Dialer(this, &ServerStateData::resumeBodyStorage));
747 entry->deferProducer(call);
748 }
749
750 // XXX: bytesWanted API does not allow us to write just one byte!
751 if (!spaceAvailable && contentSize > 1) {
752 debugs(11, 5, HERE << "NOT storing " << contentSize << " bytes of adapted " <<
753 "response body at offset " << adaptedBodySource->consumedSize());
754 return;
755 }
756
757 if (spaceAvailable < contentSize ) {
758 debugs(11, 5, HERE << "postponing storage of " <<
759 (contentSize - spaceAvailable) << " body bytes");
760 contentSize = spaceAvailable;
761 }
762
763 debugs(11,5, HERE << "storing " << contentSize << " bytes of adapted " <<
764 "response body at offset " << adaptedBodySource->consumedSize());
765
766 BodyPipeCheckout bpc(*adaptedBodySource);
767 const StoreIOBuffer ioBuf(&bpc.buf, currentOffset, contentSize);
768 currentOffset += ioBuf.length;
769 entry->write(ioBuf);
770 bpc.buf.consume(contentSize);
771 bpc.checkIn();
772 }
773
774 // the entire adapted response body was produced, successfully
775 void
776 ServerStateData::handleAdaptedBodyProductionEnded()
777 {
778 if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
779 return;
780
781 // end consumption if we consumed everything
782 if (adaptedBodySource != NULL && adaptedBodySource->exhausted())
783 endAdaptedBodyConsumption();
784 // else resumeBodyStorage() will eventually consume the rest
785 }
786
787 void
788 ServerStateData::endAdaptedBodyConsumption()
789 {
790 stopConsumingFrom(adaptedBodySource);
791 handleAdaptationCompleted();
792 }
793
794 // premature end of the adapted response body
795 void ServerStateData::handleAdaptedBodyProducerAborted()
796 {
797 stopConsumingFrom(adaptedBodySource);
798 handleAdaptationAborted();
799 }
800
801 // common part of noteAdaptationAnswer and handleAdaptedBodyProductionEnded
802 void
803 ServerStateData::handleAdaptationCompleted()
804 {
805 debugs(11,5, HERE << "handleAdaptationCompleted");
806 cleanAdaptation();
807
808 // We stop reading origin response because we have no place to put it and
809 // cannot use it. If some origin servers do not like that or if we want to
810 // reuse more pconns, we can add code to discard unneeded origin responses.
811 if (!doneWithServer()) {
812 debugs(11,3, HERE << "closing origin conn due to ICAP completion");
813 closeServer();
814 }
815
816 completeForwarding();
817 }
818
819
820 // common part of noteAdaptation*Aborted and noteBodyConsumerAborted methods
821 void
822 ServerStateData::handleAdaptationAborted(bool bypassable)
823 {
824 debugs(11,5, HERE << "handleAdaptationAborted; bypassable: " << bypassable <<
825 ", entry empty: " << entry->isEmpty());
826
827 if (abortOnBadEntry("entry went bad while ICAP aborted"))
828 return;
829
830 // TODO: bypass if possible
831
832 if (entry->isEmpty()) {
833 debugs(11,9, HERE << "creating ICAP error entry after ICAP failure");
834 ErrorState *err = new ErrorState(ERR_ICAP_FAILURE, HTTP_INTERNAL_SERVER_ERROR, request);
835 err->xerrno = ERR_DETAIL_ICAP_RESPMOD_EARLY;
836 fwd->fail(err);
837 fwd->dontRetry(true);
838 } else if (request) { // update logged info directly
839 request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_ICAP_RESPMOD_LATE);
840 }
841
842 abortTransaction("ICAP failure");
843 }
844
845 // adaptation service wants us to deny HTTP client access to this response
846 void
847 ServerStateData::handleAdaptationBlocked(const Adaptation::Answer &answer)
848 {
849 debugs(11,5, HERE << answer.ruleId);
850
851 if (abortOnBadEntry("entry went bad while ICAP aborted"))
852 return;
853
854 if (!entry->isEmpty()) { // too late to block (should not really happen)
855 if (request)
856 request->detailError(ERR_ICAP_FAILURE, ERR_DETAIL_RESPMOD_BLOCK_LATE);
857 abortTransaction("late adaptation block");
858 return;
859 }
860
861 debugs(11,7, HERE << "creating adaptation block response");
862
863 err_type page_id =
864 aclGetDenyInfoPage(&Config.denyInfoList, answer.ruleId.termedBuf(), 1);
865 if (page_id == ERR_NONE)
866 page_id = ERR_ACCESS_DENIED;
867
868 ErrorState *err = new ErrorState(page_id, HTTP_FORBIDDEN, request);
869 err->xerrno = ERR_DETAIL_RESPMOD_BLOCK_EARLY;
870 fwd->fail(err);
871 fwd->dontRetry(true);
872
873 abortTransaction("timely adaptation block");
874 }
875
876 void
877 ServerStateData::noteAdaptationAclCheckDone(Adaptation::ServiceGroupPointer group)
878 {
879 adaptationAccessCheckPending = false;
880
881 if (abortOnBadEntry("entry went bad while waiting for ICAP ACL check"))
882 return;
883
884 // TODO: Should nonICAP and postICAP path check this on the server-side?
885 // That check now only happens on client-side, in processReplyAccess().
886 if (virginReply()->expectedBodyTooLarge(*request)) {
887 sendBodyIsTooLargeError();
888 return;
889 }
890 // TODO: Should we check receivedBodyTooLarge on the server-side as well?
891
892 if (!group) {
893 debugs(11,3, HERE << "no adapation needed");
894 setFinalReply(virginReply());
895 processReplyBody();
896 return;
897 }
898
899 startAdaptation(group, originalRequest());
900 processReplyBody();
901 }
902 #endif
903
904 void
905 ServerStateData::sendBodyIsTooLargeError()
906 {
907 ErrorState *err = new ErrorState(ERR_TOO_BIG, HTTP_FORBIDDEN, request);
908 err->xerrno = errno;
909 fwd->fail(err);
910 fwd->dontRetry(true);
911 abortTransaction("Virgin body too large.");
912 }
913
914 // TODO: when HttpStateData sends all errors to ICAP,
915 // we should be able to move this at the end of setVirginReply().
916 void
917 ServerStateData::adaptOrFinalizeReply()
918 {
919 #if USE_ADAPTATION
920 // TODO: merge with client side and return void to hide the on/off logic?
921 // The callback can be called with a NULL service if adaptation is off.
922 adaptationAccessCheckPending = Adaptation::AccessCheck::Start(
923 Adaptation::methodRespmod, Adaptation::pointPreCache,
924 originalRequest(), virginReply(), this);
925 debugs(11,5, HERE << "adaptationAccessCheckPending=" << adaptationAccessCheckPending);
926 if (adaptationAccessCheckPending)
927 return;
928 #endif
929
930 setFinalReply(virginReply());
931 }
932
933 /// initializes bodyBytesRead stats if needed and applies delta
934 void
935 ServerStateData::adjustBodyBytesRead(const int64_t delta)
936 {
937 int64_t &bodyBytesRead = originalRequest()->hier.bodyBytesRead;
938
939 // if we got here, do not log a dash even if we got nothing from the server
940 if (bodyBytesRead < 0)
941 bodyBytesRead = 0;
942
943 bodyBytesRead += delta; // supports negative and zero deltas
944
945 // check for overflows ("infinite" response?) and undeflows (a bug)
946 Must(bodyBytesRead >= 0);
947 }
948
949 void
950 ServerStateData::addVirginReplyBody(const char *data, ssize_t len)
951 {
952 adjustBodyBytesRead(len);
953
954 #if USE_ADAPTATION
955 assert(!adaptationAccessCheckPending); // or would need to buffer while waiting
956 if (startedAdaptation) {
957 adaptVirginReplyBody(data, len);
958 return;
959 }
960 #endif
961 storeReplyBody(data, len);
962 }
963
964 // writes virgin or adapted reply body to store
965 void
966 ServerStateData::storeReplyBody(const char *data, ssize_t len)
967 {
968 // write even if len is zero to push headers towards the client side
969 entry->write (StoreIOBuffer(len, currentOffset, (char*)data));
970
971 currentOffset += len;
972 }
973
974 size_t ServerStateData::replyBodySpace(const MemBuf &readBuf,
975 const size_t minSpace) const
976 {
977 size_t space = readBuf.spaceSize(); // available space w/o heroic measures
978 if (space < minSpace) {
979 const size_t maxSpace = readBuf.potentialSpaceSize(); // absolute best
980 space = min(minSpace, maxSpace); // do not promise more than asked
981 }
982
983 #if USE_ADAPTATION
984 if (responseBodyBuffer) {
985 return 0; // Stop reading if already overflowed waiting for ICAP to catch up
986 }
987
988 if (virginBodyDestination != NULL) {
989 /*
990 * BodyPipe buffer has a finite size limit. We
991 * should not read more data from the network than will fit
992 * into the pipe buffer or we _lose_ what did not fit if
993 * the response ends sooner that BodyPipe frees up space:
994 * There is no code to keep pumping data into the pipe once
995 * response ends and serverComplete() is called.
996 *
997 * If the pipe is totally full, don't register the read handler.
998 * The BodyPipe will call our noteMoreBodySpaceAvailable() method
999 * when it has free space again.
1000 */
1001 size_t adaptation_space =
1002 virginBodyDestination->buf().potentialSpaceSize();
1003
1004 debugs(11,9, "ServerStateData may read up to min(" <<
1005 adaptation_space << ", " << space << ") bytes");
1006
1007 if (adaptation_space < space)
1008 space = adaptation_space;
1009 }
1010 #endif
1011
1012 return space;
1013 }