]> git.ipfire.org Git - thirdparty/squid.git/blame - src/http/StreamContext.cc
Add missing includes in delay pool
[thirdparty/squid.git] / src / http / StreamContext.cc
CommitLineData
d6fdeb41
AJ
1/*
2 * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9#include "squid.h"
10#include "client_side_request.h"
11#include "http/StreamContext.h"
12#include "HttpHdrContRange.h"
13#include "HttpHeaderTools.h"
bc8628c7 14#include "Store.h"
d6fdeb41
AJ
15#include "TimeOrTag.h"
16
17Http::StreamContext::StreamContext(uint32_t anId, const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
18 id(anId),
19 clientConnection(aConn),
20 http(aReq),
21 reply(nullptr),
22 writtenToSocket(0),
23 mayUseConnection_(false),
24 connRegistered_(false)
25{
26 assert(http != nullptr);
27 memset(reqbuf, '\0', sizeof (reqbuf));
28 flags.deferred = 0;
29 flags.parsed_ok = 0;
30 deferredparams.node = nullptr;
31 deferredparams.rep = nullptr;
32}
33
34Http::StreamContext::~StreamContext()
35{
36 if (auto node = getTail()) {
37 if (auto ctx = dynamic_cast<Http::StreamContext *>(node->data.getRaw())) {
38 /* We are *always* the tail - prevent recursive free */
39 assert(this == ctx);
40 node->data = nullptr;
41 }
42 }
43 httpRequestFree(http);
44}
45
46void
47Http::StreamContext::registerWithConn()
48{
49 assert(!connRegistered_);
50 assert(getConn());
51 connRegistered_ = true;
52 getConn()->pipeline.add(Http::StreamContextPointer(this));
53}
54
55bool
56Http::StreamContext::startOfOutput() const
57{
58 return http->out.size == 0;
59}
60
61void
62Http::StreamContext::writeComplete(size_t size)
63{
64 const StoreEntry *entry = http->storeEntry();
65 debugs(33, 5, clientConnection << ", sz " << size <<
66 ", off " << (http->out.size + size) << ", len " <<
67 (entry ? entry->objectLen() : 0));
68
69 http->out.size += size;
70
71 if (clientHttpRequestStatus(clientConnection->fd, http)) {
72 initiateClose("failure or true request status");
73 /* Do we leak here ? */
74 return;
75 }
76
77 switch (socketState()) {
78
79 case STREAM_NONE:
80 pullData();
81 break;
82
83 case STREAM_COMPLETE: {
84 debugs(33, 5, clientConnection << " Stream complete, keepalive is " <<
85 http->request->flags.proxyKeepalive);
86 ConnStateData *c = getConn();
87 if (!http->request->flags.proxyKeepalive)
88 clientConnection->close();
89 finished();
90 c->kick();
91 }
92 return;
93
94 case STREAM_UNPLANNED_COMPLETE:
95 initiateClose("STREAM_UNPLANNED_COMPLETE");
96 return;
97
98 case STREAM_FAILED:
99 initiateClose("STREAM_FAILED");
100 return;
101
102 default:
103 fatal("Hit unreachable code in Http::StreamContext::writeComplete\n");
104 }
105}
106
107void
108Http::StreamContext::pullData()
109{
110 debugs(33, 5, reply << " written " << http->out.size << " into " << clientConnection);
111
112 /* More data will be coming from the stream. */
113 StoreIOBuffer readBuffer;
114 /* XXX: Next requested byte in the range sequence */
115 /* XXX: length = getmaximumrangelenfgth */
116 readBuffer.offset = getNextRangeOffset();
117 readBuffer.length = HTTP_REQBUF_SZ;
118 readBuffer.data = reqbuf;
119 /* we may note we have reached the end of the wanted ranges */
120 clientStreamRead(getTail(), http, readBuffer);
121}
122
123bool
124Http::StreamContext::multipartRangeRequest() const
125{
126 return http->multipartRangeRequest();
127}
128
129int64_t
130Http::StreamContext::getNextRangeOffset() const
131{
132 debugs (33, 5, "range: " << http->request->range <<
133 "; http offset " << http->out.offset <<
134 "; reply " << reply);
135
136 // XXX: This method is called from many places, including pullData() which
137 // may be called before prepareReply() [on some Squid-generated errors].
138 // Hence, we may not even know yet whether we should honor/do ranges.
139
140 if (http->request->range) {
141 /* offset in range specs does not count the prefix of an http msg */
142 /* check: reply was parsed and range iterator was initialized */
143 assert(http->range_iter.valid);
144 /* filter out data according to range specs */
145 assert(canPackMoreRanges());
146 {
147 assert(http->range_iter.currentSpec());
148 /* offset of still missing data */
149 int64_t start = http->range_iter.currentSpec()->offset +
150 http->range_iter.currentSpec()->length -
151 http->range_iter.debt();
152 debugs(33, 3, "clientPackMoreRanges: in: offset: " << http->out.offset);
153 debugs(33, 3, "clientPackMoreRanges: out:"
154 " start: " << start <<
155 " spec[" << http->range_iter.pos - http->request->range->begin() << "]:" <<
156 " [" << http->range_iter.currentSpec()->offset <<
157 ", " << http->range_iter.currentSpec()->offset +
158 http->range_iter.currentSpec()->length << "),"
159 " len: " << http->range_iter.currentSpec()->length <<
160 " debt: " << http->range_iter.debt());
161 if (http->range_iter.currentSpec()->length != -1)
162 assert(http->out.offset <= start); /* we did not miss it */
163
164 return start;
165 }
166
167 } else if (reply && reply->content_range) {
168 /* request does not have ranges, but reply does */
169 /** \todo FIXME: should use range_iter_pos on reply, as soon as reply->content_range
170 * becomes HttpHdrRange rather than HttpHdrRangeSpec.
171 */
172 return http->out.offset + reply->content_range->spec.offset;
173 }
174
175 return http->out.offset;
176}
177
178/**
179 * increments iterator "i"
180 * used by clientPackMoreRanges
181 *
182 * \retval true there is still data available to pack more ranges
183 * \retval false
184 */
185bool
186Http::StreamContext::canPackMoreRanges() const
187{
188 /** first update iterator "i" if needed */
189 if (!http->range_iter.debt()) {
190 debugs(33, 5, "At end of current range spec for " << clientConnection);
191
192 if (http->range_iter.pos != http->range_iter.end)
193 ++http->range_iter.pos;
194
195 http->range_iter.updateSpec();
196 }
197
198 assert(!http->range_iter.debt() == !http->range_iter.currentSpec());
199
200 /* paranoid sync condition */
201 /* continue condition: need_more_data */
202 debugs(33, 5, "returning " << (http->range_iter.currentSpec() ? true : false));
203 return http->range_iter.currentSpec() ? true : false;
204}
205
206/// Adapt stream status to account for Range cases
207clientStream_status_t
208Http::StreamContext::socketState()
209{
210 switch (clientStreamStatus(getTail(), http)) {
211
212 case STREAM_NONE:
213 /* check for range support ending */
214 if (http->request->range) {
215 /* check: reply was parsed and range iterator was initialized */
216 assert(http->range_iter.valid);
217 /* filter out data according to range specs */
218
219 if (!canPackMoreRanges()) {
220 debugs(33, 5, "Range request at end of returnable " <<
221 "range sequence on " << clientConnection);
222 // we got everything we wanted from the store
223 return STREAM_COMPLETE;
224 }
225 } else if (reply && reply->content_range) {
226 /* reply has content-range, but Squid is not managing ranges */
227 const int64_t &bytesSent = http->out.offset;
228 const int64_t &bytesExpected = reply->content_range->spec.length;
229
230 debugs(33, 7, "body bytes sent vs. expected: " <<
231 bytesSent << " ? " << bytesExpected << " (+" <<
232 reply->content_range->spec.offset << ")");
233
234 // did we get at least what we expected, based on range specs?
235
236 if (bytesSent == bytesExpected) // got everything
237 return STREAM_COMPLETE;
238
239 if (bytesSent > bytesExpected) // Error: Sent more than expected
240 return STREAM_UNPLANNED_COMPLETE;
241 }
242
243 return STREAM_NONE;
244
245 case STREAM_COMPLETE:
246 return STREAM_COMPLETE;
247
248 case STREAM_UNPLANNED_COMPLETE:
249 return STREAM_UNPLANNED_COMPLETE;
250
251 case STREAM_FAILED:
252 return STREAM_FAILED;
253 }
254
255 fatal ("unreachable code\n");
256 return STREAM_NONE;
257}
258
259void
260Http::StreamContext::sendStartOfMessage(HttpReply *rep, StoreIOBuffer bodyData)
261{
262 prepareReply(rep);
263 assert(rep);
264 MemBuf *mb = rep->pack();
265
266 // dump now, so we dont output any body.
267 debugs(11, 2, "HTTP Client " << clientConnection);
268 debugs(11, 2, "HTTP Client REPLY:\n---------\n" << mb->buf << "\n----------");
269
270 /* Save length of headers for persistent conn checks */
271 http->out.headers_sz = mb->contentSize();
272#if HEADERS_LOG
273 headersLog(0, 0, http->request->method, rep);
274#endif
275
276 if (bodyData.data && bodyData.length) {
277 if (multipartRangeRequest())
278 packRange(bodyData, mb);
279 else if (http->request->flags.chunkedReply) {
280 packChunk(bodyData, *mb);
281 } else {
282 size_t length = lengthToSend(bodyData.range());
283 noteSentBodyBytes(length);
284 mb->append(bodyData.data, length);
285 }
286 }
287
288 getConn()->write(mb);
289 delete mb;
290}
291
292void
293Http::StreamContext::sendBody(StoreIOBuffer bodyData)
294{
295 if (!multipartRangeRequest() && !http->request->flags.chunkedReply) {
296 size_t length = lengthToSend(bodyData.range());
297 noteSentBodyBytes(length);
298 getConn()->write(bodyData.data, length);
299 return;
300 }
301
302 MemBuf mb;
303 mb.init();
304 if (multipartRangeRequest())
305 packRange(bodyData, &mb);
306 else
307 packChunk(bodyData, mb);
308
309 if (mb.contentSize())
310 getConn()->write(&mb);
311 else
312 writeComplete(0);
313}
314
315size_t
316Http::StreamContext::lengthToSend(Range<int64_t> const &available) const
317{
318 // the size of available range can always fit into a size_t type
319 size_t maximum = available.size();
320
321 if (!http->request->range)
322 return maximum;
323
324 assert(canPackMoreRanges());
325
326 if (http->range_iter.debt() == -1)
327 return maximum;
328
329 assert(http->range_iter.debt() > 0);
330
331 /* TODO this + the last line could be a range intersection calculation */
332 if (available.start < http->range_iter.currentSpec()->offset)
333 return 0;
334
335 return min(http->range_iter.debt(), static_cast<int64_t>(maximum));
336}
337
338void
339Http::StreamContext::noteSentBodyBytes(size_t bytes)
340{
341 debugs(33, 7, bytes << " body bytes");
342 http->out.offset += bytes;
343
344 if (!http->request->range)
345 return;
346
347 if (http->range_iter.debt() != -1) {
348 http->range_iter.debt(http->range_iter.debt() - bytes);
349 assert (http->range_iter.debt() >= 0);
350 }
351
352 /* debt() always stops at -1, below that is a bug */
353 assert(http->range_iter.debt() >= -1);
354}
355
356/// \return true when If-Range specs match reply, false otherwise
357static bool
358clientIfRangeMatch(ClientHttpRequest * http, HttpReply * rep)
359{
360 const TimeOrTag spec = http->request->header.getTimeOrTag(Http::HdrType::IF_RANGE);
361
362 /* check for parsing falure */
363 if (!spec.valid)
364 return false;
365
366 /* got an ETag? */
367 if (spec.tag.str) {
368 ETag rep_tag = rep->header.getETag(Http::HdrType::ETAG);
369 debugs(33, 3, "ETags: " << spec.tag.str << " and " <<
370 (rep_tag.str ? rep_tag.str : "<none>"));
371
372 if (!rep_tag.str)
373 return false; // entity has no etag to compare with!
374
375 if (spec.tag.weak || rep_tag.weak) {
376 debugs(33, DBG_IMPORTANT, "Weak ETags are not allowed in If-Range: " <<
377 spec.tag.str << " ? " << rep_tag.str);
378 return false; // must use strong validator for sub-range requests
379 }
380
381 return etagIsStrongEqual(rep_tag, spec.tag);
382 }
383
384 /* got modification time? */
385 if (spec.time >= 0)
386 return http->storeEntry()->lastmod <= spec.time;
387
388 assert(0); /* should not happen */
389 return false;
390}
391
392// seems to be something better suited to Server logic
393/** adds appropriate Range headers if needed */
394void
395Http::StreamContext::buildRangeHeader(HttpReply *rep)
396{
397 HttpHeader *hdr = rep ? &rep->header : nullptr;
398 const char *range_err = nullptr;
399 HttpRequest *request = http->request;
400 assert(request->range);
401 /* check if we still want to do ranges */
402 int64_t roffLimit = request->getRangeOffsetLimit();
403
404 if (!rep)
405 range_err = "no [parse-able] reply";
406 else if ((rep->sline.status() != Http::scOkay) && (rep->sline.status() != Http::scPartialContent))
407 range_err = "wrong status code";
408 else if (hdr->has(Http::HdrType::CONTENT_RANGE))
409 range_err = "origin server does ranges";
410 else if (rep->content_length < 0)
411 range_err = "unknown length";
412 else if (rep->content_length != http->memObject()->getReply()->content_length)
413 range_err = "INCONSISTENT length"; /* a bug? */
414
415 /* hits only - upstream CachePeer determines correct behaviour on misses,
416 * and client_side_reply determines hits candidates
417 */
418 else if (http->logType.isTcpHit() &&
419 http->request->header.has(Http::HdrType::IF_RANGE) &&
420 !clientIfRangeMatch(http, rep))
421 range_err = "If-Range match failed";
422
423 else if (!http->request->range->canonize(rep))
424 range_err = "canonization failed";
425 else if (http->request->range->isComplex())
426 range_err = "too complex range header";
427 else if (!http->logType.isTcpHit() && http->request->range->offsetLimitExceeded(roffLimit))
428 range_err = "range outside range_offset_limit";
429
430 /* get rid of our range specs on error */
431 if (range_err) {
432 /* XXX We do this here because we need canonisation etc. However, this current
433 * code will lead to incorrect store offset requests - the store will have the
434 * offset data, but we won't be requesting it.
435 * So, we can either re-request, or generate an error
436 */
437 http->request->ignoreRange(range_err);
438 } else {
439 /* XXX: TODO: Review, this unconditional set may be wrong. */
440 rep->sline.set(rep->sline.version, Http::scPartialContent);
441 // web server responded with a valid, but unexpected range.
442 // will (try-to) forward as-is.
443 //TODO: we should cope with multirange request/responses
444 bool replyMatchRequest = rep->content_range != nullptr ?
445 request->range->contains(rep->content_range->spec) :
446 true;
447 const int spec_count = http->request->range->specs.size();
448 int64_t actual_clen = -1;
449
450 debugs(33, 3, "range spec count: " << spec_count <<
451 " virgin clen: " << rep->content_length);
452 assert(spec_count > 0);
453 /* append appropriate header(s) */
454 if (spec_count == 1) {
455 if (!replyMatchRequest) {
456 hdr->delById(Http::HdrType::CONTENT_RANGE);
457 hdr->putContRange(rep->content_range);
458 actual_clen = rep->content_length;
459 //http->range_iter.pos = rep->content_range->spec.begin();
460 (*http->range_iter.pos)->offset = rep->content_range->spec.offset;
461 (*http->range_iter.pos)->length = rep->content_range->spec.length;
462
463 } else {
464 HttpHdrRange::iterator pos = http->request->range->begin();
465 assert(*pos);
466 /* append Content-Range */
467
468 if (!hdr->has(Http::HdrType::CONTENT_RANGE)) {
469 /* No content range, so this was a full object we are
470 * sending parts of.
471 */
472 httpHeaderAddContRange(hdr, **pos, rep->content_length);
473 }
474
475 /* set new Content-Length to the actual number of bytes
476 * transmitted in the message-body */
477 actual_clen = (*pos)->length;
478 }
479 } else {
480 /* multipart! */
481 /* generate boundary string */
482 http->range_iter.boundary = http->rangeBoundaryStr();
483 /* delete old Content-Type, add ours */
484 hdr->delById(Http::HdrType::CONTENT_TYPE);
485 httpHeaderPutStrf(hdr, Http::HdrType::CONTENT_TYPE,
486 "multipart/byteranges; boundary=\"" SQUIDSTRINGPH "\"",
487 SQUIDSTRINGPRINT(http->range_iter.boundary));
488 /* Content-Length is not required in multipart responses
489 * but it is always nice to have one */
490 actual_clen = http->mRangeCLen();
491
492 /* http->out needs to start where we want data at */
493 http->out.offset = http->range_iter.currentSpec()->offset;
494 }
495
496 /* replace Content-Length header */
497 assert(actual_clen >= 0);
498 hdr->delById(Http::HdrType::CONTENT_LENGTH);
499 hdr->putInt64(Http::HdrType::CONTENT_LENGTH, actual_clen);
500 debugs(33, 3, "actual content length: " << actual_clen);
501
502 /* And start the range iter off */
503 http->range_iter.updateSpec();
504 }
505}
506
507clientStreamNode *
508Http::StreamContext::getTail() const
509{
510 if (http->client_stream.tail)
511 return static_cast<clientStreamNode *>(http->client_stream.tail->data);
512
513 return nullptr;
514}
515
516clientStreamNode *
517Http::StreamContext::getClientReplyContext() const
518{
519 return static_cast<clientStreamNode *>(http->client_stream.tail->prev->data);
520}
521
522ConnStateData *
523Http::StreamContext::getConn() const
524{
525 assert(http && http->getConn());
526 return http->getConn();
527}
528
529/// remembers the abnormal connection termination for logging purposes
530void
531Http::StreamContext::noteIoError(const int xerrno)
532{
533 if (http) {
534 http->logType.err.timedout = (xerrno == ETIMEDOUT);
535 // aborted even if xerrno is zero (which means read abort/eof)
536 http->logType.err.aborted = (xerrno != ETIMEDOUT);
537 }
538}
539
540void
541Http::StreamContext::finished()
542{
543 ConnStateData *conn = getConn();
544
545 /* we can't handle any more stream data - detach */
546 clientStreamDetach(getTail(), http);
547
548 assert(connRegistered_);
549 connRegistered_ = false;
550 conn->pipeline.popById(id);
551}
552
553/// called when we encounter a response-related error
554void
555Http::StreamContext::initiateClose(const char *reason)
556{
557 debugs(33, 4, clientConnection << " because " << reason);
558 getConn()->stopSending(reason); // closes ASAP
559}
560
561void
562Http::StreamContext::deferRecipientForLater(clientStreamNode *node, HttpReply *rep, StoreIOBuffer receivedData)
563{
564 debugs(33, 2, "Deferring request " << http->uri);
565 assert(flags.deferred == 0);
566 flags.deferred = 1;
567 deferredparams.node = node;
568 deferredparams.rep = rep;
569 deferredparams.queuedBuffer = receivedData;
570}
571
572void
573Http::StreamContext::prepareReply(HttpReply *rep)
574{
575 reply = rep;
576 if (http->request->range)
577 buildRangeHeader(rep);
578}
579
580/**
581 * Packs bodyData into mb using chunked encoding.
582 * Packs the last-chunk if bodyData is empty.
583 */
584void
585Http::StreamContext::packChunk(const StoreIOBuffer &bodyData, MemBuf &mb)
586{
587 const uint64_t length =
588 static_cast<uint64_t>(lengthToSend(bodyData.range()));
589 noteSentBodyBytes(length);
590
591 mb.appendf("%" PRIX64 "\r\n", length);
592 mb.append(bodyData.data, length);
593 mb.append("\r\n", 2);
594}
595
596/**
597 * extracts a "range" from *buf and appends them to mb, updating
598 * all offsets and such.
599 */
600void
601Http::StreamContext::packRange(StoreIOBuffer const &source, MemBuf *mb)
602{
603 HttpHdrRangeIter * i = &http->range_iter;
604 Range<int64_t> available(source.range());
605 char const *buf = source.data;
606
607 while (i->currentSpec() && available.size()) {
608 const size_t copy_sz = lengthToSend(available);
609 if (copy_sz) {
610 // intersection of "have" and "need" ranges must not be empty
611 assert(http->out.offset < i->currentSpec()->offset + i->currentSpec()->length);
612 assert(http->out.offset + (int64_t)available.size() > i->currentSpec()->offset);
613
614 /*
615 * put boundary and headers at the beginning of a range in a
616 * multi-range
617 */
618 if (http->multipartRangeRequest() && i->debt() == i->currentSpec()->length) {
619 assert(http->memObject());
620 clientPackRangeHdr(
621 http->memObject()->getReply(), /* original reply */
622 i->currentSpec(), /* current range */
623 i->boundary, /* boundary, the same for all */
624 mb);
625 }
626
627 // append content
628 debugs(33, 3, "appending " << copy_sz << " bytes");
629 noteSentBodyBytes(copy_sz);
630 mb->append(buf, copy_sz);
631
632 // update offsets
633 available.start += copy_sz;
634 buf += copy_sz;
635 }
636
637 if (!canPackMoreRanges()) {
638 debugs(33, 3, "Returning because !canPackMoreRanges.");
639 if (i->debt() == 0)
640 // put terminating boundary for multiparts
641 clientPackTermBound(i->boundary, mb);
642 return;
643 }
644
645 int64_t nextOffset = getNextRangeOffset();
646 assert(nextOffset >= http->out.offset);
647 int64_t skip = nextOffset - http->out.offset;
648 /* adjust for not to be transmitted bytes */
649 http->out.offset = nextOffset;
650
651 if (available.size() <= (uint64_t)skip)
652 return;
653
654 available.start += skip;
655 buf += skip;
656
657 if (copy_sz == 0)
658 return;
659 }
660}
661
662void
663Http::StreamContext::doClose()
664{
665 clientConnection->close();
666}
667