5 * AUTHOR: Duane Wessels
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
36 #include "acl/Gadgets.h"
37 #include "base/TextException.h"
38 #include "comm/Write.h"
41 #include "fde.h" /* for fd_table[fd].closing */
42 #include "HttpRequest.h"
43 #include "HttpReply.h"
44 #include "errorpage.h"
45 #include "err_detail_type.h"
46 #include "SquidTime.h"
49 #include "adaptation/AccessCheck.h"
50 #include "adaptation/Answer.h"
51 #include "adaptation/Iterator.h"
54 // implemented in client_side_reply.cc until sides have a common parent
// Forward declaration only. Takes the request and a URL string; this file
// calls it from purgeEntriesByHeader() and ServerStateData::maybePurgeOthers().
55 extern void purgeEntriesByUrl(HttpRequest
* req
, const char *url
);
// Constructor: registers the job under the name "ServerStateData" and resets
// the adaptation-related members and the whole-request-body flag listed below.
58 ServerStateData::ServerStateData(FwdState
*theFwdState
): AsyncJob("ServerStateData"),
61 adaptedHeadSource(NULL
),
62 adaptationAccessCheckPending(false),
63 startedAdaptation(false),
65 receivedWholeRequestBody(false)
// keep the result of HTTPMSGLOCK(fwd->request) in `request`;
// the destructor pairs this with HTTPMSGUNLOCK(request)
72 request
= HTTPMSGLOCK(fwd
->request
);
// Destructor: asserts that all body pipes were already released, unlocks the
// request and both replies, drops the fwd reference, and frees the buffered
// response body, if any.
75 ServerStateData::~ServerStateData()
77 // paranoid: check that swanSong has been called
78 assert(!requestBodySource
);
80 assert(!virginBodyDestination
);
81 assert(!adaptedBodySource
);
// release the message locks held by this object
86 HTTPMSGUNLOCK(request
);
87 HTTPMSGUNLOCK(theVirginReply
);
88 HTTPMSGUNLOCK(theFinalReply
);
90 fwd
= NULL
; // refcounted
// free the overflow buffer and clear the pointer
92 if (responseBodyBuffer
!= NULL
) {
93 delete responseBodyBuffer
;
94 responseBodyBuffer
= NULL
;
// Job cleanup: stops consuming the request body (if still subscribed), calls
// the BodyConsumer, Initiator and BodyProducer swanSong() implementations, and
// then asserts that no body pipe pointers remain set.
99 ServerStateData::swanSong()
101 // get rid of our piping obligations
102 if (requestBodySource
!= NULL
)
103 stopConsumingFrom(requestBodySource
);
109 BodyConsumer::swanSong();
111 Initiator::swanSong();
112 BodyProducer::swanSong();
115 // paranoid: check that swanSong has been called
116 // extra paranoid: yeah, I really mean it. they MUST pass here.
117 assert(!requestBodySource
);
119 assert(!virginBodyDestination
);
120 assert(!adaptedBodySource
);
// Accessor for the virgin reply; asserts that it has been set
// (see setVirginReply()) before returning it.
126 ServerStateData::virginReply()
128 assert(theVirginReply
);
129 return theVirginReply
;
// Const overload of virginReply(): same assertion, same member returned.
133 ServerStateData::virginReply() const
135 assert(theVirginReply
);
136 return theVirginReply
;
// Stores the virgin reply. May be called only once per object: asserts that
// no virgin reply was set before, keeps the HTTPMSGLOCK(rep) result, and
// returns the stored reply.
140 ServerStateData::setVirginReply(HttpReply
*rep
)
142 debugs(11,5, HERE
<< this << " setting virgin reply to " << rep
);
143 assert(!theVirginReply
);
145 theVirginReply
= HTTPMSGLOCK(rep
);
146 return theVirginReply
;
// Accessor for the final reply; asserts that it has been set
// (see setFinalReply()) before returning it.
150 ServerStateData::finalReply()
152 assert(theFinalReply
);
153 return theFinalReply
;
// Stores the final reply and hands it to the store entry. May be called only
// once per object (asserted). The order below matters: the reply is attached
// to the entry without writing, haveParsedReplyHeaders() updates it, and only
// then the entry starts writing. Returns the stored reply.
157 ServerStateData::setFinalReply(HttpReply
*rep
)
159 debugs(11,5, HERE
<< this << " setting final reply to " << rep
);
161 assert(!theFinalReply
);
163 theFinalReply
= HTTPMSGLOCK(rep
);
165 // give entry the reply because haveParsedReplyHeaders() expects it there
166 entry
->replaceHttpReply(theFinalReply
, false); // but do not write yet
167 haveParsedReplyHeaders(); // update the entry/reply (e.g., set timestamps)
168 entry
->startWriting(); // write the updated entry to store
170 return theFinalReply
;
173 // called when no more server communication is expected; may quit
// Records the total response time on the original request and stops consuming
// the request body.
175 ServerStateData::serverComplete()
177 debugs(11,5,HERE
<< "serverComplete " << this);
// NOTE(review): the statement between this check and the assert is not
// visible in this excerpt; by the assert, doneWithServer() must hold.
179 if (!doneWithServer()) {
181 assert(doneWithServer());
// total_response_time is the tvSubMsec() difference between first_conn_start
// and current_time, or -1 when first_conn_start.tv_sec is zero
186 HttpRequest
*r
= originalRequest();
187 r
->hier
.total_response_time
= r
->hier
.first_conn_start
.tv_sec
?
188 tvSubMsec(r
->hier
.first_conn_start
, current_time
) : -1;
190 if (requestBodySource
!= NULL
)
191 stopConsumingFrom(requestBodySource
);
// NOTE(review): the statement guarded by this check is not visible in this
// excerpt; confirm what happens while a response body is still buffered.
193 if (responseBodyBuffer
!= NULL
)
// Second stage of server completion: stops producing the virgin body for the
// adaptation pipe (passing `true` to stopProducingFor) and completes
// forwarding.
200 ServerStateData::serverComplete2()
202 debugs(11,5,HERE
<< "serverComplete2 " << this);
205 if (virginBodyDestination
!= NULL
)
206 stopProducingFor(virginBodyDestination
, true);
// NOTE(review): the statement guarded by this check is not visible here;
// presumably we return early while adaptation is still in progress - confirm.
208 if (!doneWithAdaptation())
212 completeForwarding();
216 // When we are done talking to the primary server, we may be still talking
217 // to the ICAP service. And vice versa. Here, we quit only if we are done
// with both: each check below logs which side is still busy, and the job is
// deleted only after both doneWithAdaptation() and doneWithServer() hold.
219 void ServerStateData::quitIfAllDone()
222 if (!doneWithAdaptation()) {
223 debugs(11,5, HERE
<< "transaction not done: still talking to ICAP");
228 if (!doneWithServer()) {
229 debugs(11,5, HERE
<< "transaction not done: still talking to server");
233 debugs(11,3, HERE
<< "transaction done");
235 deleteThis("ServerStateData::quitIfAllDone");
238 // FTP side overloads this to work around multiple calls to fwd->complete
// Logs which fwd is being completed.
// NOTE(review): the remaining statements are not visible in this excerpt.
240 ServerStateData::completeForwarding()
242 debugs(11,5, HERE
<< "completing forwarding for " << fwd
);
247 // Register to receive request body
// Takes the body pipe of the original request (which must exist) and tries to
// become its consumer. If the pipe refuses, i.e. setConsumerIfNotLate()
// fails, the pipe pointer is cleared again.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably true on successful registration and false otherwise - confirm.
248 bool ServerStateData::startRequestBodyFlow()
250 HttpRequest
*r
= originalRequest();
251 assert(r
->body_pipe
!= NULL
);
252 requestBodySource
= r
->body_pipe
;
253 if (requestBodySource
->setConsumerIfNotLate(this)) {
254 debugs(11,3, HERE
<< "expecting request body from " <<
255 requestBodySource
->status());
// registration failed: log the pipe status and forget the pipe
259 debugs(11,3, HERE
<< "aborting on partially consumed request body: " <<
260 requestBodySource
->status());
261 requestBodySource
= NULL
;
265 // Entry-dependent callbacks use this check to quit if the entry went bad
// When the entry no longer accepts data, aborts the transaction with the given
// reason. Callers in this file test the result in an `if` and bail out.
// NOTE(review): the return statements are not visible in this excerpt;
// presumably false while the entry is accepting and true after aborting.
267 ServerStateData::abortOnBadEntry(const char *abortReason
)
269 if (entry
->isAccepting())
272 debugs(11,5, HERE
<< "entry is not Accepting!");
273 abortTransaction(abortReason
);
277 // more request or adapted response body is available
// Dispatches on the notifying pipe: the adapted-body handler runs when `bp` is
// adaptedBodySource; the request-body handler is the other path.
279 ServerStateData::noteMoreBodyDataAvailable(BodyPipe::Pointer bp
)
282 if (adaptedBodySource
== bp
) {
283 handleMoreAdaptedBodyAvailable();
287 handleMoreRequestBodyAvailable();
290 // the entire request or adapted response body was provided, successfully
// Dispatches on the notifying pipe: the adapted-body handler runs when `bp` is
// adaptedBodySource; the request-body handler is the other path.
292 ServerStateData::noteBodyProductionEnded(BodyPipe::Pointer bp
)
295 if (adaptedBodySource
== bp
) {
296 handleAdaptedBodyProductionEnded();
300 handleRequestBodyProductionEnded();
303 // premature end of the request or adapted response body production
// Dispatches on the notifying pipe: the adapted-body handler runs when `bp` is
// adaptedBodySource; the request-body handler is the other path.
305 ServerStateData::noteBodyProducerAborted(BodyPipe::Pointer bp
)
308 if (adaptedBodySource
== bp
) {
309 handleAdaptedBodyProducerAborted();
313 handleRequestBodyProducerAborted();
317 // more origin request body data is available
// Either sends more request body now or logs that a write is still pending.
// NOTE(review): the condition selecting between the two statements is not
// visible in this excerpt; presumably it tests requestSender - confirm.
319 ServerStateData::handleMoreRequestBodyAvailable()
322 sendMoreRequestBody();
324 debugs(9,3, HERE
<< "waiting for request body write to complete");
327 // there will be no more handleMoreRequestBodyAvailable calls
// Remembers that the whole request body has been received, then either
// finishes sending or logs that a write is still pending.
// NOTE(review): the condition selecting between the two statements is not
// visible in this excerpt; presumably it tests requestSender - confirm.
329 ServerStateData::handleRequestBodyProductionEnded()
331 receivedWholeRequestBody
= true;
333 doneSendingRequestBody();
335 debugs(9,3, HERE
<< "waiting for request body write to complete");
338 // called when we are done sending request body; kids extend this
// Base behavior: requires a request body pipe (asserted) and stops consuming
// from it.
340 ServerStateData::doneSendingRequestBody()
342 debugs(9,3, HERE
<< "done sending request body");
343 assert(requestBodySource
!= NULL
);
344 stopConsumingFrom(requestBodySource
);
349 // called when body producers aborts; kids extend this
// Base behavior: notes when a write is still in flight, tells fwd not to
// retry, and stops consuming from the request body pipe.
351 ServerStateData::handleRequestBodyProducerAborted()
353 if (requestSender
!= NULL
)
354 debugs(9,3, HERE
<< "fyi: request body aborted while we were sending");
356 fwd
->dontRetry(true); // the problem is not with the server
357 stopConsumingFrom(requestBodySource
); // requestSender, if any, will notice
362 // called when we wrote request headers(!) or a part of the body
// Write-completion callback. Clears the pending-write marker, accounts for the
// written bytes, then handles in turn: a closing descriptor, an aborted body
// producer, an I/O error, an aborted store entry, and finally the normal
// "send more / done / wait" decision.
// NOTE(review): several guards and return statements between the steps are
// not visible in this excerpt; the per-step notes below are hedged accordingly.
364 ServerStateData::sentRequestBody(const CommIoCbParams
&io
)
366 debugs(11, 5, "sentRequestBody: FD " << io
.fd
<< ": size " << io
.size
<< ": errflag " << io
.flag
<< ".");
367 debugs(32,3,HERE
<< "sentRequestBody called");
// the write we were waiting for has completed
369 requestSender
= NULL
;
// NOTE(review): a guard on io.size may precede this accounting - confirm
372 fd_bytes(io
.fd
, io
.size
, FD_WRITE
);
373 kb_incr(&statCounter
.server
.all
.kbytes_out
, io
.size
);
374 // kids should increment their counters
// NOTE(review): the statement guarded by this check is not visible here;
// presumably nothing more is done when the descriptor is closing
377 if (io
.flag
== COMM_ERR_CLOSING
)
// the body pipe is gone: the producer aborted during the write
380 if (!requestBodySource
) {
381 debugs(9,3, HERE
<< "detected while-we-were-sending abort");
382 return; // do nothing;
// write error path: builds an ERR_WRITE_ERROR / HTTP_BAD_GATEWAY error that
// carries the errno, then aborts the transaction.
// NOTE(review): the enclosing condition, the declaration of `err`, and the
// use of `err` after it is filled in are not visible here.
386 debugs(11, 1, "sentRequestBody error: FD " << io
.fd
<< ": " << xstrerr(io
.xerrno
));
388 err
= errorCon(ERR_WRITE_ERROR
, HTTP_BAD_GATEWAY
, fwd
->request
);
389 err
->xerrno
= io
.xerrno
;
391 abortTransaction("I/O error while sending request body");
395 if (EBIT_TEST(entry
->flags
, ENTRY_ABORTED
)) {
396 abortTransaction("store entry aborted while sending request body");
// normal path: keep sending while the pipe has data; finish once the whole
// body was received; otherwise wait for the producer
400 if (!requestBodySource
->exhausted())
401 sendMoreRequestBody();
402 else if (receivedWholeRequestBody
)
403 doneSendingRequestBody();
405 debugs(9,3, HERE
<< "waiting for body production end or abort");
// True when fd is non-negative and its fd_table entry is not closing.
409 ServerStateData::canSend(int fd
) const
411 return fd
>= 0 && !fd_table
[fd
].closing();
// Writes the next chunk of request body to the data descriptor.
// Requires a request body pipe and no write already in flight (both asserted).
415 ServerStateData::sendMoreRequestBody()
417 assert(requestBodySource
!= NULL
);
418 assert(!requestSender
);
420 const int fd
= dataDescriptor();
// NOTE(review): the check guarding this early return is not visible here;
// judging by the message it is presumably !canSend(fd) - confirm
423 debugs(9,3, HERE
<< "cannot send request body to closing FD " << fd
);
424 return; // wait for the kid's close handler; TODO: assert(closer);
// NOTE(review): the declaration of `buf` is not visible in this excerpt.
// If body bytes were obtained, schedule sentRequestBody() as the write
// callback and start the write; otherwise leave no write pending.
428 if (getMoreRequestBody(buf
) && buf
.contentSize() > 0) {
429 debugs(9,3, HERE
<< "will write " << buf
.contentSize() << " request body bytes");
430 typedef CommCbMemFunT
<ServerStateData
, CommIoCbParams
> Dialer
;
431 requestSender
= JobCallback(93,3,
432 Dialer
, this, ServerStateData::sentRequestBody
);
433 Comm::Write(fd
, &buf
, requestSender
);
435 debugs(9,3, HERE
<< "will wait for more request body bytes or eof");
436 requestSender
= NULL
;
440 /// either fill buf with available [encoded] request body bytes or return false
// Requires a request body pipe (enforced with Must) and returns whatever the
// pipe's getMoreData(buf) returns.
442 ServerStateData::getMoreRequestBody(MemBuf
&buf
)
444 // default implementation does not encode request body content
445 Must(requestBodySource
!= NULL
);
446 return requestBodySource
->getMoreData(buf
);
449 // Compares hosts in URLs, returns false if different, no scheme, or no host.
// Both arguments are C strings; each host is located via the first ':' found
// with strchr(), which is treated as the end of the URL scheme.
451 sameUrlHosts(const char *url1
, const char *url2
)
453 // XXX: Want urlHostname() here, but it uses static storage and copying
454 const char *host1
= strchr(url1
, ':');
455 const char *host2
= strchr(url2
, ':');
457 if (host1
&& host2
) {
458 // skip scheme slashes
// NOTE(review): the body of this do/while loop is not visible in this
// excerpt; presumably it advances both pointers - confirm
462 } while (*host1
== '/' && *host2
== '/');
465 return false; // no host
467 // increment while the same until we reach the end of the URL/host
// NOTE(review): the loop body is not visible; presumably both pointers advance
468 while (*host1
&& *host1
!= '/' && *host1
== *host2
) {
// the hosts match only if both scans stopped on the same character
472 return *host1
== *host2
;
475 return false; // no URL scheme
478 // purges entries that match the value of a given HTTP [response] header
// req/reqUrl describe the request, rep is the message whose header `hdr` is
// read with header.getStr(). The header value is purged via purgeEntriesByUrl().
// NOTE(review): several statements (early returns, the reassignment of hdrUrl,
// and the release of absUrl) are not visible in this excerpt.
480 purgeEntriesByHeader(HttpRequest
*req
, const char *reqUrl
, HttpMsg
*rep
, http_hdr_type hdr
)
482 const char *hdrUrl
, *absUrl
;
485 hdrUrl
= rep
->header
.getStr(hdr
);
// NOTE(review): presumably returns when the header is absent - confirm
486 if (hdrUrl
== NULL
) {
491 * If the URL is relative, make it absolute so we can find it.
492 * If it's absolute, make sure the host parts match to avoid DOS attacks
493 * as per RFC 2616 13.10.
495 if (urlIsRelative(hdrUrl
)) {
496 absUrl
= urlMakeAbsolute(req
, hdrUrl
);
497 if (absUrl
!= NULL
) {
// NOTE(review): presumably returns without purging on a host mismatch - confirm
500 } else if (!sameUrlHosts(reqUrl
, hdrUrl
)) {
504 purgeEntriesByUrl(req
, hdrUrl
);
// NOTE(review): the cleanup performed for a non-NULL absUrl is not visible
506 if (absUrl
!= NULL
) {
511 // some HTTP methods should purge matching cache entries
// Purges the entries for the request URL and for the URLs named by the
// Location and Content-Location headers of the final reply.
// NOTE(review): the statements guarded by the first two checks are not visible
// in this excerpt; presumably both are early returns - confirm.
513 ServerStateData::maybePurgeOthers()
515 // only some HTTP methods should purge matching cache entries
516 if (!request
->method
.purgesOthers())
519 // and probably only if the response was successful
520 if (theFinalReply
->sline
.status
>= 400)
523 // XXX: should we use originalRequest() here?
524 const char *reqUrl
= urlCanonical(request
);
525 debugs(88, 5, "maybe purging due to " << RequestMethodStr(request
->method
) << ' ' << reqUrl
);
526 purgeEntriesByUrl(request
, reqUrl
);
527 purgeEntriesByHeader(request
, reqUrl
, theFinalReply
, HDR_LOCATION
);
528 purgeEntriesByHeader(request
, reqUrl
, theFinalReply
, HDR_CONTENT_LOCATION
);
531 /// called when we have final (possibly adapted) reply headers; kids extend
// Invoked from setFinalReply() after the reply has been attached to the entry.
// NOTE(review): the body is not visible in this excerpt.
533 ServerStateData::haveParsedReplyHeaders()
// Callers in this file store the result in an `HttpRequest *`
// (e.g. serverComplete(), startRequestBodyFlow()).
// NOTE(review): the body is not visible in this excerpt.
540 ServerStateData::originalRequest()
546 /// Initiate an asynchronous adaptation transaction which will call us back.
// group: the adaptation services to use; cause: the request whose method is
// used to decide whether the virgin reply is expected to carry a body.
// Enforces with Must() that the adaptation job was actually initiated.
548 ServerStateData::startAdaptation(const Adaptation::ServiceGroupPointer
&group
, HttpRequest
*cause
)
550 debugs(11, 5, "ServerStateData::startAdaptation() called");
551 // check whether we should be sending a body as well
552 // start body pipe to feed ICAP transaction if needed
553 assert(!virginBodyDestination
);
554 HttpReply
*vrep
= virginReply();
555 assert(!vrep
->body_pipe
);
// NOTE(review): the declaration of `size` is not visible in this excerpt.
// A pipe is created only when a body is expected and `size` is non-zero.
557 if (vrep
->expectingBody(cause
->method
, size
) && size
) {
558 virginBodyDestination
= new BodyPipe(this);
559 vrep
->body_pipe
= virginBodyDestination
;
560 debugs(93, 6, HERE
<< "will send virgin reply body to " <<
561 virginBodyDestination
<< "; size: " << size
);
// NOTE(review): a guard on `size` may precede this call - confirm
563 virginBodyDestination
->setBodySize(size
);
// launch the adaptation job and remember whether it started
566 adaptedHeadSource
= initiateAdaptation(
567 new Adaptation::Iterator(vrep
, cause
, group
));
568 startedAdaptation
= initiated(adaptedHeadSource
);
569 Must(startedAdaptation
);
572 // properly cleans up ICAP-related state
573 // may be called multiple times
// Stops producing the virgin body (passing `false` to stopProducingFor),
// announces the abort to the adaptation head source, and stops consuming the
// adapted body.
574 void ServerStateData::cleanAdaptation()
576 debugs(11,5, HERE
<< "cleaning ICAP; ACL: " << adaptationAccessCheckPending
);
578 if (virginBodyDestination
!= NULL
)
579 stopProducingFor(virginBodyDestination
, false);
581 announceInitiatorAbort(adaptedHeadSource
);
583 if (adaptedBodySource
!= NULL
)
584 stopConsumingFrom(adaptedBodySource
);
586 if (!adaptationAccessCheckPending
) // we cannot cancel a pending callback
587 assert(doneWithAdaptation()); // make sure the two methods are in sync
// True when no adaptation ACL check is pending and none of the three
// adaptation pointers (virgin body destination, adapted head source, adapted
// body source) is set.
591 ServerStateData::doneWithAdaptation() const
593 return !adaptationAccessCheckPending
&&
594 !virginBodyDestination
&& !adaptedHeadSource
&& !adaptedBodySource
;
597 // sends virgin reply body to ICAP, buffering excesses if needed
// data/len: the new virgin body bytes. Whatever the pipe does not accept is
// kept in responseBodyBuffer and offered again on the next call.
// Requires adaptation to have started (asserted).
599 ServerStateData::adaptVirginReplyBody(const char *data
, ssize_t len
)
601 assert(startedAdaptation
);
// NOTE(review): presumably returns here when the pipe is gone; the return
// statement is not visible in this excerpt
603 if (!virginBodyDestination
) {
604 debugs(11,3, HERE
<< "ICAP does not want more virgin body");
608 // grow overflow area if already overflowed
// from here on, data/len describe the whole buffered content, old and new
609 if (responseBodyBuffer
) {
610 responseBodyBuffer
->append(data
, len
);
611 data
= responseBodyBuffer
->content();
612 len
= responseBodyBuffer
->contentSize();
// putSize is the number of bytes the pipe accepted
615 const ssize_t putSize
= virginBodyDestination
->putMoreData(data
, len
);
619 // if we had overflow area, shrink it as necessary
// everything accepted: drop the buffer; otherwise consume the accepted bytes
// NOTE(review): the `else` between the two cases is not visible here
620 if (responseBodyBuffer
) {
621 if (putSize
== responseBodyBuffer
->contentSize()) {
622 delete responseBodyBuffer
;
623 responseBodyBuffer
= NULL
;
625 responseBodyBuffer
->consume(putSize
);
630 // if we did not have an overflow area, create it as needed
// NOTE(review): the condition guarding creation and the adjustment of
// data/len by putSize are not visible in this excerpt - confirm
632 assert(!responseBodyBuffer
);
633 responseBodyBuffer
= new MemBuf
;
634 responseBodyBuffer
->init(4096, SQUID_TCP_SO_RCVBUF
* 10);
635 responseBodyBuffer
->append(data
, len
);
639 // can supply more virgin response body data
// Called with the pipe that has free space again (the argument is unnamed and
// unused). Retries the buffered data first, then asks for more virgin body.
641 ServerStateData::noteMoreBodySpaceAvailable(BodyPipe::Pointer
)
643 if (responseBodyBuffer
) {
644 addVirginReplyBody(NULL
, 0); // kick the buffered fragment alive again
// NOTE(review): the statements guarded by this check are not visible in this
// excerpt; it fires once `completed` is set and the buffer has drained
645 if (completed
&& !responseBodyBuffer
) {
650 maybeReadVirginBody();
653 // the consumer of our virgin response body aborted
// Stops producing for the virgin body pipe (passing `false` to
// stopProducingFor) and finishes adaptation if nothing else is pending.
// The pipe argument is unnamed and unused.
655 ServerStateData::noteBodyConsumerAborted(BodyPipe::Pointer
)
657 stopProducingFor(virginBodyDestination
, false);
659 // do not force closeServer here in case we need to bypass AdaptationQueryAbort
661 if (doneWithAdaptation()) // we may still be receiving adapted response
662 handleAdaptationCompleted();
665 // received adapted response headers (body may follow)
// Dispatches on answer.kind: akForward passes answer.message to
// handleAdaptedHeader(), akBlock goes to handleAdaptationBlocked(), and akError
// goes to handleAdaptationAborted() with `!answer.final` as the argument.
// NOTE(review): the `break` statements between the cases are not visible in
// this excerpt.
667 ServerStateData::noteAdaptationAnswer(const Adaptation::Answer
&answer
)
669 clearAdaptation(adaptedHeadSource
); // we do not expect more messages
671 switch (answer
.kind
) {
672 case Adaptation::Answer::akForward
:
673 handleAdaptedHeader(answer
.message
);
676 case Adaptation::Answer::akBlock
:
677 handleAdaptationBlocked(answer
);
680 case Adaptation::Answer::akError
:
681 handleAdaptationAborted(!answer
.final
);
// Handles the adapted message headers: casts msg to HttpReply and, if the
// reply carries a body pipe, subscribes to it as the consumer.
// NOTE(review): the early return after abortOnBadEntry(), any check of the
// cast result, and the call that stores `rep` are not visible in this excerpt.
687 ServerStateData::handleAdaptedHeader(HttpMsg
*msg
)
689 if (abortOnBadEntry("entry went bad while waiting for adapted headers"))
// dynamic_cast yields NULL if msg is not an HttpReply
692 HttpReply
*rep
= dynamic_cast<HttpReply
*>(msg
);
694 debugs(11,5, HERE
<< this << " setting adapted reply to " << rep
);
697 assert(!adaptedBodySource
);
698 if (rep
->body_pipe
!= NULL
) {
699 // subscribe to receive adapted body
700 adaptedBodySource
= rep
->body_pipe
;
701 // assume that ICAP does not auto-consume on failures
// NOTE(review): the subscription is a side effect inside assert(); if this
// assert can ever be compiled out, the call would vanish with it - confirm
702 assert(adaptedBodySource
->setConsumerIfNotLate(this));
// NOTE(review): presumably the no-body branch; the `else` is not visible here
705 if (doneWithAdaptation()) // we may still be sending virgin response
706 handleAdaptationCompleted();
710 // more adapted response body is available
// Checks out the adapted body pipe buffer, wraps it in a StoreIOBuffer at
// currentOffset, advances the offset, and consumes the bytes from the pipe.
// NOTE(review): the early return after abortOnBadEntry() and the statement
// that uses ioBuf are not visible in this excerpt.
712 ServerStateData::handleMoreAdaptedBodyAvailable()
714 const size_t contentSize
= adaptedBodySource
->buf().contentSize();
716 debugs(11,5, HERE
<< "consuming " << contentSize
<< " bytes of adapted " <<
717 "response body at offset " << adaptedBodySource
->consumedSize());
719 if (abortOnBadEntry("entry refuses adapted body"))
723 BodyPipeCheckout
bpc(*adaptedBodySource
);
724 const StoreIOBuffer
ioBuf(&bpc
.buf
, currentOffset
);
// NOTE(review): the offset advances by bpc.buf.size while contentSize bytes
// are consumed below; presumably the two are equal - confirm
725 currentOffset
+= bpc
.buf
.size
;
727 bpc
.buf
.consume(contentSize
);
731 // the entire adapted response body was produced, successfully
// Stops consuming the adapted body and completes adaptation.
// NOTE(review): the statement guarded by abortOnBadEntry() is not visible in
// this excerpt; presumably an early return - confirm.
733 ServerStateData::handleAdaptedBodyProductionEnded()
735 stopConsumingFrom(adaptedBodySource
);
737 if (abortOnBadEntry("entry went bad while waiting for adapted body eof"))
740 handleAdaptationCompleted();
743 // premature end of the adapted response body
// Stops consuming the adapted body and runs the common abort handling,
// calling handleAdaptationAborted() without an explicit argument.
744 void ServerStateData::handleAdaptedBodyProducerAborted()
746 stopConsumingFrom(adaptedBodySource
);
747 handleAdaptationAborted();
750 // common part of noteAdaptationAnswer and handleAdaptedBodyProductionEnded
// Finishes the transaction once adaptation is over and completes forwarding.
752 ServerStateData::handleAdaptationCompleted()
754 debugs(11,5, HERE
<< "handleAdaptationCompleted");
757 // We stop reading origin response because we have no place to put it and
758 // cannot use it. If some origin servers do not like that or if we want to
759 // reuse more pconns, we can add code to discard unneeded origin responses.
// NOTE(review): the statement that actually closes the origin connection is
// not visible in this excerpt
760 if (!doneWithServer()) {
761 debugs(11,3, HERE
<< "closing origin conn due to ICAP completion");
765 completeForwarding();
770 // common part of noteAdaptation*Aborted and noteBodyConsumerAborted methods
// bypassable is only logged in the visible code (see the TODO below).
// If the entry is still empty, an ERR_ICAP_FAILURE error is built; otherwise
// the failure detail is recorded on the request. Either way the transaction
// is then aborted.
// NOTE(review): the early return after abortOnBadEntry() and the use of `err`
// after it is filled in are not visible in this excerpt.
772 ServerStateData::handleAdaptationAborted(bool bypassable
)
774 debugs(11,5, HERE
<< "handleAdaptationAborted; bypassable: " << bypassable
<<
775 ", entry empty: " << entry
->isEmpty());
777 if (abortOnBadEntry("entry went bad while ICAP aborted"))
780 // TODO: bypass if possible
782 if (entry
->isEmpty()) {
783 debugs(11,9, HERE
<< "creating ICAP error entry after ICAP failure");
784 ErrorState
*err
= errorCon(ERR_ICAP_FAILURE
, HTTP_INTERNAL_SERVER_ERROR
, request
);
// xerrno carries an ERR_DETAIL_* code here, not an OS errno
785 err
->xerrno
= ERR_DETAIL_ICAP_RESPMOD_EARLY
;
787 fwd
->dontRetry(true);
788 } else if (request
) { // update logged info directly
789 request
->detailError(ERR_ICAP_FAILURE
, ERR_DETAIL_ICAP_RESPMOD_LATE
);
792 abortTransaction("ICAP failure");
795 // adaptation service wants us to deny HTTP client access to this response
// If the entry already has content, only the error detail is recorded and the
// transaction aborts ("late" block). Otherwise an HTTP_FORBIDDEN error is
// built from the deny-info page for answer.ruleId before aborting ("timely").
// NOTE(review): the early returns, the declaration of page_id, and the use of
// `err` after it is filled in are not visible in this excerpt.
797 ServerStateData::handleAdaptationBlocked(const Adaptation::Answer
&answer
)
799 debugs(11,5, HERE
<< answer
.ruleId
);
801 if (abortOnBadEntry("entry went bad while ICAP aborted"))
804 if (!entry
->isEmpty()) { // too late to block (should not really happen)
806 request
->detailError(ERR_ICAP_FAILURE
, ERR_DETAIL_RESPMOD_BLOCK_LATE
);
807 abortTransaction("late adaptation block");
811 debugs(11,7, HERE
<< "creating adaptation block response");
// look up the deny-info page for this rule; the result is presumably
// assigned to page_id - confirm
814 aclGetDenyInfoPage(&Config
.denyInfoList
, answer
.ruleId
.termedBuf(), 1);
// fall back to the generic access-denied page
815 if (page_id
== ERR_NONE
)
816 page_id
= ERR_ACCESS_DENIED
;
818 ErrorState
*err
= errorCon(page_id
, HTTP_FORBIDDEN
, request
);
// xerrno carries an ERR_DETAIL_* code here, not an OS errno
819 err
->xerrno
= ERR_DETAIL_RESPMOD_BLOCK_EARLY
;
821 fwd
->dontRetry(true);
823 abortTransaction("timely adaptation block");
// Completion of the adaptation ACL check. Clears the pending flag, rejects
// replies whose expected body is too large, and then either finalizes the
// virgin reply as-is or starts adaptation with the given group.
// NOTE(review): the early returns and the condition selecting the
// no-adaptation path are not visible in this excerpt; presumably the latter
// tests whether `group` is set - confirm.
827 ServerStateData::adaptationAclCheckDone(Adaptation::ServiceGroupPointer group
)
829 adaptationAccessCheckPending
= false;
831 if (abortOnBadEntry("entry went bad while waiting for ICAP ACL check"))
834 // TODO: Should nonICAP and postICAP path check this on the server-side?
835 // That check now only happens on client-side, in processReplyAccess().
836 if (virginReply()->expectedBodyTooLarge(*request
)) {
837 sendBodyIsTooLargeError();
840 // TODO: Should we check receivedBodyTooLarge on the server-side as well?
// no adaptation: the virgin reply becomes the final reply
843 debugs(11,3, HERE
<< "no adapation needed");
844 setFinalReply(virginReply());
849 startAdaptation(group
, originalRequest());
// Callback trampoline handed to Adaptation::AccessCheck::Start() in
// adaptOrFinalizeReply(): `data` is the ServerStateData pointer passed there
// as `this`; it is cast back and the call is forwarded to
// adaptationAclCheckDone().
854 ServerStateData::adaptationAclCheckDoneWrapper(Adaptation::ServiceGroupPointer group
, void *data
)
856 ServerStateData
*state
= (ServerStateData
*)data
;
857 state
->adaptationAclCheckDone(group
);
// Builds an ERR_TOO_BIG / HTTP_FORBIDDEN error for the request, disables
// retries, and aborts the transaction.
// NOTE(review): the use of `err` after creation is not visible in this excerpt.
862 ServerStateData::sendBodyIsTooLargeError()
864 ErrorState
*err
= errorCon(ERR_TOO_BIG
, HTTP_FORBIDDEN
, request
);
867 fwd
->dontRetry(true);
868 abortTransaction("Virgin body too large.");
871 // TODO: when HttpStateData sends all errors to ICAP,
872 // we should be able to move this at the end of setVirginReply().
// Starts the RESPMOD pre-cache adaptation access check for the virgin reply.
// The value returned by Start() is kept as the "check pending" flag, and the
// result arrives through adaptationAclCheckDoneWrapper().
874 ServerStateData::adaptOrFinalizeReply()
877 // TODO: merge with client side and return void to hide the on/off logic?
878 // The callback can be called with a NULL service if adaptation is off.
879 adaptationAccessCheckPending
= Adaptation::AccessCheck::Start(
880 Adaptation::methodRespmod
, Adaptation::pointPreCache
,
881 originalRequest(), virginReply(), adaptationAclCheckDoneWrapper
, this);
882 debugs(11,5, HERE
<< "adaptationAccessCheckPending=" << adaptationAccessCheckPending
);
// NOTE(review): the statement guarded by this check is not visible here;
// presumably we return and wait for the callback - confirm
883 if (adaptationAccessCheckPending
)
// no check pending: the virgin reply becomes the final reply
887 setFinalReply(virginReply());
890 /// initializes bodyBytesRead stats if needed and applies delta
// Works directly on the bodyBytesRead counter of the original request, via a
// reference. Enforces with Must() that the counter is non-negative afterwards.
892 ServerStateData::adjustBodyBytesRead(const int64_t delta
)
894 int64_t &bodyBytesRead
= originalRequest()->hier
.bodyBytesRead
;
896 // if we got here, do not log a dash even if we got nothing from the server
// NOTE(review): the statement guarded by this check is not visible here;
// presumably it resets a negative counter to zero - confirm
897 if (bodyBytesRead
< 0)
900 bodyBytesRead
+= delta
; // supports negative and zero deltas
902 // check for overflows ("infinite" response?) and underflows (a bug)
903 Must(bodyBytesRead
>= 0);
// Accepts `len` bytes of virgin reply body: updates the body byte counter,
// then routes the data to adaptation when it has started, or to the store.
// NOTE(review): the return/else separating the two destinations is not visible
// in this excerpt.
907 ServerStateData::addVirginReplyBody(const char *data
, ssize_t len
)
909 adjustBodyBytesRead(len
);
912 assert(!adaptationAccessCheckPending
); // or would need to buffer while waiting
913 if (startedAdaptation
) {
914 adaptVirginReplyBody(data
, len
);
918 storeReplyBody(data
, len
);
921 // writes virgin or adapted reply body to store
// Writes `len` bytes from `data` to the store entry at currentOffset and then
// advances currentOffset by `len`.
923 ServerStateData::storeReplyBody(const char *data
, ssize_t len
)
925 // write even if len is zero to push headers towards the client side
926 entry
->write (StoreIOBuffer(len
, currentOffset
, (char*)data
));
928 currentOffset
+= len
;
931 size_t ServerStateData::replyBodySpace(const MemBuf
&readBuf
,
932 const size_t minSpace
) const
934 size_t space
= readBuf
.spaceSize(); // available space w/o heroic measures
935 if (space
< minSpace
) {
936 const size_t maxSpace
= readBuf
.potentialSpaceSize(); // absolute best
937 space
= min(minSpace
, maxSpace
); // do not promise more than asked
941 if (responseBodyBuffer
) {
942 return 0; // Stop reading if already overflowed waiting for ICAP to catch up
945 if (virginBodyDestination
!= NULL
) {
947 * BodyPipe buffer has a finite size limit. We
948 * should not read more data from the network than will fit
949 * into the pipe buffer or we _lose_ what did not fit if
950 * the response ends sooner than BodyPipe frees up space:
951 * There is no code to keep pumping data into the pipe once
952 * response ends and serverComplete() is called.
954 * If the pipe is totally full, don't register the read handler.
955 * The BodyPipe will call our noteMoreBodySpaceAvailable() method
956 * when it has free space again.
958 size_t adaptation_space
=
959 virginBodyDestination
->buf().potentialSpaceSize();
961 debugs(11,9, "ServerStateData may read up to min(" <<
962 adaptation_space
<< ", " << space
<< ") bytes");
964 if (adaptation_space
< space
)
965 space
= adaptation_space
;