2 * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
10 #include "client_side_request.h"
11 #include "http/Stream.h"
12 #include "HttpHdrContRange.h"
13 #include "HttpHeaderTools.h"
15 #include "TimeOrTag.h"
/// Constructs a stream for the client connection aConn.
/// The visible initializers store aConn in clientConnection and clear the
/// mayUseConnection_ and connRegistered_ flags; the body then asserts that
/// http is set, zero-fills reqbuf and resets both deferredparams pointers.
// NOTE(review): the initializers between clientConnection and
// mayUseConnection_ are not visible in this excerpt; presumably http is
// initialized from aReq there - confirm.
17 Http::Stream::Stream(const Comm::ConnectionPointer
&aConn
, ClientHttpRequest
*aReq
) :
18 clientConnection(aConn
),
22 mayUseConnection_(false),
23 connRegistered_(false)
25 assert(http
!= nullptr);
26 memset(reqbuf
, '\0', sizeof (reqbuf
));
29 deferredparams
.node
= nullptr;
30 deferredparams
.rep
= nullptr;
/// Destroys the stream: looks at the tail client-stream node (if any), checks
/// whether its data is an Http::Stream, and releases the request through
/// httpRequestFree(http).
// NOTE(review): the statements guarded by the dynamic_cast check are not
// visible in this excerpt; the existing comment says they prevent a
// recursive free - confirm against the full file.
33 Http::Stream::~Stream()
35 if (auto node
= getTail()) {
36 if (auto ctx
= dynamic_cast<Http::Stream
*>(node
->data
.getRaw())) {
37 /* We are *always* the tail - prevent recursive free */
42 httpRequestFree(http
);
/// Adds this stream to the pipeline of its connection.
/// Asserts that the stream is not registered yet, then sets connRegistered_
/// before handing a Http::StreamPointer to getConn()->pipeline.add().
46 Http::Stream::registerWithConn()
48 assert(!connRegistered_
);
50 connRegistered_
= true;
51 getConn()->pipeline
.add(Http::StreamPointer(this));
/// \return whether http->out.size is still zero, i.e. writeComplete() has not
/// yet accounted for any written bytes on this stream
55 Http::Stream::startOfOutput() const
57 return http
->out
.size
== 0;
/// Accounts for a finished write of size bytes and decides what happens next.
/// Adds size to http->out.size, initiates a close when
/// clientHttpRequestStatus() reports a true status, and otherwise dispatches
/// on socketState().
/// \param size number of bytes that were just written
// NOTE(review): several case labels and the statements that end each case
// are not visible in this excerpt, so the exact per-state flow cannot be
// confirmed from here.
61 Http::Stream::writeComplete(size_t size
)
63 const StoreEntry
*entry
= http
->storeEntry();
64 debugs(33, 5, clientConnection
<< ", sz " << size
<<
65 ", off " << (http
->out
.size
+ size
) << ", len " <<
66 (entry
? entry
->objectLen() : 0));
68 http
->out
.size
+= size
;
70 if (clientHttpRequestStatus(clientConnection
->fd
, http
)) {
71 initiateClose("failure or true request status");
72 /* Do we leak here ? */
76 switch (socketState()) {
82 case STREAM_COMPLETE
: {
83 debugs(33, 5, clientConnection
<< " Stream complete, keepalive is " <<
84 http
->request
->flags
.proxyKeepalive
);
85 ConnStateData
*c
= getConn();
// without proxyKeepalive the client connection is closed right here
86 if (!http
->request
->flags
.proxyKeepalive
)
87 clientConnection
->close();
93 case STREAM_UNPLANNED_COMPLETE
:
94 initiateClose("STREAM_UNPLANNED_COMPLETE");
98 initiateClose("STREAM_FAILED");
102 fatal("Hit unreachable code in Http::Stream::writeComplete\n");
/// Asks the client stream for the next piece of response data.
/// Fills a StoreIOBuffer with the offset from getNextRangeOffset(), a length
/// of HTTP_REQBUF_SZ and reqbuf as storage, then passes it to
/// clientStreamRead() on the tail node.
107 Http::Stream::pullData()
109 debugs(33, 5, reply
<< " written " << http
->out
.size
<< " into " << clientConnection
);
111 /* More data will be coming from the stream. */
112 StoreIOBuffer readBuffer
;
113 /* XXX: Next requested byte in the range sequence */
114 /* XXX: length = getmaximumrangelength */
115 readBuffer
.offset
= getNextRangeOffset();
116 readBuffer
.length
= HTTP_REQBUF_SZ
;
117 readBuffer
.data
= reqbuf
;
118 /* we may note we have reached the end of the wanted ranges */
119 clientStreamRead(getTail(), http
, readBuffer
);
/// \return the result of http->multipartRangeRequest(); a plain forwarder
123 Http::Stream::multipartRangeRequest() const
125 return http
->multipartRangeRequest();
/// Computes the offset of the next body byte to request from the store.
/// Three cases are visible:
/// - the request has a range: start is the current spec offset plus its
///   length minus the remaining debt of the range iterator;
/// - no request range but the reply has Content-Range: http->out.offset plus
///   the offset of the reply content-range spec;
/// - otherwise: http->out.offset.
// NOTE(review): the return statement of the request-range branch is not
// visible in this excerpt; presumably it returns start - confirm.
129 Http::Stream::getNextRangeOffset() const
131 debugs (33, 5, "range: " << http
->request
->range
<<
132 "; http offset " << http
->out
.offset
<<
133 "; reply " << reply
);
135 // XXX: This method is called from many places, including pullData() which
136 // may be called before prepareReply() [on some Squid-generated errors].
137 // Hence, we may not even know yet whether we should honor/do ranges.
139 if (http
->request
->range
) {
140 /* offset in range specs does not count the prefix of an http msg */
141 /* check: reply was parsed and range iterator was initialized */
142 assert(http
->range_iter
.valid
);
143 /* filter out data according to range specs */
144 assert(canPackMoreRanges());
146 assert(http
->range_iter
.currentSpec());
147 /* offset of still missing data */
148 int64_t start
= http
->range_iter
.currentSpec()->offset
+
149 http
->range_iter
.currentSpec()->length
-
150 http
->range_iter
.debt();
151 debugs(33, 3, "clientPackMoreRanges: in: offset: " << http
->out
.offset
);
152 debugs(33, 3, "clientPackMoreRanges: out:"
153 " start: " << start
<<
154 " spec[" << http
->range_iter
.pos
- http
->request
->range
->begin() << "]:" <<
155 " [" << http
->range_iter
.currentSpec()->offset
<<
156 ", " << http
->range_iter
.currentSpec()->offset
+
157 http
->range_iter
.currentSpec()->length
<< "),"
158 " len: " << http
->range_iter
.currentSpec()->length
<<
159 " debt: " << http
->range_iter
.debt());
// the ordering check is skipped when the spec length is -1
160 if (http
->range_iter
.currentSpec()->length
!= -1)
161 assert(http
->out
.offset
<= start
); /* we did not miss it */
166 } else if (reply
&& reply
->content_range
) {
167 /* request does not have ranges, but reply does */
168 /** \todo FIXME: should use range_iter_pos on reply, as soon as reply->content_range
169 * becomes HttpHdrRange rather than HttpHdrRangeSpec.
171 return http
->out
.offset
+ reply
->content_range
->spec
.offset
;
174 return http
->out
.offset
;
178 * increments iterator "i"
179 * used by clientPackMoreRanges
181 * \retval true there is still data available to pack more ranges
/// Reports whether a current range spec is still available for packing.
/// When the iterator has no debt left it is advanced first: pos is
/// incremented unless it already equals end, and updateSpec() is called.
/// Although declared const, this modifies the range iterator reached
/// through the http pointer.
/// \return true when http->range_iter.currentSpec() is non-null afterwards
185 Http::Stream::canPackMoreRanges() const
187 /** first update iterator "i" if needed */
188 if (!http
->range_iter
.debt()) {
189 debugs(33, 5, "At end of current range spec for " << clientConnection
);
191 if (http
->range_iter
.pos
!= http
->range_iter
.end
)
192 ++http
->range_iter
.pos
;
194 http
->range_iter
.updateSpec();
// invariant: zero debt if and only if there is no current spec
197 assert(!http
->range_iter
.debt() == !http
->range_iter
.currentSpec());
199 /* paranoid sync condition */
200 /* continue condition: need_more_data */
201 debugs(33, 5, "returning " << (http
->range_iter
.currentSpec() ? true : false));
202 return http
->range_iter
.currentSpec() ? true : false;
205 /// Adapt stream status to account for Range cases
/// Starts from clientStreamStatus() on the tail node and refines it:
/// - with a request range, returns STREAM_COMPLETE once canPackMoreRanges()
///   is false;
/// - with only a reply Content-Range, compares http->out.offset with the
///   content-range spec length and returns STREAM_COMPLETE on equality or
///   STREAM_UNPLANNED_COMPLETE when more was sent than expected.
/// STREAM_COMPLETE and STREAM_UNPLANNED_COMPLETE from the stream are passed
/// through unchanged.
// NOTE(review): the first case label, the fall-back returns of the range
// checks and the label in front of the STREAM_FAILED return are not visible
// in this excerpt.
206 clientStream_status_t
207 Http::Stream::socketState()
209 switch (clientStreamStatus(getTail(), http
)) {
212 /* check for range support ending */
213 if (http
->request
->range
) {
214 /* check: reply was parsed and range iterator was initialized */
215 assert(http
->range_iter
.valid
);
216 /* filter out data according to range specs */
218 if (!canPackMoreRanges()) {
219 debugs(33, 5, "Range request at end of returnable " <<
220 "range sequence on " << clientConnection
);
221 // we got everything we wanted from the store
222 return STREAM_COMPLETE
;
224 } else if (reply
&& reply
->content_range
) {
225 /* reply has content-range, but Squid is not managing ranges */
226 const int64_t &bytesSent
= http
->out
.offset
;
227 const int64_t &bytesExpected
= reply
->content_range
->spec
.length
;
229 debugs(33, 7, "body bytes sent vs. expected: " <<
230 bytesSent
<< " ? " << bytesExpected
<< " (+" <<
231 reply
->content_range
->spec
.offset
<< ")");
233 // did we get at least what we expected, based on range specs?
235 if (bytesSent
== bytesExpected
) // got everything
236 return STREAM_COMPLETE
;
238 if (bytesSent
> bytesExpected
) // Error: Sent more than expected
239 return STREAM_UNPLANNED_COMPLETE
;
244 case STREAM_COMPLETE
:
245 return STREAM_COMPLETE
;
247 case STREAM_UNPLANNED_COMPLETE
:
248 return STREAM_UNPLANNED_COMPLETE
;
251 return STREAM_FAILED
;
254 fatal ("unreachable code\n");
/// Sends the reply headers, optionally followed by the first body bytes.
/// Packs rep into a MemBuf, records the packed size in http->out.headers_sz,
/// logs the headers, and - when bodyData carries data - appends it via
/// packRange() for multipart range requests, packChunk() for chunked
/// replies, or a plain append of lengthToSend() bytes. The buffer is then
/// handed to getConn()->write().
/// \param rep reply whose packed form starts the message
/// \param bodyData body bytes that arrived together with the headers; may be empty
// NOTE(review): the else line between the packChunk() call and the plain
// append, and whatever follows the write (such as releasing mb), are not
// visible in this excerpt.
259 Http::Stream::sendStartOfMessage(HttpReply
*rep
, StoreIOBuffer bodyData
)
263 MemBuf
*mb
= rep
->pack();
265 // dump now, so we dont output any body.
266 debugs(11, 2, "HTTP Client " << clientConnection
);
267 debugs(11, 2, "HTTP Client REPLY:\n---------\n" << mb
->buf
<< "\n----------");
269 /* Save length of headers for persistent conn checks */
270 http
->out
.headers_sz
= mb
->contentSize();
272 headersLog(0, 0, http
->request
->method
, rep
);
275 if (bodyData
.data
&& bodyData
.length
) {
276 if (multipartRangeRequest())
277 packRange(bodyData
, mb
);
278 else if (http
->request
->flags
.chunkedReply
) {
279 packChunk(bodyData
, *mb
);
281 size_t length
= lengthToSend(bodyData
.range());
282 noteSentBodyBytes(length
);
283 mb
->append(bodyData
.data
, length
);
287 getConn()->write(mb
);
/// Sends a piece of the response body.
/// For replies that are neither multipart-range nor chunked, the bytes
/// selected by lengthToSend() are written directly from bodyData.data.
/// Otherwise the data is first packed into the buffer mb by packRange() or
/// packChunk() and written only when mb has content.
/// \param bodyData body bytes received from the client stream
// NOTE(review): the declaration and setup of mb, the return that ends the
// direct-write branch and the else in front of packChunk() are not visible
// in this excerpt.
292 Http::Stream::sendBody(StoreIOBuffer bodyData
)
294 if (!multipartRangeRequest() && !http
->request
->flags
.chunkedReply
) {
295 size_t length
= lengthToSend(bodyData
.range());
296 noteSentBodyBytes(length
);
297 getConn()->write(bodyData
.data
, length
);
303 if (multipartRangeRequest())
304 packRange(bodyData
, &mb
);
306 packChunk(bodyData
, mb
);
308 if (mb
.contentSize())
309 getConn()->write(&mb
);
/// Computes how many bytes of the available range should be sent now.
/// The visible final result is the smaller of the remaining debt of the
/// range iterator and the size of available.
/// \param available range of body bytes currently at hand
// NOTE(review): the statements guarded by the three bare if conditions below
// (no request range, debt equal to -1, available starting before the current
// spec offset) are not visible in this excerpt; presumably each returns
// early - confirm against the full file.
315 Http::Stream::lengthToSend(Range
<int64_t> const &available
) const
317 // the size of available range can always fit into a size_t type
318 size_t maximum
= available
.size();
320 if (!http
->request
->range
)
323 assert(canPackMoreRanges());
325 if (http
->range_iter
.debt() == -1)
328 assert(http
->range_iter
.debt() > 0);
330 /* TODO this + the last line could be a range intersection calculation */
331 if (available
.start
< http
->range_iter
.currentSpec()->offset
)
334 return min(http
->range_iter
.debt(), static_cast<int64_t>(maximum
));
/// Records that bytes body bytes were sent.
/// Advances http->out.offset and, when the range iterator debt is not -1,
/// reduces that debt by the same amount, asserting that it stays
/// non-negative.
/// \param bytes number of body bytes just sent
// NOTE(review): the statement guarded by the request-range check is not
// visible in this excerpt; presumably it returns before the debt update
// when the request has no range - confirm.
338 Http::Stream::noteSentBodyBytes(size_t bytes
)
340 debugs(33, 7, bytes
<< " body bytes");
341 http
->out
.offset
+= bytes
;
343 if (!http
->request
->range
)
346 if (http
->range_iter
.debt() != -1) {
347 http
->range_iter
.debt(http
->range_iter
.debt() - bytes
);
348 assert (http
->range_iter
.debt() >= 0);
351 /* debt() always stops at -1, below that is a bug */
352 assert(http
->range_iter
.debt() >= -1);
355 /// \return true when If-Range specs match reply, false otherwise
/// Reads the If-Range request header as a TimeOrTag value. In the ETag path
/// the reply ETag is fetched, weak tags on either side are rejected, and the
/// result is etagIsStrongEqual(). In the time path the result is the negation
/// of storeEntry()->modifiedSince(spec.time).
/// \param http request whose If-Range header is examined
/// \param rep reply whose ETag is compared
// NOTE(review): the conditions that select between the ETag path, the time
// path and the early false returns are not visible in this excerpt.
357 clientIfRangeMatch(ClientHttpRequest
* http
, HttpReply
* rep
)
359 const TimeOrTag spec
= http
->request
->header
.getTimeOrTag(Http::HdrType::IF_RANGE
);
361 /* check for parsing failure */
367 ETag rep_tag
= rep
->header
.getETag(Http::HdrType::ETAG
);
368 debugs(33, 3, "ETags: " << spec
.tag
.str
<< " and " <<
369 (rep_tag
.str
? rep_tag
.str
: "<none>"));
372 return false; // entity has no etag to compare with!
374 if (spec
.tag
.weak
|| rep_tag
.weak
) {
375 debugs(33, DBG_IMPORTANT
, "Weak ETags are not allowed in If-Range: " <<
376 spec
.tag
.str
<< " ? " << rep_tag
.str
);
377 return false; // must use strong validator for sub-range requests
380 return etagIsStrongEqual(rep_tag
, spec
.tag
);
383 /* got modification time? */
385 return !http
->storeEntry()->modifiedSince(spec
.time
);
387 assert(0); /* should not happen */
// Overview of buildRangeHeader(), based on the visible statements:
// 1. Decide whether ranges can still be honored. The else-if chain sets
//    range_err to a reason string when the reply is unusable, has a status
//    other than scOkay or scPartialContent, already carries Content-Range,
//    has an unknown or inconsistent length, fails the If-Range check on a
//    hit, cannot be canonized, is too complex, or exceeds the range offset
//    limit on a non-hit.
// 2. On error the request range is dropped through ignoreRange(range_err).
// 3. Otherwise the status is set to scPartialContent and the headers are
//    rewritten: a single spec gets a Content-Range header and a
//    Content-Length equal to the spec length (or the reply values when the
//    reply range does not match the request); several specs get a
//    multipart/byteranges Content-Type with a fresh boundary and a
//    Content-Length from mRangeCLen().
// 4. http->out.offset is moved to the current spec offset and the range
//    iterator is started with updateSpec().
// NOTE(review): the conditions that open the error chain and separate the
// error path from the success path, plus several closing lines, are not
// visible in this excerpt.
391 // seems to be something better suited to Server logic
392 /** adds appropriate Range headers if needed */
394 Http::Stream::buildRangeHeader(HttpReply
*rep
)
396 HttpHeader
*hdr
= rep
? &rep
->header
: nullptr;
397 const char *range_err
= nullptr;
398 HttpRequest
*request
= http
->request
;
399 assert(request
->range
);
400 /* check if we still want to do ranges */
401 int64_t roffLimit
= request
->getRangeOffsetLimit();
404 range_err
= "no [parse-able] reply";
405 else if ((rep
->sline
.status() != Http::scOkay
) && (rep
->sline
.status() != Http::scPartialContent
))
406 range_err
= "wrong status code";
407 else if (hdr
->has(Http::HdrType::CONTENT_RANGE
))
408 range_err
= "origin server does ranges";
409 else if (rep
->content_length
< 0)
410 range_err
= "unknown length";
411 else if (rep
->content_length
!= http
->memObject()->getReply()->content_length
)
412 range_err
= "INCONSISTENT length"; /* a bug? */
414 /* hits only - upstream CachePeer determines correct behaviour on misses,
415 * and client_side_reply determines hits candidates
417 else if (http
->logType
.isTcpHit() &&
418 http
->request
->header
.has(Http::HdrType::IF_RANGE
) &&
419 !clientIfRangeMatch(http
, rep
))
420 range_err
= "If-Range match failed";
422 else if (!http
->request
->range
->canonize(rep
))
423 range_err
= "canonization failed";
424 else if (http
->request
->range
->isComplex())
425 range_err
= "too complex range header";
426 else if (!http
->logType
.isTcpHit() && http
->request
->range
->offsetLimitExceeded(roffLimit
))
427 range_err
= "range outside range_offset_limit";
429 /* get rid of our range specs on error */
431 /* XXX We do this here because we need canonisation etc. However, this current
432 * code will lead to incorrect store offset requests - the store will have the
433 * offset data, but we won't be requesting it.
434 * So, we can either re-request, or generate an error
436 http
->request
->ignoreRange(range_err
);
438 /* XXX: TODO: Review, this unconditional set may be wrong. */
439 rep
->sline
.set(rep
->sline
.version
, Http::scPartialContent
);
440 // web server responded with a valid, but unexpected range.
441 // will (try-to) forward as-is.
442 //TODO: we should cope with multirange request/responses
443 bool replyMatchRequest
= rep
->content_range
!= nullptr ?
444 request
->range
->contains(rep
->content_range
->spec
) :
446 const int spec_count
= http
->request
->range
->specs
.size();
447 int64_t actual_clen
= -1;
449 debugs(33, 3, "range spec count: " << spec_count
<<
450 " virgin clen: " << rep
->content_length
);
451 assert(spec_count
> 0);
452 /* append appropriate header(s) */
453 if (spec_count
== 1) {
454 if (!replyMatchRequest
) {
455 hdr
->delById(Http::HdrType::CONTENT_RANGE
);
456 hdr
->putContRange(rep
->content_range
);
457 actual_clen
= rep
->content_length
;
458 //http->range_iter.pos = rep->content_range->spec.begin();
459 (*http
->range_iter
.pos
)->offset
= rep
->content_range
->spec
.offset
;
460 (*http
->range_iter
.pos
)->length
= rep
->content_range
->spec
.length
;
463 HttpHdrRange::iterator pos
= http
->request
->range
->begin();
465 /* append Content-Range */
467 if (!hdr
->has(Http::HdrType::CONTENT_RANGE
)) {
468 /* No content range, so this was a full object we are
471 httpHeaderAddContRange(hdr
, **pos
, rep
->content_length
);
474 /* set new Content-Length to the actual number of bytes
475 * transmitted in the message-body */
476 actual_clen
= (*pos
)->length
;
480 /* generate boundary string */
481 http
->range_iter
.boundary
= http
->rangeBoundaryStr();
482 /* delete old Content-Type, add ours */
483 hdr
->delById(Http::HdrType::CONTENT_TYPE
);
484 httpHeaderPutStrf(hdr
, Http::HdrType::CONTENT_TYPE
,
485 "multipart/byteranges; boundary=\"" SQUIDSTRINGPH
"\"",
486 SQUIDSTRINGPRINT(http
->range_iter
.boundary
));
487 /* Content-Length is not required in multipart responses
488 * but it is always nice to have one */
489 actual_clen
= http
->mRangeCLen();
491 /* http->out needs to start where we want data at */
492 http
->out
.offset
= http
->range_iter
.currentSpec()->offset
;
495 /* replace Content-Length header */
496 assert(actual_clen
>= 0);
497 hdr
->delById(Http::HdrType::CONTENT_LENGTH
);
498 hdr
->putInt64(Http::HdrType::CONTENT_LENGTH
, actual_clen
);
499 debugs(33, 3, "actual content length: " << actual_clen
);
501 /* And start the range iter off */
502 http
->range_iter
.updateSpec();
/// \return the data of the tail entry of http->client_stream, cast to
/// clientStreamNode*, when a tail exists
// NOTE(review): the return used when there is no tail is not visible in this
// excerpt; callers test the result for null, so presumably nullptr - confirm.
507 Http::Stream::getTail() const
509 if (http
->client_stream
.tail
)
510 return static_cast<clientStreamNode
*>(http
->client_stream
.tail
->data
);
/// \return the data of the entry just before the tail of http->client_stream,
/// cast to clientStreamNode*
/// No null checks are made here on tail or tail->prev, so both must exist
/// when this is called.
516 Http::Stream::getClientReplyContext() const
518 return static_cast<clientStreamNode
*>(http
->client_stream
.tail
->prev
->data
);
/// \return http->getConn(); asserts first that both http and the connection
/// it reports are non-null, so the result is never null
522 Http::Stream::getConn() const
524 assert(http
&& http
->getConn());
525 return http
->getConn();
528 /// remembers the abnormal connection termination for logging purposes
/// Sets http->logType.err.timedout when xerrno equals ETIMEDOUT and
/// http->logType.err.aborted for every other value, so exactly one of the two
/// flags ends up true.
/// \param xerrno errno-style code describing the I/O failure
// NOTE(review): a guard around these assignments may exist on lines not
// visible in this excerpt.
530 Http::Stream::noteIoError(const int xerrno
)
533 http
->logType
.err
.timedout
= (xerrno
== ETIMEDOUT
);
534 // aborted even if xerrno is zero (which means read abort/eof)
535 http
->logType
.err
.aborted
= (xerrno
!= ETIMEDOUT
);
/// Ends processing of this stream: detaches the tail node from the client
/// stream, clears connRegistered_ (asserting it was set) and removes this
/// stream from the connection pipeline via popMe().
/// The connection pointer is fetched before detaching.
540 Http::Stream::finished()
542 ConnStateData
*conn
= getConn();
544 /* we can't handle any more stream data - detach */
545 clientStreamDetach(getTail(), http
);
547 assert(connRegistered_
);
548 connRegistered_
= false;
549 conn
->pipeline
.popMe(Http::StreamPointer(this));
552 /// called when we encounter a response-related error
/// Logs the reason and asks the connection to stop sending.
/// \param reason short text passed to the debug log and to stopSending()
554 Http::Stream::initiateClose(const char *reason
)
556 debugs(33, 4, clientConnection
<< " because " << reason
);
557 getConn()->stopSending(reason
); // closes ASAP
/// Stores the arguments of a reply delivery so it can be performed later.
/// Asserts that flags.deferred is not set yet, then saves node, rep and
/// receivedData in deferredparams.
/// \param node client stream node the data was delivered to
/// \param rep reply associated with the data
/// \param receivedData buffer that arrived with the delivery
// NOTE(review): the line that sets flags.deferred is not visible in this
// excerpt; presumably it sits between the assert and the assignments - confirm.
561 Http::Stream::deferRecipientForLater(clientStreamNode
*node
, HttpReply
*rep
, StoreIOBuffer receivedData
)
563 debugs(33, 2, "Deferring request " << http
->uri
);
564 assert(flags
.deferred
== 0);
566 deferredparams
.node
= node
;
567 deferredparams
.rep
= rep
;
568 deferredparams
.queuedBuffer
= receivedData
;
/// Prepares rep for sending; when the request carries a range, the range
/// related headers are built through buildRangeHeader().
/// \param rep reply about to be sent
// NOTE(review): statements before the range check are not visible in this
// excerpt.
572 Http::Stream::prepareReply(HttpReply
*rep
)
575 if (http
->request
->range
)
576 buildRangeHeader(rep
);
580 * Packs bodyData into mb using chunked encoding.
581 * Packs the last-chunk if bodyData is empty.
/// Appends one chunk in chunked transfer coding to mb: the length in
/// upper-case hexadecimal followed by CRLF, then length bytes of
/// bodyData.data, then a closing CRLF. The length comes from lengthToSend()
/// and is also reported to noteSentBodyBytes().
/// \param bodyData source of the chunk payload
/// \param mb buffer that receives the encoded chunk
584 Http::Stream::packChunk(const StoreIOBuffer
&bodyData
, MemBuf
&mb
)
586 const uint64_t length
=
587 static_cast<uint64_t>(lengthToSend(bodyData
.range()));
588 noteSentBodyBytes(length
);
590 mb
.appendf("%" PRIX64
"\r\n", length
);
591 mb
.append(bodyData
.data
, length
);
592 mb
.append("\r\n", 2);
596 * extracts a "range" from *buf and appends them to mb, updating
597 * all offsets and such.
/// Copies the parts of source that fall into the requested ranges into mb.
/// Loops while the range iterator has a current spec and bytes remain
/// available. Each pass asserts that the available bytes overlap the current
/// spec, appends lengthToSend() bytes, and then either stops when
/// canPackMoreRanges() is false (emitting the terminating boundary) or skips
/// ahead to getNextRangeOffset().
/// \param source buffer holding the body bytes at hand
/// \param mb buffer that receives the packed output
// NOTE(review): the call that emits the per-range boundary and headers, the
// advance of buf, and the statements that leave the loop are not visible in
// this excerpt.
600 Http::Stream::packRange(StoreIOBuffer
const &source
, MemBuf
*mb
)
602 HttpHdrRangeIter
* i
= &http
->range_iter
;
603 Range
<int64_t> available(source
.range());
604 char const *buf
= source
.data
;
606 while (i
->currentSpec() && available
.size()) {
607 const size_t copy_sz
= lengthToSend(available
);
609 // intersection of "have" and "need" ranges must not be empty
610 assert(http
->out
.offset
< i
->currentSpec()->offset
+ i
->currentSpec()->length
);
611 assert(http
->out
.offset
+ (int64_t)available
.size() > i
->currentSpec()->offset
);
614 * put boundary and headers at the beginning of a range in a
617 if (http
->multipartRangeRequest() && i
->debt() == i
->currentSpec()->length
) {
618 assert(http
->memObject());
620 http
->memObject()->getReply(), /* original reply */
621 i
->currentSpec(), /* current range */
622 i
->boundary
, /* boundary, the same for all */
627 debugs(33, 3, "appending " << copy_sz
<< " bytes");
628 noteSentBodyBytes(copy_sz
);
629 mb
->append(buf
, copy_sz
);
632 available
.start
+= copy_sz
;
636 if (!canPackMoreRanges()) {
637 debugs(33, 3, "Returning because !canPackMoreRanges.");
639 // put terminating boundary for multiparts
640 clientPackTermBound(i
->boundary
, mb
);
644 int64_t nextOffset
= getNextRangeOffset();
645 assert(nextOffset
>= http
->out
.offset
);
646 int64_t skip
= nextOffset
- http
->out
.offset
;
647 /* adjust for not to be transmitted bytes */
648 http
->out
.offset
= nextOffset
;
650 if (available
.size() <= (uint64_t)skip
)
653 available
.start
+= skip
;
/// Closes the client connection of this stream.
662 Http::Stream::doClose()
664 clientConnection
->close();