2 * Copyright (C) 1996-2017 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
10 #include "client_side_request.h"
11 #include "http/Stream.h"
12 #include "HttpHdrContRange.h"
13 #include "HttpHeaderTools.h"
15 #include "TimeOrTag.h"
17 #include "acl/FilledChecklist.h"
18 #include "ClientInfo.h"
20 #include "MessageDelayPools.h"
23 Http::Stream::Stream(const Comm::ConnectionPointer
&aConn
, ClientHttpRequest
*aReq
) :
24 clientConnection(aConn
),
28 mayUseConnection_(false),
29 connRegistered_(false)
31 assert(http
!= nullptr);
32 memset(reqbuf
, '\0', sizeof (reqbuf
));
35 deferredparams
.node
= nullptr;
36 deferredparams
.rep
= nullptr;
39 Http::Stream::~Stream()
41 if (auto node
= getTail()) {
42 if (auto ctx
= dynamic_cast<Http::Stream
*>(node
->data
.getRaw())) {
43 /* We are *always* the tail - prevent recursive free */
48 httpRequestFree(http
);
52 Http::Stream::registerWithConn()
54 assert(!connRegistered_
);
56 connRegistered_
= true;
57 getConn()->pipeline
.add(Http::StreamPointer(this));
61 Http::Stream::startOfOutput() const
63 return http
->out
.size
== 0;
67 Http::Stream::writeComplete(size_t size
)
69 const StoreEntry
*entry
= http
->storeEntry();
70 debugs(33, 5, clientConnection
<< ", sz " << size
<<
71 ", off " << (http
->out
.size
+ size
) << ", len " <<
72 (entry
? entry
->objectLen() : 0));
74 http
->out
.size
+= size
;
76 if (clientHttpRequestStatus(clientConnection
->fd
, http
)) {
77 initiateClose("failure or true request status");
78 /* Do we leak here ? */
82 switch (socketState()) {
88 case STREAM_COMPLETE
: {
89 debugs(33, 5, clientConnection
<< " Stream complete, keepalive is " <<
90 http
->request
->flags
.proxyKeepalive
);
91 ConnStateData
*c
= getConn();
92 if (!http
->request
->flags
.proxyKeepalive
)
93 clientConnection
->close();
99 case STREAM_UNPLANNED_COMPLETE
:
100 initiateClose("STREAM_UNPLANNED_COMPLETE");
104 initiateClose("STREAM_FAILED");
108 fatal("Hit unreachable code in Http::Stream::writeComplete\n");
113 Http::Stream::pullData()
115 debugs(33, 5, reply
<< " written " << http
->out
.size
<< " into " << clientConnection
);
117 /* More data will be coming from the stream. */
118 StoreIOBuffer readBuffer
;
119 /* XXX: Next requested byte in the range sequence */
120 /* XXX: length = getmaximumrangelenfgth */
121 readBuffer
.offset
= getNextRangeOffset();
122 readBuffer
.length
= HTTP_REQBUF_SZ
;
123 readBuffer
.data
= reqbuf
;
124 /* we may note we have reached the end of the wanted ranges */
125 clientStreamRead(getTail(), http
, readBuffer
);
129 Http::Stream::multipartRangeRequest() const
131 return http
->multipartRangeRequest();
135 Http::Stream::getNextRangeOffset() const
137 debugs (33, 5, "range: " << http
->request
->range
<<
138 "; http offset " << http
->out
.offset
<<
139 "; reply " << reply
);
141 // XXX: This method is called from many places, including pullData() which
142 // may be called before prepareReply() [on some Squid-generated errors].
143 // Hence, we may not even know yet whether we should honor/do ranges.
145 if (http
->request
->range
) {
146 /* offset in range specs does not count the prefix of an http msg */
147 /* check: reply was parsed and range iterator was initialized */
148 assert(http
->range_iter
.valid
);
149 /* filter out data according to range specs */
150 assert(canPackMoreRanges());
152 assert(http
->range_iter
.currentSpec());
153 /* offset of still missing data */
154 int64_t start
= http
->range_iter
.currentSpec()->offset
+
155 http
->range_iter
.currentSpec()->length
-
156 http
->range_iter
.debt();
157 debugs(33, 3, "clientPackMoreRanges: in: offset: " << http
->out
.offset
);
158 debugs(33, 3, "clientPackMoreRanges: out:"
159 " start: " << start
<<
160 " spec[" << http
->range_iter
.pos
- http
->request
->range
->begin() << "]:" <<
161 " [" << http
->range_iter
.currentSpec()->offset
<<
162 ", " << http
->range_iter
.currentSpec()->offset
+
163 http
->range_iter
.currentSpec()->length
<< "),"
164 " len: " << http
->range_iter
.currentSpec()->length
<<
165 " debt: " << http
->range_iter
.debt());
166 if (http
->range_iter
.currentSpec()->length
!= -1)
167 assert(http
->out
.offset
<= start
); /* we did not miss it */
172 } else if (reply
&& reply
->content_range
) {
173 /* request does not have ranges, but reply does */
174 /** \todo FIXME: should use range_iter_pos on reply, as soon as reply->content_range
175 * becomes HttpHdrRange rather than HttpHdrRangeSpec.
177 return http
->out
.offset
+ reply
->content_range
->spec
.offset
;
180 return http
->out
.offset
;
184 * increments iterator "i"
185 * used by clientPackMoreRanges
187 * \retval true there is still data available to pack more ranges
191 Http::Stream::canPackMoreRanges() const
193 /** first update iterator "i" if needed */
194 if (!http
->range_iter
.debt()) {
195 debugs(33, 5, "At end of current range spec for " << clientConnection
);
197 if (http
->range_iter
.pos
!= http
->range_iter
.end
)
198 ++http
->range_iter
.pos
;
200 http
->range_iter
.updateSpec();
203 assert(!http
->range_iter
.debt() == !http
->range_iter
.currentSpec());
205 /* paranoid sync condition */
206 /* continue condition: need_more_data */
207 debugs(33, 5, "returning " << (http
->range_iter
.currentSpec() ? true : false));
208 return http
->range_iter
.currentSpec() ? true : false;
211 /// Adapt stream status to account for Range cases
212 clientStream_status_t
213 Http::Stream::socketState()
215 switch (clientStreamStatus(getTail(), http
)) {
218 /* check for range support ending */
219 if (http
->request
->range
) {
220 /* check: reply was parsed and range iterator was initialized */
221 assert(http
->range_iter
.valid
);
222 /* filter out data according to range specs */
224 if (!canPackMoreRanges()) {
225 debugs(33, 5, "Range request at end of returnable " <<
226 "range sequence on " << clientConnection
);
227 // we got everything we wanted from the store
228 return STREAM_COMPLETE
;
230 } else if (reply
&& reply
->content_range
) {
231 /* reply has content-range, but Squid is not managing ranges */
232 const int64_t &bytesSent
= http
->out
.offset
;
233 const int64_t &bytesExpected
= reply
->content_range
->spec
.length
;
235 debugs(33, 7, "body bytes sent vs. expected: " <<
236 bytesSent
<< " ? " << bytesExpected
<< " (+" <<
237 reply
->content_range
->spec
.offset
<< ")");
239 // did we get at least what we expected, based on range specs?
241 if (bytesSent
== bytesExpected
) // got everything
242 return STREAM_COMPLETE
;
244 if (bytesSent
> bytesExpected
) // Error: Sent more than expected
245 return STREAM_UNPLANNED_COMPLETE
;
250 case STREAM_COMPLETE
:
251 return STREAM_COMPLETE
;
253 case STREAM_UNPLANNED_COMPLETE
:
254 return STREAM_UNPLANNED_COMPLETE
;
257 return STREAM_FAILED
;
260 fatal ("unreachable code\n");
265 Http::Stream::sendStartOfMessage(HttpReply
*rep
, StoreIOBuffer bodyData
)
269 MemBuf
*mb
= rep
->pack();
271 // dump now, so we dont output any body.
272 debugs(11, 2, "HTTP Client " << clientConnection
);
273 debugs(11, 2, "HTTP Client REPLY:\n---------\n" << mb
->buf
<< "\n----------");
275 /* Save length of headers for persistent conn checks */
276 http
->out
.headers_sz
= mb
->contentSize();
278 headersLog(0, 0, http
->request
->method
, rep
);
281 if (bodyData
.data
&& bodyData
.length
) {
282 if (multipartRangeRequest())
283 packRange(bodyData
, mb
);
284 else if (http
->request
->flags
.chunkedReply
) {
285 packChunk(bodyData
, *mb
);
287 size_t length
= lengthToSend(bodyData
.range());
288 noteSentBodyBytes(length
);
289 mb
->append(bodyData
.data
, length
);
293 for (const auto &pool
: MessageDelayPools::Instance()->pools
) {
295 std::unique_ptr
<ACLFilledChecklist
> chl(clientAclChecklistCreate(pool
->access
, http
));
297 HTTPMSGLOCK(chl
->reply
);
298 const allow_t answer
= chl
->fastCheck();
299 if (answer
.allowed()) {
300 writeQuotaHandler
= pool
->createBucket();
301 fd_table
[clientConnection
->fd
].writeQuotaHandler
= writeQuotaHandler
;
304 debugs(83, 4, "Response delay pool " << pool
->poolName
<<
305 " skipped because ACL " << answer
);
311 getConn()->write(mb
);
316 Http::Stream::sendBody(StoreIOBuffer bodyData
)
318 if (!multipartRangeRequest() && !http
->request
->flags
.chunkedReply
) {
319 size_t length
= lengthToSend(bodyData
.range());
320 noteSentBodyBytes(length
);
321 getConn()->write(bodyData
.data
, length
);
327 if (multipartRangeRequest())
328 packRange(bodyData
, &mb
);
330 packChunk(bodyData
, mb
);
332 if (mb
.contentSize())
333 getConn()->write(&mb
);
339 Http::Stream::lengthToSend(Range
<int64_t> const &available
) const
341 // the size of available range can always fit into a size_t type
342 size_t maximum
= available
.size();
344 if (!http
->request
->range
)
347 assert(canPackMoreRanges());
349 if (http
->range_iter
.debt() == -1)
352 assert(http
->range_iter
.debt() > 0);
354 /* TODO this + the last line could be a range intersection calculation */
355 if (available
.start
< http
->range_iter
.currentSpec()->offset
)
358 return min(http
->range_iter
.debt(), static_cast<int64_t>(maximum
));
362 Http::Stream::noteSentBodyBytes(size_t bytes
)
364 debugs(33, 7, bytes
<< " body bytes");
365 http
->out
.offset
+= bytes
;
367 if (!http
->request
->range
)
370 if (http
->range_iter
.debt() != -1) {
371 http
->range_iter
.debt(http
->range_iter
.debt() - bytes
);
372 assert (http
->range_iter
.debt() >= 0);
375 /* debt() always stops at -1, below that is a bug */
376 assert(http
->range_iter
.debt() >= -1);
379 /// \return true when If-Range specs match reply, false otherwise
381 clientIfRangeMatch(ClientHttpRequest
* http
, HttpReply
* rep
)
383 const TimeOrTag spec
= http
->request
->header
.getTimeOrTag(Http::HdrType::IF_RANGE
);
385 /* check for parsing falure */
391 ETag rep_tag
= rep
->header
.getETag(Http::HdrType::ETAG
);
392 debugs(33, 3, "ETags: " << spec
.tag
.str
<< " and " <<
393 (rep_tag
.str
? rep_tag
.str
: "<none>"));
396 return false; // entity has no etag to compare with!
398 if (spec
.tag
.weak
|| rep_tag
.weak
) {
399 debugs(33, DBG_IMPORTANT
, "Weak ETags are not allowed in If-Range: " <<
400 spec
.tag
.str
<< " ? " << rep_tag
.str
);
401 return false; // must use strong validator for sub-range requests
404 return etagIsStrongEqual(rep_tag
, spec
.tag
);
407 /* got modification time? */
409 return !http
->storeEntry()->modifiedSince(spec
.time
);
411 assert(0); /* should not happen */
415 // seems to be something better suited to Server logic
416 /** adds appropriate Range headers if needed */
418 Http::Stream::buildRangeHeader(HttpReply
*rep
)
420 HttpHeader
*hdr
= rep
? &rep
->header
: nullptr;
421 const char *range_err
= nullptr;
422 HttpRequest
*request
= http
->request
;
423 assert(request
->range
);
424 /* check if we still want to do ranges */
425 int64_t roffLimit
= request
->getRangeOffsetLimit();
428 range_err
= "no [parse-able] reply";
429 else if ((rep
->sline
.status() != Http::scOkay
) && (rep
->sline
.status() != Http::scPartialContent
))
430 range_err
= "wrong status code";
431 else if (hdr
->has(Http::HdrType::CONTENT_RANGE
))
432 range_err
= "origin server does ranges";
433 else if (rep
->content_length
< 0)
434 range_err
= "unknown length";
435 else if (rep
->content_length
!= http
->memObject()->getReply()->content_length
)
436 range_err
= "INCONSISTENT length"; /* a bug? */
438 /* hits only - upstream CachePeer determines correct behaviour on misses,
439 * and client_side_reply determines hits candidates
441 else if (http
->logType
.isTcpHit() &&
442 http
->request
->header
.has(Http::HdrType::IF_RANGE
) &&
443 !clientIfRangeMatch(http
, rep
))
444 range_err
= "If-Range match failed";
446 else if (!http
->request
->range
->canonize(rep
))
447 range_err
= "canonization failed";
448 else if (http
->request
->range
->isComplex())
449 range_err
= "too complex range header";
450 else if (!http
->logType
.isTcpHit() && http
->request
->range
->offsetLimitExceeded(roffLimit
))
451 range_err
= "range outside range_offset_limit";
453 /* get rid of our range specs on error */
455 /* XXX We do this here because we need canonisation etc. However, this current
456 * code will lead to incorrect store offset requests - the store will have the
457 * offset data, but we won't be requesting it.
458 * So, we can either re-request, or generate an error
460 http
->request
->ignoreRange(range_err
);
462 /* XXX: TODO: Review, this unconditional set may be wrong. */
463 rep
->sline
.set(rep
->sline
.version
, Http::scPartialContent
);
464 // web server responded with a valid, but unexpected range.
465 // will (try-to) forward as-is.
466 //TODO: we should cope with multirange request/responses
467 bool replyMatchRequest
= rep
->content_range
!= nullptr ?
468 request
->range
->contains(rep
->content_range
->spec
) :
470 const int spec_count
= http
->request
->range
->specs
.size();
471 int64_t actual_clen
= -1;
473 debugs(33, 3, "range spec count: " << spec_count
<<
474 " virgin clen: " << rep
->content_length
);
475 assert(spec_count
> 0);
476 /* append appropriate header(s) */
477 if (spec_count
== 1) {
478 if (!replyMatchRequest
) {
479 hdr
->delById(Http::HdrType::CONTENT_RANGE
);
480 hdr
->putContRange(rep
->content_range
);
481 actual_clen
= rep
->content_length
;
482 //http->range_iter.pos = rep->content_range->spec.begin();
483 (*http
->range_iter
.pos
)->offset
= rep
->content_range
->spec
.offset
;
484 (*http
->range_iter
.pos
)->length
= rep
->content_range
->spec
.length
;
487 HttpHdrRange::iterator pos
= http
->request
->range
->begin();
489 /* append Content-Range */
491 if (!hdr
->has(Http::HdrType::CONTENT_RANGE
)) {
492 /* No content range, so this was a full object we are
495 httpHeaderAddContRange(hdr
, **pos
, rep
->content_length
);
498 /* set new Content-Length to the actual number of bytes
499 * transmitted in the message-body */
500 actual_clen
= (*pos
)->length
;
504 /* generate boundary string */
505 http
->range_iter
.boundary
= http
->rangeBoundaryStr();
506 /* delete old Content-Type, add ours */
507 hdr
->delById(Http::HdrType::CONTENT_TYPE
);
508 httpHeaderPutStrf(hdr
, Http::HdrType::CONTENT_TYPE
,
509 "multipart/byteranges; boundary=\"" SQUIDSTRINGPH
"\"",
510 SQUIDSTRINGPRINT(http
->range_iter
.boundary
));
511 /* Content-Length is not required in multipart responses
512 * but it is always nice to have one */
513 actual_clen
= http
->mRangeCLen();
515 /* http->out needs to start where we want data at */
516 http
->out
.offset
= http
->range_iter
.currentSpec()->offset
;
519 /* replace Content-Length header */
520 assert(actual_clen
>= 0);
521 hdr
->delById(Http::HdrType::CONTENT_LENGTH
);
522 hdr
->putInt64(Http::HdrType::CONTENT_LENGTH
, actual_clen
);
523 debugs(33, 3, "actual content length: " << actual_clen
);
525 /* And start the range iter off */
526 http
->range_iter
.updateSpec();
531 Http::Stream::getTail() const
533 if (http
->client_stream
.tail
)
534 return static_cast<clientStreamNode
*>(http
->client_stream
.tail
->data
);
540 Http::Stream::getClientReplyContext() const
542 return static_cast<clientStreamNode
*>(http
->client_stream
.tail
->prev
->data
);
546 Http::Stream::getConn() const
548 assert(http
&& http
->getConn());
549 return http
->getConn();
552 /// remembers the abnormal connection termination for logging purposes
554 Http::Stream::noteIoError(const int xerrno
)
557 http
->logType
.err
.timedout
= (xerrno
== ETIMEDOUT
);
558 // aborted even if xerrno is zero (which means read abort/eof)
559 http
->logType
.err
.aborted
= (xerrno
!= ETIMEDOUT
);
564 Http::Stream::finished()
566 ConnStateData
*conn
= getConn();
568 /* we can't handle any more stream data - detach */
569 clientStreamDetach(getTail(), http
);
571 assert(connRegistered_
);
572 connRegistered_
= false;
573 conn
->pipeline
.popMe(Http::StreamPointer(this));
576 /// called when we encounter a response-related error
578 Http::Stream::initiateClose(const char *reason
)
580 debugs(33, 4, clientConnection
<< " because " << reason
);
581 getConn()->stopSending(reason
); // closes ASAP
585 Http::Stream::deferRecipientForLater(clientStreamNode
*node
, HttpReply
*rep
, StoreIOBuffer receivedData
)
587 debugs(33, 2, "Deferring request " << http
->uri
);
588 assert(flags
.deferred
== 0);
590 deferredparams
.node
= node
;
591 deferredparams
.rep
= rep
;
592 deferredparams
.queuedBuffer
= receivedData
;
596 Http::Stream::prepareReply(HttpReply
*rep
)
599 if (http
->request
->range
)
600 buildRangeHeader(rep
);
604 * Packs bodyData into mb using chunked encoding.
605 * Packs the last-chunk if bodyData is empty.
608 Http::Stream::packChunk(const StoreIOBuffer
&bodyData
, MemBuf
&mb
)
610 const uint64_t length
=
611 static_cast<uint64_t>(lengthToSend(bodyData
.range()));
612 noteSentBodyBytes(length
);
614 mb
.appendf("%" PRIX64
"\r\n", length
);
615 mb
.append(bodyData
.data
, length
);
616 mb
.append("\r\n", 2);
620 * extracts a "range" from *buf and appends them to mb, updating
621 * all offsets and such.
624 Http::Stream::packRange(StoreIOBuffer
const &source
, MemBuf
*mb
)
626 HttpHdrRangeIter
* i
= &http
->range_iter
;
627 Range
<int64_t> available(source
.range());
628 char const *buf
= source
.data
;
630 while (i
->currentSpec() && available
.size()) {
631 const size_t copy_sz
= lengthToSend(available
);
633 // intersection of "have" and "need" ranges must not be empty
634 assert(http
->out
.offset
< i
->currentSpec()->offset
+ i
->currentSpec()->length
);
635 assert(http
->out
.offset
+ (int64_t)available
.size() > i
->currentSpec()->offset
);
638 * put boundary and headers at the beginning of a range in a
641 if (http
->multipartRangeRequest() && i
->debt() == i
->currentSpec()->length
) {
642 assert(http
->memObject());
644 http
->memObject()->getReply(), /* original reply */
645 i
->currentSpec(), /* current range */
646 i
->boundary
, /* boundary, the same for all */
651 debugs(33, 3, "appending " << copy_sz
<< " bytes");
652 noteSentBodyBytes(copy_sz
);
653 mb
->append(buf
, copy_sz
);
656 available
.start
+= copy_sz
;
660 if (!canPackMoreRanges()) {
661 debugs(33, 3, "Returning because !canPackMoreRanges.");
663 // put terminating boundary for multiparts
664 clientPackTermBound(i
->boundary
, mb
);
668 int64_t nextOffset
= getNextRangeOffset();
669 assert(nextOffset
>= http
->out
.offset
);
670 int64_t skip
= nextOffset
- http
->out
.offset
;
671 /* adjust for not to be transmitted bytes */
672 http
->out
.offset
= nextOffset
;
674 if (available
.size() <= (uint64_t)skip
)
677 available
.start
+= skip
;
686 Http::Stream::doClose()
688 clientConnection
->close();