2 * Copyright (C) 1996-2019 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
10 #include "client_side_request.h"
11 #include "http/Stream.h"
12 #include "HttpHdrContRange.h"
13 #include "HttpHeaderTools.h"
15 #include "TimeOrTag.h"
17 #include "acl/FilledChecklist.h"
18 #include "ClientInfo.h"
20 #include "MessageDelayPools.h"
23 Http::Stream::Stream(const Comm::ConnectionPointer
&aConn
, ClientHttpRequest
*aReq
) :
24 clientConnection(aConn
),
28 mayUseConnection_(false),
29 connRegistered_(false)
31 assert(http
!= nullptr);
32 memset(reqbuf
, '\0', sizeof (reqbuf
));
35 deferredparams
.node
= nullptr;
36 deferredparams
.rep
= nullptr;
39 Http::Stream::~Stream()
41 if (auto node
= getTail()) {
42 if (auto ctx
= dynamic_cast<Http::Stream
*>(node
->data
.getRaw())) {
43 /* We are *always* the tail - prevent recursive free */
48 httpRequestFree(http
);
52 Http::Stream::registerWithConn()
54 assert(!connRegistered_
);
56 connRegistered_
= true;
57 getConn()->pipeline
.add(Http::StreamPointer(this));
61 Http::Stream::startOfOutput() const
63 return http
->out
.size
== 0;
67 Http::Stream::writeComplete(size_t size
)
69 const StoreEntry
*entry
= http
->storeEntry();
70 debugs(33, 5, clientConnection
<< ", sz " << size
<<
71 ", off " << (http
->out
.size
+ size
) << ", len " <<
72 (entry
? entry
->objectLen() : 0));
74 http
->out
.size
+= size
;
76 if (clientHttpRequestStatus(clientConnection
->fd
, http
)) {
77 initiateClose("failure or true request status");
78 /* Do we leak here ? */
82 switch (socketState()) {
88 case STREAM_COMPLETE
: {
89 debugs(33, 5, clientConnection
<< " Stream complete, keepalive is " <<
90 http
->request
->flags
.proxyKeepalive
);
91 // XXX: This code assumes we are done with the transaction, but we may
92 // still be receiving request body. TODO: Extend stopSending() instead.
93 ConnStateData
*c
= getConn();
94 if (!http
->request
->flags
.proxyKeepalive
)
95 clientConnection
->close();
101 case STREAM_UNPLANNED_COMPLETE
:
102 initiateClose("STREAM_UNPLANNED_COMPLETE");
106 initiateClose("STREAM_FAILED");
110 fatal("Hit unreachable code in Http::Stream::writeComplete\n");
115 Http::Stream::pullData()
117 debugs(33, 5, reply
<< " written " << http
->out
.size
<< " into " << clientConnection
);
119 /* More data will be coming from the stream. */
120 StoreIOBuffer readBuffer
;
121 /* XXX: Next requested byte in the range sequence */
122 /* XXX: length = getmaximumrangelenfgth */
123 readBuffer
.offset
= getNextRangeOffset();
124 readBuffer
.length
= HTTP_REQBUF_SZ
;
125 readBuffer
.data
= reqbuf
;
126 /* we may note we have reached the end of the wanted ranges */
127 clientStreamRead(getTail(), http
, readBuffer
);
131 Http::Stream::multipartRangeRequest() const
133 return http
->multipartRangeRequest();
137 Http::Stream::getNextRangeOffset() const
139 debugs (33, 5, "range: " << http
->request
->range
<<
140 "; http offset " << http
->out
.offset
<<
141 "; reply " << reply
);
143 // XXX: This method is called from many places, including pullData() which
144 // may be called before prepareReply() [on some Squid-generated errors].
145 // Hence, we may not even know yet whether we should honor/do ranges.
147 if (http
->request
->range
) {
148 /* offset in range specs does not count the prefix of an http msg */
149 /* check: reply was parsed and range iterator was initialized */
150 assert(http
->range_iter
.valid
);
151 /* filter out data according to range specs */
152 assert(canPackMoreRanges());
154 assert(http
->range_iter
.currentSpec());
155 /* offset of still missing data */
156 int64_t start
= http
->range_iter
.currentSpec()->offset
+
157 http
->range_iter
.currentSpec()->length
-
158 http
->range_iter
.debt();
159 debugs(33, 3, "clientPackMoreRanges: in: offset: " << http
->out
.offset
);
160 debugs(33, 3, "clientPackMoreRanges: out:"
161 " start: " << start
<<
162 " spec[" << http
->range_iter
.pos
- http
->request
->range
->begin() << "]:" <<
163 " [" << http
->range_iter
.currentSpec()->offset
<<
164 ", " << http
->range_iter
.currentSpec()->offset
+
165 http
->range_iter
.currentSpec()->length
<< "),"
166 " len: " << http
->range_iter
.currentSpec()->length
<<
167 " debt: " << http
->range_iter
.debt());
168 if (http
->range_iter
.currentSpec()->length
!= -1)
169 assert(http
->out
.offset
<= start
); /* we did not miss it */
174 } else if (reply
&& reply
->contentRange()) {
175 /* request does not have ranges, but reply does */
176 /** \todo FIXME: should use range_iter_pos on reply, as soon as reply->content_range
177 * becomes HttpHdrRange rather than HttpHdrRangeSpec.
179 return http
->out
.offset
+ reply
->contentRange()->spec
.offset
;
182 return http
->out
.offset
;
186 * increments iterator "i"
187 * used by clientPackMoreRanges
189 * \retval true there is still data available to pack more ranges
193 Http::Stream::canPackMoreRanges() const
195 /** first update iterator "i" if needed */
196 if (!http
->range_iter
.debt()) {
197 debugs(33, 5, "At end of current range spec for " << clientConnection
);
199 if (http
->range_iter
.pos
!= http
->range_iter
.end
)
200 ++http
->range_iter
.pos
;
202 http
->range_iter
.updateSpec();
205 assert(!http
->range_iter
.debt() == !http
->range_iter
.currentSpec());
207 /* paranoid sync condition */
208 /* continue condition: need_more_data */
209 debugs(33, 5, "returning " << (http
->range_iter
.currentSpec() ? true : false));
210 return http
->range_iter
.currentSpec() ? true : false;
213 /// Adapt stream status to account for Range cases
214 clientStream_status_t
215 Http::Stream::socketState()
217 switch (clientStreamStatus(getTail(), http
)) {
220 /* check for range support ending */
221 if (http
->request
->range
) {
222 /* check: reply was parsed and range iterator was initialized */
223 assert(http
->range_iter
.valid
);
224 /* filter out data according to range specs */
226 if (!canPackMoreRanges()) {
227 debugs(33, 5, "Range request at end of returnable " <<
228 "range sequence on " << clientConnection
);
229 // we got everything we wanted from the store
230 return STREAM_COMPLETE
;
232 } else if (reply
&& reply
->contentRange()) {
233 /* reply has content-range, but Squid is not managing ranges */
234 const int64_t &bytesSent
= http
->out
.offset
;
235 const int64_t &bytesExpected
= reply
->contentRange()->spec
.length
;
237 debugs(33, 7, "body bytes sent vs. expected: " <<
238 bytesSent
<< " ? " << bytesExpected
<< " (+" <<
239 reply
->contentRange()->spec
.offset
<< ")");
241 // did we get at least what we expected, based on range specs?
243 if (bytesSent
== bytesExpected
) // got everything
244 return STREAM_COMPLETE
;
246 if (bytesSent
> bytesExpected
) // Error: Sent more than expected
247 return STREAM_UNPLANNED_COMPLETE
;
252 case STREAM_COMPLETE
:
253 return STREAM_COMPLETE
;
255 case STREAM_UNPLANNED_COMPLETE
:
256 return STREAM_UNPLANNED_COMPLETE
;
259 return STREAM_FAILED
;
262 fatal ("unreachable code\n");
267 Http::Stream::sendStartOfMessage(HttpReply
*rep
, StoreIOBuffer bodyData
)
271 MemBuf
*mb
= rep
->pack();
273 // dump now, so we do not output any body.
274 debugs(11, 2, "HTTP Client " << clientConnection
);
275 debugs(11, 2, "HTTP Client REPLY:\n---------\n" << mb
->buf
<< "\n----------");
277 /* Save length of headers for persistent conn checks */
278 http
->out
.headers_sz
= mb
->contentSize();
280 headersLog(0, 0, http
->request
->method
, rep
);
283 if (bodyData
.data
&& bodyData
.length
) {
284 if (multipartRangeRequest())
285 packRange(bodyData
, mb
);
286 else if (http
->request
->flags
.chunkedReply
) {
287 packChunk(bodyData
, *mb
);
289 size_t length
= lengthToSend(bodyData
.range());
290 noteSentBodyBytes(length
);
291 mb
->append(bodyData
.data
, length
);
295 for (const auto &pool
: MessageDelayPools::Instance()->pools
) {
297 std::unique_ptr
<ACLFilledChecklist
> chl(clientAclChecklistCreate(pool
->access
, http
));
299 HTTPMSGLOCK(chl
->reply
);
300 const auto answer
= chl
->fastCheck();
301 if (answer
.allowed()) {
302 writeQuotaHandler
= pool
->createBucket();
303 fd_table
[clientConnection
->fd
].writeQuotaHandler
= writeQuotaHandler
;
306 debugs(83, 4, "Response delay pool " << pool
->poolName
<<
307 " skipped because ACL " << answer
);
313 getConn()->write(mb
);
318 Http::Stream::sendBody(StoreIOBuffer bodyData
)
320 if (!multipartRangeRequest() && !http
->request
->flags
.chunkedReply
) {
321 size_t length
= lengthToSend(bodyData
.range());
322 noteSentBodyBytes(length
);
323 getConn()->write(bodyData
.data
, length
);
329 if (multipartRangeRequest())
330 packRange(bodyData
, &mb
);
332 packChunk(bodyData
, mb
);
334 if (mb
.contentSize())
335 getConn()->write(&mb
);
341 Http::Stream::lengthToSend(Range
<int64_t> const &available
) const
343 // the size of available range can always fit into a size_t type
344 size_t maximum
= available
.size();
346 if (!http
->request
->range
)
349 assert(canPackMoreRanges());
351 if (http
->range_iter
.debt() == -1)
354 assert(http
->range_iter
.debt() > 0);
356 /* TODO this + the last line could be a range intersection calculation */
357 if (available
.start
< http
->range_iter
.currentSpec()->offset
)
360 return min(http
->range_iter
.debt(), static_cast<int64_t>(maximum
));
364 Http::Stream::noteSentBodyBytes(size_t bytes
)
366 debugs(33, 7, bytes
<< " body bytes");
367 http
->out
.offset
+= bytes
;
369 if (!http
->request
->range
)
372 if (http
->range_iter
.debt() != -1) {
373 http
->range_iter
.debt(http
->range_iter
.debt() - bytes
);
374 assert (http
->range_iter
.debt() >= 0);
377 /* debt() always stops at -1, below that is a bug */
378 assert(http
->range_iter
.debt() >= -1);
381 /// \return true when If-Range specs match reply, false otherwise
383 clientIfRangeMatch(ClientHttpRequest
* http
, HttpReply
* rep
)
385 const TimeOrTag spec
= http
->request
->header
.getTimeOrTag(Http::HdrType::IF_RANGE
);
387 /* check for parsing falure */
393 ETag rep_tag
= rep
->header
.getETag(Http::HdrType::ETAG
);
394 debugs(33, 3, "ETags: " << spec
.tag
.str
<< " and " <<
395 (rep_tag
.str
? rep_tag
.str
: "<none>"));
398 return false; // entity has no etag to compare with!
400 if (spec
.tag
.weak
|| rep_tag
.weak
) {
401 debugs(33, DBG_IMPORTANT
, "Weak ETags are not allowed in If-Range: " <<
402 spec
.tag
.str
<< " ? " << rep_tag
.str
);
403 return false; // must use strong validator for sub-range requests
406 return etagIsStrongEqual(rep_tag
, spec
.tag
);
409 /* got modification time? */
411 return !http
->storeEntry()->modifiedSince(spec
.time
);
413 assert(0); /* should not happen */
417 // seems to be something better suited to Server logic
418 /** adds appropriate Range headers if needed */
420 Http::Stream::buildRangeHeader(HttpReply
*rep
)
422 HttpHeader
*hdr
= rep
? &rep
->header
: nullptr;
423 const char *range_err
= nullptr;
424 HttpRequest
*request
= http
->request
;
425 assert(request
->range
);
426 /* check if we still want to do ranges */
427 int64_t roffLimit
= request
->getRangeOffsetLimit();
428 auto contentRange
= rep
? rep
->contentRange() : nullptr;
431 range_err
= "no [parse-able] reply";
432 else if ((rep
->sline
.status() != Http::scOkay
) && (rep
->sline
.status() != Http::scPartialContent
))
433 range_err
= "wrong status code";
434 else if (rep
->sline
.status() == Http::scPartialContent
)
435 range_err
= "too complex response"; // probably contains what the client needs
436 else if (rep
->sline
.status() != Http::scOkay
)
437 range_err
= "wrong status code";
438 else if (hdr
->has(Http::HdrType::CONTENT_RANGE
)) {
439 Must(!contentRange
); // this is a 200, not 206 response
440 range_err
= "meaningless response"; // the status code or the header is wrong
442 else if (rep
->content_length
< 0)
443 range_err
= "unknown length";
444 else if (rep
->content_length
!= http
->storeEntry()->mem().baseReply().content_length
)
445 range_err
= "INCONSISTENT length"; /* a bug? */
447 /* hits only - upstream CachePeer determines correct behaviour on misses,
448 * and client_side_reply determines hits candidates
450 else if (http
->logType
.isTcpHit() &&
451 http
->request
->header
.has(Http::HdrType::IF_RANGE
) &&
452 !clientIfRangeMatch(http
, rep
))
453 range_err
= "If-Range match failed";
455 else if (!http
->request
->range
->canonize(rep
))
456 range_err
= "canonization failed";
457 else if (http
->request
->range
->isComplex())
458 range_err
= "too complex range header";
459 else if (!http
->logType
.isTcpHit() && http
->request
->range
->offsetLimitExceeded(roffLimit
))
460 range_err
= "range outside range_offset_limit";
462 /* get rid of our range specs on error */
464 /* XXX We do this here because we need canonisation etc. However, this current
465 * code will lead to incorrect store offset requests - the store will have the
466 * offset data, but we won't be requesting it.
467 * So, we can either re-request, or generate an error
469 http
->request
->ignoreRange(range_err
);
471 /* XXX: TODO: Review, this unconditional set may be wrong. */
472 rep
->sline
.set(rep
->sline
.version
, Http::scPartialContent
);
473 // web server responded with a valid, but unexpected range.
474 // will (try-to) forward as-is.
475 //TODO: we should cope with multirange request/responses
476 // TODO: review, since rep->content_range is always nil here.
477 bool replyMatchRequest
= contentRange
!= nullptr ?
478 request
->range
->contains(contentRange
->spec
) :
480 const int spec_count
= http
->request
->range
->specs
.size();
481 int64_t actual_clen
= -1;
483 debugs(33, 3, "range spec count: " << spec_count
<<
484 " virgin clen: " << rep
->content_length
);
485 assert(spec_count
> 0);
486 /* append appropriate header(s) */
487 if (spec_count
== 1) {
488 if (!replyMatchRequest
) {
489 hdr
->putContRange(contentRange
);
490 actual_clen
= rep
->content_length
;
491 //http->range_iter.pos = rep->content_range->spec.begin();
492 (*http
->range_iter
.pos
)->offset
= contentRange
->spec
.offset
;
493 (*http
->range_iter
.pos
)->length
= contentRange
->spec
.length
;
496 HttpHdrRange::iterator pos
= http
->request
->range
->begin();
498 /* append Content-Range */
501 /* No content range, so this was a full object we are
504 httpHeaderAddContRange(hdr
, **pos
, rep
->content_length
);
507 /* set new Content-Length to the actual number of bytes
508 * transmitted in the message-body */
509 actual_clen
= (*pos
)->length
;
513 /* generate boundary string */
514 http
->range_iter
.boundary
= http
->rangeBoundaryStr();
515 /* delete old Content-Type, add ours */
516 hdr
->delById(Http::HdrType::CONTENT_TYPE
);
517 httpHeaderPutStrf(hdr
, Http::HdrType::CONTENT_TYPE
,
518 "multipart/byteranges; boundary=\"" SQUIDSTRINGPH
"\"",
519 SQUIDSTRINGPRINT(http
->range_iter
.boundary
));
520 /* Content-Length is not required in multipart responses
521 * but it is always nice to have one */
522 actual_clen
= http
->mRangeCLen();
524 /* http->out needs to start where we want data at */
525 http
->out
.offset
= http
->range_iter
.currentSpec()->offset
;
528 /* replace Content-Length header */
529 assert(actual_clen
>= 0);
530 hdr
->delById(Http::HdrType::CONTENT_LENGTH
);
531 hdr
->putInt64(Http::HdrType::CONTENT_LENGTH
, actual_clen
);
532 debugs(33, 3, "actual content length: " << actual_clen
);
534 /* And start the range iter off */
535 http
->range_iter
.updateSpec();
540 Http::Stream::getTail() const
542 if (http
->client_stream
.tail
)
543 return static_cast<clientStreamNode
*>(http
->client_stream
.tail
->data
);
549 Http::Stream::getClientReplyContext() const
551 return static_cast<clientStreamNode
*>(http
->client_stream
.tail
->prev
->data
);
555 Http::Stream::getConn() const
557 assert(http
&& http
->getConn());
558 return http
->getConn();
561 /// remembers the abnormal connection termination for logging purposes
563 Http::Stream::noteIoError(const int xerrno
)
566 http
->logType
.err
.timedout
= (xerrno
== ETIMEDOUT
);
567 // aborted even if xerrno is zero (which means read abort/eof)
568 http
->logType
.err
.aborted
= (xerrno
!= ETIMEDOUT
);
573 Http::Stream::finished()
575 CodeContext::Reset(clientConnection
);
576 ConnStateData
*conn
= getConn();
578 /* we can't handle any more stream data - detach */
579 clientStreamDetach(getTail(), http
);
581 assert(connRegistered_
);
582 connRegistered_
= false;
583 conn
->pipeline
.popMe(Http::StreamPointer(this));
586 /// called when we encounter a response-related error
588 Http::Stream::initiateClose(const char *reason
)
590 debugs(33, 4, clientConnection
<< " because " << reason
);
591 getConn()->stopSending(reason
); // closes ASAP
595 Http::Stream::deferRecipientForLater(clientStreamNode
*node
, HttpReply
*rep
, StoreIOBuffer receivedData
)
597 debugs(33, 2, "Deferring request " << http
->uri
);
598 assert(flags
.deferred
== 0);
600 deferredparams
.node
= node
;
601 deferredparams
.rep
= rep
;
602 deferredparams
.queuedBuffer
= receivedData
;
606 Http::Stream::prepareReply(HttpReply
*rep
)
609 if (http
->request
->range
)
610 buildRangeHeader(rep
);
614 * Packs bodyData into mb using chunked encoding.
615 * Packs the last-chunk if bodyData is empty.
618 Http::Stream::packChunk(const StoreIOBuffer
&bodyData
, MemBuf
&mb
)
620 const uint64_t length
=
621 static_cast<uint64_t>(lengthToSend(bodyData
.range()));
622 noteSentBodyBytes(length
);
624 mb
.appendf("%" PRIX64
"\r\n", length
);
625 mb
.append(bodyData
.data
, length
);
626 mb
.append("\r\n", 2);
630 * extracts a "range" from *buf and appends them to mb, updating
631 * all offsets and such.
634 Http::Stream::packRange(StoreIOBuffer
const &source
, MemBuf
*mb
)
636 HttpHdrRangeIter
* i
= &http
->range_iter
;
637 Range
<int64_t> available(source
.range());
638 char const *buf
= source
.data
;
640 while (i
->currentSpec() && available
.size()) {
641 const size_t copy_sz
= lengthToSend(available
);
643 // intersection of "have" and "need" ranges must not be empty
644 assert(http
->out
.offset
< i
->currentSpec()->offset
+ i
->currentSpec()->length
);
645 assert(http
->out
.offset
+ (int64_t)available
.size() > i
->currentSpec()->offset
);
648 * put boundary and headers at the beginning of a range in a
651 if (http
->multipartRangeRequest() && i
->debt() == i
->currentSpec()->length
) {
653 &http
->storeEntry()->mem().freshestReply(),
654 i
->currentSpec(), /* current range */
655 i
->boundary
, /* boundary, the same for all */
660 debugs(33, 3, "appending " << copy_sz
<< " bytes");
661 noteSentBodyBytes(copy_sz
);
662 mb
->append(buf
, copy_sz
);
665 available
.start
+= copy_sz
;
669 if (!canPackMoreRanges()) {
670 debugs(33, 3, "Returning because !canPackMoreRanges.");
672 // put terminating boundary for multiparts
673 clientPackTermBound(i
->boundary
, mb
);
677 int64_t nextOffset
= getNextRangeOffset();
678 assert(nextOffset
>= http
->out
.offset
);
679 int64_t skip
= nextOffset
- http
->out
.offset
;
680 /* adjust for not to be transmitted bytes */
681 http
->out
.offset
= nextOffset
;
683 if (available
.size() <= (uint64_t)skip
)
686 available
.start
+= skip
;
695 Http::Stream::doClose()
697 clientConnection
->close();