]> git.ipfire.org Git - thirdparty/squid.git/blob - src/http/Stream.cc
471d83f66aa0f84b68f3644a8ab64e250d475317
[thirdparty/squid.git] / src / http / Stream.cc
1 /*
2 * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 #include "squid.h"
10 #include "client_side_request.h"
11 #include "http/Stream.h"
12 #include "HttpHdrContRange.h"
13 #include "HttpHeaderTools.h"
14 #include "Store.h"
15 #include "TimeOrTag.h"
16
17 Http::Stream::Stream(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
18 clientConnection(aConn),
19 http(aReq),
20 reply(nullptr),
21 writtenToSocket(0),
22 mayUseConnection_(false),
23 connRegistered_(false)
24 {
25 assert(http != nullptr);
26 memset(reqbuf, '\0', sizeof (reqbuf));
27 flags.deferred = 0;
28 flags.parsed_ok = 0;
29 deferredparams.node = nullptr;
30 deferredparams.rep = nullptr;
31 }
32
33 Http::Stream::~Stream()
34 {
35 if (auto node = getTail()) {
36 if (auto ctx = dynamic_cast<Http::Stream *>(node->data.getRaw())) {
37 /* We are *always* the tail - prevent recursive free */
38 assert(this == ctx);
39 node->data = nullptr;
40 }
41 }
42 httpRequestFree(http);
43 }
44
45 void
46 Http::Stream::registerWithConn()
47 {
48 assert(!connRegistered_);
49 assert(getConn());
50 connRegistered_ = true;
51 getConn()->pipeline.add(Http::StreamPointer(this));
52 }
53
54 bool
55 Http::Stream::startOfOutput() const
56 {
57 return http->out.size == 0;
58 }
59
60 void
61 Http::Stream::writeComplete(size_t size)
62 {
63 const StoreEntry *entry = http->storeEntry();
64 debugs(33, 5, clientConnection << ", sz " << size <<
65 ", off " << (http->out.size + size) << ", len " <<
66 (entry ? entry->objectLen() : 0));
67
68 http->out.size += size;
69
70 if (clientHttpRequestStatus(clientConnection->fd, http)) {
71 initiateClose("failure or true request status");
72 /* Do we leak here ? */
73 return;
74 }
75
76 switch (socketState()) {
77
78 case STREAM_NONE:
79 pullData();
80 break;
81
82 case STREAM_COMPLETE: {
83 debugs(33, 5, clientConnection << " Stream complete, keepalive is " <<
84 http->request->flags.proxyKeepalive);
85 ConnStateData *c = getConn();
86 if (!http->request->flags.proxyKeepalive)
87 clientConnection->close();
88 finished();
89 c->kick();
90 }
91 return;
92
93 case STREAM_UNPLANNED_COMPLETE:
94 initiateClose("STREAM_UNPLANNED_COMPLETE");
95 return;
96
97 case STREAM_FAILED:
98 initiateClose("STREAM_FAILED");
99 return;
100
101 default:
102 fatal("Hit unreachable code in Http::Stream::writeComplete\n");
103 }
104 }
105
106 void
107 Http::Stream::pullData()
108 {
109 debugs(33, 5, reply << " written " << http->out.size << " into " << clientConnection);
110
111 /* More data will be coming from the stream. */
112 StoreIOBuffer readBuffer;
113 /* XXX: Next requested byte in the range sequence */
114 /* XXX: length = getmaximumrangelenfgth */
115 readBuffer.offset = getNextRangeOffset();
116 readBuffer.length = HTTP_REQBUF_SZ;
117 readBuffer.data = reqbuf;
118 /* we may note we have reached the end of the wanted ranges */
119 clientStreamRead(getTail(), http, readBuffer);
120 }
121
122 bool
123 Http::Stream::multipartRangeRequest() const
124 {
125 return http->multipartRangeRequest();
126 }
127
128 int64_t
129 Http::Stream::getNextRangeOffset() const
130 {
131 debugs (33, 5, "range: " << http->request->range <<
132 "; http offset " << http->out.offset <<
133 "; reply " << reply);
134
135 // XXX: This method is called from many places, including pullData() which
136 // may be called before prepareReply() [on some Squid-generated errors].
137 // Hence, we may not even know yet whether we should honor/do ranges.
138
139 if (http->request->range) {
140 /* offset in range specs does not count the prefix of an http msg */
141 /* check: reply was parsed and range iterator was initialized */
142 assert(http->range_iter.valid);
143 /* filter out data according to range specs */
144 assert(canPackMoreRanges());
145 {
146 assert(http->range_iter.currentSpec());
147 /* offset of still missing data */
148 int64_t start = http->range_iter.currentSpec()->offset +
149 http->range_iter.currentSpec()->length -
150 http->range_iter.debt();
151 debugs(33, 3, "clientPackMoreRanges: in: offset: " << http->out.offset);
152 debugs(33, 3, "clientPackMoreRanges: out:"
153 " start: " << start <<
154 " spec[" << http->range_iter.pos - http->request->range->begin() << "]:" <<
155 " [" << http->range_iter.currentSpec()->offset <<
156 ", " << http->range_iter.currentSpec()->offset +
157 http->range_iter.currentSpec()->length << "),"
158 " len: " << http->range_iter.currentSpec()->length <<
159 " debt: " << http->range_iter.debt());
160 if (http->range_iter.currentSpec()->length != -1)
161 assert(http->out.offset <= start); /* we did not miss it */
162
163 return start;
164 }
165
166 } else if (reply && reply->content_range) {
167 /* request does not have ranges, but reply does */
168 /** \todo FIXME: should use range_iter_pos on reply, as soon as reply->content_range
169 * becomes HttpHdrRange rather than HttpHdrRangeSpec.
170 */
171 return http->out.offset + reply->content_range->spec.offset;
172 }
173
174 return http->out.offset;
175 }
176
177 /**
178 * increments iterator "i"
179 * used by clientPackMoreRanges
180 *
181 * \retval true there is still data available to pack more ranges
182 * \retval false
183 */
184 bool
185 Http::Stream::canPackMoreRanges() const
186 {
187 /** first update iterator "i" if needed */
188 if (!http->range_iter.debt()) {
189 debugs(33, 5, "At end of current range spec for " << clientConnection);
190
191 if (http->range_iter.pos != http->range_iter.end)
192 ++http->range_iter.pos;
193
194 http->range_iter.updateSpec();
195 }
196
197 assert(!http->range_iter.debt() == !http->range_iter.currentSpec());
198
199 /* paranoid sync condition */
200 /* continue condition: need_more_data */
201 debugs(33, 5, "returning " << (http->range_iter.currentSpec() ? true : false));
202 return http->range_iter.currentSpec() ? true : false;
203 }
204
205 /// Adapt stream status to account for Range cases
206 clientStream_status_t
207 Http::Stream::socketState()
208 {
209 switch (clientStreamStatus(getTail(), http)) {
210
211 case STREAM_NONE:
212 /* check for range support ending */
213 if (http->request->range) {
214 /* check: reply was parsed and range iterator was initialized */
215 assert(http->range_iter.valid);
216 /* filter out data according to range specs */
217
218 if (!canPackMoreRanges()) {
219 debugs(33, 5, "Range request at end of returnable " <<
220 "range sequence on " << clientConnection);
221 // we got everything we wanted from the store
222 return STREAM_COMPLETE;
223 }
224 } else if (reply && reply->content_range) {
225 /* reply has content-range, but Squid is not managing ranges */
226 const int64_t &bytesSent = http->out.offset;
227 const int64_t &bytesExpected = reply->content_range->spec.length;
228
229 debugs(33, 7, "body bytes sent vs. expected: " <<
230 bytesSent << " ? " << bytesExpected << " (+" <<
231 reply->content_range->spec.offset << ")");
232
233 // did we get at least what we expected, based on range specs?
234
235 if (bytesSent == bytesExpected) // got everything
236 return STREAM_COMPLETE;
237
238 if (bytesSent > bytesExpected) // Error: Sent more than expected
239 return STREAM_UNPLANNED_COMPLETE;
240 }
241
242 return STREAM_NONE;
243
244 case STREAM_COMPLETE:
245 return STREAM_COMPLETE;
246
247 case STREAM_UNPLANNED_COMPLETE:
248 return STREAM_UNPLANNED_COMPLETE;
249
250 case STREAM_FAILED:
251 return STREAM_FAILED;
252 }
253
254 fatal ("unreachable code\n");
255 return STREAM_NONE;
256 }
257
258 void
259 Http::Stream::sendStartOfMessage(HttpReply *rep, StoreIOBuffer bodyData)
260 {
261 prepareReply(rep);
262 assert(rep);
263 MemBuf *mb = rep->pack();
264
265 // dump now, so we dont output any body.
266 debugs(11, 2, "HTTP Client " << clientConnection);
267 debugs(11, 2, "HTTP Client REPLY:\n---------\n" << mb->buf << "\n----------");
268
269 /* Save length of headers for persistent conn checks */
270 http->out.headers_sz = mb->contentSize();
271 #if HEADERS_LOG
272 headersLog(0, 0, http->request->method, rep);
273 #endif
274
275 if (bodyData.data && bodyData.length) {
276 if (multipartRangeRequest())
277 packRange(bodyData, mb);
278 else if (http->request->flags.chunkedReply) {
279 packChunk(bodyData, *mb);
280 } else {
281 size_t length = lengthToSend(bodyData.range());
282 noteSentBodyBytes(length);
283 mb->append(bodyData.data, length);
284 }
285 }
286
287 getConn()->write(mb);
288 delete mb;
289 }
290
291 void
292 Http::Stream::sendBody(StoreIOBuffer bodyData)
293 {
294 if (!multipartRangeRequest() && !http->request->flags.chunkedReply) {
295 size_t length = lengthToSend(bodyData.range());
296 noteSentBodyBytes(length);
297 getConn()->write(bodyData.data, length);
298 return;
299 }
300
301 MemBuf mb;
302 mb.init();
303 if (multipartRangeRequest())
304 packRange(bodyData, &mb);
305 else
306 packChunk(bodyData, mb);
307
308 if (mb.contentSize())
309 getConn()->write(&mb);
310 else
311 writeComplete(0);
312 }
313
314 size_t
315 Http::Stream::lengthToSend(Range<int64_t> const &available) const
316 {
317 // the size of available range can always fit into a size_t type
318 size_t maximum = available.size();
319
320 if (!http->request->range)
321 return maximum;
322
323 assert(canPackMoreRanges());
324
325 if (http->range_iter.debt() == -1)
326 return maximum;
327
328 assert(http->range_iter.debt() > 0);
329
330 /* TODO this + the last line could be a range intersection calculation */
331 if (available.start < http->range_iter.currentSpec()->offset)
332 return 0;
333
334 return min(http->range_iter.debt(), static_cast<int64_t>(maximum));
335 }
336
337 void
338 Http::Stream::noteSentBodyBytes(size_t bytes)
339 {
340 debugs(33, 7, bytes << " body bytes");
341 http->out.offset += bytes;
342
343 if (!http->request->range)
344 return;
345
346 if (http->range_iter.debt() != -1) {
347 http->range_iter.debt(http->range_iter.debt() - bytes);
348 assert (http->range_iter.debt() >= 0);
349 }
350
351 /* debt() always stops at -1, below that is a bug */
352 assert(http->range_iter.debt() >= -1);
353 }
354
355 /// \return true when If-Range specs match reply, false otherwise
356 static bool
357 clientIfRangeMatch(ClientHttpRequest * http, HttpReply * rep)
358 {
359 const TimeOrTag spec = http->request->header.getTimeOrTag(Http::HdrType::IF_RANGE);
360
361 /* check for parsing falure */
362 if (!spec.valid)
363 return false;
364
365 /* got an ETag? */
366 if (spec.tag.str) {
367 ETag rep_tag = rep->header.getETag(Http::HdrType::ETAG);
368 debugs(33, 3, "ETags: " << spec.tag.str << " and " <<
369 (rep_tag.str ? rep_tag.str : "<none>"));
370
371 if (!rep_tag.str)
372 return false; // entity has no etag to compare with!
373
374 if (spec.tag.weak || rep_tag.weak) {
375 debugs(33, DBG_IMPORTANT, "Weak ETags are not allowed in If-Range: " <<
376 spec.tag.str << " ? " << rep_tag.str);
377 return false; // must use strong validator for sub-range requests
378 }
379
380 return etagIsStrongEqual(rep_tag, spec.tag);
381 }
382
383 /* got modification time? */
384 if (spec.time >= 0)
385 return !http->storeEntry()->modifiedSince(spec.time);
386
387 assert(0); /* should not happen */
388 return false;
389 }
390
391 // seems to be something better suited to Server logic
392 /** adds appropriate Range headers if needed */
393 void
394 Http::Stream::buildRangeHeader(HttpReply *rep)
395 {
396 HttpHeader *hdr = rep ? &rep->header : nullptr;
397 const char *range_err = nullptr;
398 HttpRequest *request = http->request;
399 assert(request->range);
400 /* check if we still want to do ranges */
401 int64_t roffLimit = request->getRangeOffsetLimit();
402
403 if (!rep)
404 range_err = "no [parse-able] reply";
405 else if ((rep->sline.status() != Http::scOkay) && (rep->sline.status() != Http::scPartialContent))
406 range_err = "wrong status code";
407 else if (hdr->has(Http::HdrType::CONTENT_RANGE))
408 range_err = "origin server does ranges";
409 else if (rep->content_length < 0)
410 range_err = "unknown length";
411 else if (rep->content_length != http->memObject()->getReply()->content_length)
412 range_err = "INCONSISTENT length"; /* a bug? */
413
414 /* hits only - upstream CachePeer determines correct behaviour on misses,
415 * and client_side_reply determines hits candidates
416 */
417 else if (http->logType.isTcpHit() &&
418 http->request->header.has(Http::HdrType::IF_RANGE) &&
419 !clientIfRangeMatch(http, rep))
420 range_err = "If-Range match failed";
421
422 else if (!http->request->range->canonize(rep))
423 range_err = "canonization failed";
424 else if (http->request->range->isComplex())
425 range_err = "too complex range header";
426 else if (!http->logType.isTcpHit() && http->request->range->offsetLimitExceeded(roffLimit))
427 range_err = "range outside range_offset_limit";
428
429 /* get rid of our range specs on error */
430 if (range_err) {
431 /* XXX We do this here because we need canonisation etc. However, this current
432 * code will lead to incorrect store offset requests - the store will have the
433 * offset data, but we won't be requesting it.
434 * So, we can either re-request, or generate an error
435 */
436 http->request->ignoreRange(range_err);
437 } else {
438 /* XXX: TODO: Review, this unconditional set may be wrong. */
439 rep->sline.set(rep->sline.version, Http::scPartialContent);
440 // web server responded with a valid, but unexpected range.
441 // will (try-to) forward as-is.
442 //TODO: we should cope with multirange request/responses
443 bool replyMatchRequest = rep->content_range != nullptr ?
444 request->range->contains(rep->content_range->spec) :
445 true;
446 const int spec_count = http->request->range->specs.size();
447 int64_t actual_clen = -1;
448
449 debugs(33, 3, "range spec count: " << spec_count <<
450 " virgin clen: " << rep->content_length);
451 assert(spec_count > 0);
452 /* append appropriate header(s) */
453 if (spec_count == 1) {
454 if (!replyMatchRequest) {
455 hdr->delById(Http::HdrType::CONTENT_RANGE);
456 hdr->putContRange(rep->content_range);
457 actual_clen = rep->content_length;
458 //http->range_iter.pos = rep->content_range->spec.begin();
459 (*http->range_iter.pos)->offset = rep->content_range->spec.offset;
460 (*http->range_iter.pos)->length = rep->content_range->spec.length;
461
462 } else {
463 HttpHdrRange::iterator pos = http->request->range->begin();
464 assert(*pos);
465 /* append Content-Range */
466
467 if (!hdr->has(Http::HdrType::CONTENT_RANGE)) {
468 /* No content range, so this was a full object we are
469 * sending parts of.
470 */
471 httpHeaderAddContRange(hdr, **pos, rep->content_length);
472 }
473
474 /* set new Content-Length to the actual number of bytes
475 * transmitted in the message-body */
476 actual_clen = (*pos)->length;
477 }
478 } else {
479 /* multipart! */
480 /* generate boundary string */
481 http->range_iter.boundary = http->rangeBoundaryStr();
482 /* delete old Content-Type, add ours */
483 hdr->delById(Http::HdrType::CONTENT_TYPE);
484 httpHeaderPutStrf(hdr, Http::HdrType::CONTENT_TYPE,
485 "multipart/byteranges; boundary=\"" SQUIDSTRINGPH "\"",
486 SQUIDSTRINGPRINT(http->range_iter.boundary));
487 /* Content-Length is not required in multipart responses
488 * but it is always nice to have one */
489 actual_clen = http->mRangeCLen();
490
491 /* http->out needs to start where we want data at */
492 http->out.offset = http->range_iter.currentSpec()->offset;
493 }
494
495 /* replace Content-Length header */
496 assert(actual_clen >= 0);
497 hdr->delById(Http::HdrType::CONTENT_LENGTH);
498 hdr->putInt64(Http::HdrType::CONTENT_LENGTH, actual_clen);
499 debugs(33, 3, "actual content length: " << actual_clen);
500
501 /* And start the range iter off */
502 http->range_iter.updateSpec();
503 }
504 }
505
506 clientStreamNode *
507 Http::Stream::getTail() const
508 {
509 if (http->client_stream.tail)
510 return static_cast<clientStreamNode *>(http->client_stream.tail->data);
511
512 return nullptr;
513 }
514
515 clientStreamNode *
516 Http::Stream::getClientReplyContext() const
517 {
518 return static_cast<clientStreamNode *>(http->client_stream.tail->prev->data);
519 }
520
521 ConnStateData *
522 Http::Stream::getConn() const
523 {
524 assert(http && http->getConn());
525 return http->getConn();
526 }
527
528 /// remembers the abnormal connection termination for logging purposes
529 void
530 Http::Stream::noteIoError(const int xerrno)
531 {
532 if (http) {
533 http->logType.err.timedout = (xerrno == ETIMEDOUT);
534 // aborted even if xerrno is zero (which means read abort/eof)
535 http->logType.err.aborted = (xerrno != ETIMEDOUT);
536 }
537 }
538
539 void
540 Http::Stream::finished()
541 {
542 ConnStateData *conn = getConn();
543
544 /* we can't handle any more stream data - detach */
545 clientStreamDetach(getTail(), http);
546
547 assert(connRegistered_);
548 connRegistered_ = false;
549 conn->pipeline.popMe(Http::StreamPointer(this));
550 }
551
552 /// called when we encounter a response-related error
553 void
554 Http::Stream::initiateClose(const char *reason)
555 {
556 debugs(33, 4, clientConnection << " because " << reason);
557 getConn()->stopSending(reason); // closes ASAP
558 }
559
560 void
561 Http::Stream::deferRecipientForLater(clientStreamNode *node, HttpReply *rep, StoreIOBuffer receivedData)
562 {
563 debugs(33, 2, "Deferring request " << http->uri);
564 assert(flags.deferred == 0);
565 flags.deferred = 1;
566 deferredparams.node = node;
567 deferredparams.rep = rep;
568 deferredparams.queuedBuffer = receivedData;
569 }
570
571 void
572 Http::Stream::prepareReply(HttpReply *rep)
573 {
574 reply = rep;
575 if (http->request->range)
576 buildRangeHeader(rep);
577 }
578
579 /**
580 * Packs bodyData into mb using chunked encoding.
581 * Packs the last-chunk if bodyData is empty.
582 */
583 void
584 Http::Stream::packChunk(const StoreIOBuffer &bodyData, MemBuf &mb)
585 {
586 const uint64_t length =
587 static_cast<uint64_t>(lengthToSend(bodyData.range()));
588 noteSentBodyBytes(length);
589
590 mb.appendf("%" PRIX64 "\r\n", length);
591 mb.append(bodyData.data, length);
592 mb.append("\r\n", 2);
593 }
594
595 /**
596 * extracts a "range" from *buf and appends them to mb, updating
597 * all offsets and such.
598 */
599 void
600 Http::Stream::packRange(StoreIOBuffer const &source, MemBuf *mb)
601 {
602 HttpHdrRangeIter * i = &http->range_iter;
603 Range<int64_t> available(source.range());
604 char const *buf = source.data;
605
606 while (i->currentSpec() && available.size()) {
607 const size_t copy_sz = lengthToSend(available);
608 if (copy_sz) {
609 // intersection of "have" and "need" ranges must not be empty
610 assert(http->out.offset < i->currentSpec()->offset + i->currentSpec()->length);
611 assert(http->out.offset + (int64_t)available.size() > i->currentSpec()->offset);
612
613 /*
614 * put boundary and headers at the beginning of a range in a
615 * multi-range
616 */
617 if (http->multipartRangeRequest() && i->debt() == i->currentSpec()->length) {
618 assert(http->memObject());
619 clientPackRangeHdr(
620 http->memObject()->getReply(), /* original reply */
621 i->currentSpec(), /* current range */
622 i->boundary, /* boundary, the same for all */
623 mb);
624 }
625
626 // append content
627 debugs(33, 3, "appending " << copy_sz << " bytes");
628 noteSentBodyBytes(copy_sz);
629 mb->append(buf, copy_sz);
630
631 // update offsets
632 available.start += copy_sz;
633 buf += copy_sz;
634 }
635
636 if (!canPackMoreRanges()) {
637 debugs(33, 3, "Returning because !canPackMoreRanges.");
638 if (i->debt() == 0)
639 // put terminating boundary for multiparts
640 clientPackTermBound(i->boundary, mb);
641 return;
642 }
643
644 int64_t nextOffset = getNextRangeOffset();
645 assert(nextOffset >= http->out.offset);
646 int64_t skip = nextOffset - http->out.offset;
647 /* adjust for not to be transmitted bytes */
648 http->out.offset = nextOffset;
649
650 if (available.size() <= (uint64_t)skip)
651 return;
652
653 available.start += skip;
654 buf += skip;
655
656 if (copy_sz == 0)
657 return;
658 }
659 }
660
661 void
662 Http::Stream::doClose()
663 {
664 clientConnection->close();
665 }
666