]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Shuffle Http::StreamContext methods to libsquid-http.la
authorAmos Jeffries <squid3@treenet.co.nz>
Sat, 9 Jan 2016 17:09:01 +0000 (06:09 +1300)
committerAmos Jeffries <squid3@treenet.co.nz>
Sat, 9 Jan 2016 17:09:01 +0000 (06:09 +1300)
src/client_side.cc
src/client_side.h
src/http/Makefile.am
src/http/StreamContext.cc [new file with mode: 0644]
src/http/StreamContext.h
src/servers/Http1Server.cc
src/tests/stub_client_side.cc

index 4deace0fd50497456c45b76aa27fc9267e1ceabd..51077963e6360664df6f11655a130560be51f77d 100644 (file)
@@ -190,88 +190,9 @@ static void clientUpdateHierCounters(HierarchyLogEntry *);
 static bool clientPingHasFinished(ping_data const *aPing);
 void prepareLogWithRequestDetails(HttpRequest *, AccessLogEntry::Pointer &);
 static void ClientSocketContextPushDeferredIfNeeded(Http::StreamContextPointer deferredRequest, ConnStateData * conn);
-static void clientUpdateSocketStats(const LogTags &logType, size_t size);
 
 char *skipLeadingSpace(char *aString);
 
-clientStreamNode *
-Http::StreamContext::getTail() const
-{
-    if (http->client_stream.tail)
-        return (clientStreamNode *)http->client_stream.tail->data;
-
-    return NULL;
-}
-
-clientStreamNode *
-Http::StreamContext::getClientReplyContext() const
-{
-    return (clientStreamNode *)http->client_stream.tail->prev->data;
-}
-
-ConnStateData *
-Http::StreamContext::getConn() const
-{
-    return http->getConn();
-}
-
-Http::StreamContext::~StreamContext()
-{
-    clientStreamNode *node = getTail();
-
-    if (node) {
-        if (auto ctx = dynamic_cast<Http::StreamContext *>(node->data.getRaw())) {
-            /* We are *always* the tail - prevent recursive free */
-            assert(this == ctx);
-            node->data = nullptr;
-        }
-    }
-
-    httpRequestFree(http);
-}
-
-void
-Http::StreamContext::registerWithConn()
-{
-    assert (!connRegistered_);
-    assert (http);
-    assert (http->getConn() != NULL);
-    connRegistered_ = true;
-    http->getConn()->pipeline.add(Http::StreamContextPointer(this));
-}
-
-void
-Http::StreamContext::finished()
-{
-    assert (http);
-    assert (http->getConn() != NULL);
-    ConnStateData *conn = http->getConn();
-
-    /* we can't handle any more stream data - detach */
-    clientStreamDetach(getTail(), http);
-
-    assert(connRegistered_);
-    connRegistered_ = false;
-    conn->pipeline.popById(id);
-}
-
-Http::StreamContext::StreamContext(uint32_t anId, const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
-    id(anId),
-    clientConnection(aConn),
-    http(aReq),
-    reply(NULL),
-    writtenToSocket(0),
-    mayUseConnection_ (false),
-    connRegistered_ (false)
-{
-    assert(http != NULL);
-    memset (reqbuf, '\0', sizeof (reqbuf));
-    flags.deferred = 0;
-    flags.parsed_ok = 0;
-    deferredparams.node = NULL;
-    deferredparams.rep = NULL;
-}
-
 #if USE_IDENT
 static void
 clientIdentDone(const char *ident, void *data)
@@ -764,129 +685,20 @@ clientIsRequestBodyTooLargeForPolicy(int64_t bodyLength)
     return 0;
 }
 
-void
-Http::StreamContext::deferRecipientForLater(clientStreamNode * node, HttpReply * rep, StoreIOBuffer receivedData)
-{
-    debugs(33, 2, "clientSocketRecipient: Deferring request " << http->uri);
-    assert(flags.deferred == 0);
-    flags.deferred = 1;
-    deferredparams.node = node;
-    deferredparams.rep = rep;
-    deferredparams.queuedBuffer = receivedData;
-    return;
-}
-
-bool
-Http::StreamContext::startOfOutput() const
-{
-    return http->out.size == 0;
-}
-
-size_t
-Http::StreamContext::lengthToSend(Range<int64_t> const &available)
-{
-    /*the size of available range can always fit in a size_t type*/
-    size_t maximum = (size_t)available.size();
-
-    if (!http->request->range)
-        return maximum;
-
-    assert (canPackMoreRanges());
-
-    if (http->range_iter.debt() == -1)
-        return maximum;
-
-    assert (http->range_iter.debt() > 0);
-
-    /* TODO this + the last line could be a range intersection calculation */
-    if (available.start < http->range_iter.currentSpec()->offset)
-        return 0;
-
-    return min(http->range_iter.debt(), (int64_t)maximum);
-}
-
-void
-Http::StreamContext::noteSentBodyBytes(size_t bytes)
-{
-    debugs(33, 7, bytes << " body bytes");
-
-    http->out.offset += bytes;
-
-    if (!http->request->range)
-        return;
-
-    if (http->range_iter.debt() != -1) {
-        http->range_iter.debt(http->range_iter.debt() - bytes);
-        assert (http->range_iter.debt() >= 0);
-    }
-
-    /* debt() always stops at -1, below that is a bug */
-    assert (http->range_iter.debt() >= -1);
-}
-
 bool
 ClientHttpRequest::multipartRangeRequest() const
 {
     return request->multipartRangeRequest();
 }
 
-bool
-Http::StreamContext::multipartRangeRequest() const
-{
-    return http->multipartRangeRequest();
-}
-
-void
-Http::StreamContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData)
-{
-    assert(rep == NULL);
-
-    if (!multipartRangeRequest() && !http->request->flags.chunkedReply) {
-        size_t length = lengthToSend(bodyData.range());
-        noteSentBodyBytes (length);
-        getConn()->write(bodyData.data, length);
-        return;
-    }
-
-    MemBuf mb;
-    mb.init();
-    if (multipartRangeRequest())
-        packRange(bodyData, &mb);
-    else
-        packChunk(bodyData, mb);
-
-    if (mb.contentSize())
-        getConn()->write(&mb);
-    else
-        writeComplete(0);
-}
-
-/**
- * Packs bodyData into mb using chunked encoding. Packs the last-chunk
- * if bodyData is empty.
- */
 void
-Http::StreamContext::packChunk(const StoreIOBuffer &bodyData, MemBuf &mb)
-{
-    const uint64_t length =
-        static_cast<uint64_t>(lengthToSend(bodyData.range()));
-    noteSentBodyBytes(length);
-
-    mb.appendf("%" PRIX64 "\r\n", length);
-    mb.append(bodyData.data, length);
-    mb.append("\r\n", 2);
-}
-
-/** put terminating boundary for multiparts */
-static void
-clientPackTermBound(String boundary, MemBuf * mb)
+clientPackTermBound(String boundary, MemBuf *mb)
 {
     mb->appendf("\r\n--" SQUIDSTRINGPH "--\r\n", SQUIDSTRINGPRINT(boundary));
-    debugs(33, 6, "clientPackTermBound: buf offset: " << mb->size);
+    debugs(33, 6, "buf offset: " << mb->size);
 }
 
-/** appends a "part" HTTP header (as in a multi-part/range reply) to the buffer */
-static void
+void
 clientPackRangeHdr(const HttpReply * rep, const HttpHdrRangeSpec * spec, String boundary, MemBuf * mb)
 {
     HttpHeader hdr(hoReply);
@@ -894,7 +706,7 @@ clientPackRangeHdr(const HttpReply * rep, const HttpHdrRangeSpec * spec, String
     assert(spec);
 
     /* put boundary */
-    debugs(33, 5, "clientPackRangeHdr: appending boundary: " << boundary);
+    debugs(33, 5, "appending boundary: " << boundary);
     /* rfc2046 requires to _prepend_ boundary with <crlf>! */
     mb->appendf("\r\n--" SQUIDSTRINGPH "\r\n", SQUIDSTRINGPRINT(boundary));
 
@@ -912,90 +724,6 @@ clientPackRangeHdr(const HttpReply * rep, const HttpHdrRangeSpec * spec, String
     mb->append("\r\n", 2);
 }
 
-/**
- * extracts a "range" from *buf and appends them to mb, updating
- * all offsets and such.
- */
-void
-Http::StreamContext::packRange(StoreIOBuffer const &source, MemBuf * mb)
-{
-    HttpHdrRangeIter * i = &http->range_iter;
-    Range<int64_t> available (source.range());
-    char const *buf = source.data;
-
-    while (i->currentSpec() && available.size()) {
-        const size_t copy_sz = lengthToSend(available);
-
-        if (copy_sz) {
-            /*
-             * intersection of "have" and "need" ranges must not be empty
-             */
-            assert(http->out.offset < i->currentSpec()->offset + i->currentSpec()->length);
-            assert(http->out.offset + (int64_t)available.size() > i->currentSpec()->offset);
-
-            /*
-             * put boundary and headers at the beginning of a range in a
-             * multi-range
-             */
-
-            if (http->multipartRangeRequest() && i->debt() == i->currentSpec()->length) {
-                assert(http->memObject());
-                clientPackRangeHdr(
-                    http->memObject()->getReply(),  /* original reply */
-                    i->currentSpec(),       /* current range */
-                    i->boundary,    /* boundary, the same for all */
-                    mb);
-            }
-
-            /*
-             * append content
-             */
-            debugs(33, 3, "clientPackRange: appending " << copy_sz << " bytes");
-
-            noteSentBodyBytes (copy_sz);
-
-            mb->append(buf, copy_sz);
-
-            /*
-             * update offsets
-             */
-            available.start += copy_sz;
-
-            buf += copy_sz;
-
-        }
-
-        if (!canPackMoreRanges()) {
-            debugs(33, 3, "clientPackRange: Returning because !canPackMoreRanges.");
-
-            if (i->debt() == 0)
-                /* put terminating boundary for multiparts */
-                clientPackTermBound(i->boundary, mb);
-
-            return;
-        }
-
-        int64_t nextOffset = getNextRangeOffset();
-
-        assert (nextOffset >= http->out.offset);
-
-        int64_t skip = nextOffset - http->out.offset;
-
-        /* adjust for not to be transmitted bytes */
-        http->out.offset = nextOffset;
-
-        if (available.size() <= (uint64_t)skip)
-            return;
-
-        available.start += skip;
-
-        buf += skip;
-
-        if (copy_sz == 0)
-            return;
-    }
-}
-
 /** returns expected content length for multi-range replies
  * note: assumes that httpHdrRangeCanonize has already been called
  * warning: assumes that HTTP headers for individual ranges at the
@@ -1038,44 +766,6 @@ ClientHttpRequest::mRangeCLen()
     return clen;
 }
 
-/**
- * returns true if If-Range specs match reply, false otherwise
- */
-static int
-clientIfRangeMatch(ClientHttpRequest * http, HttpReply * rep)
-{
-    const TimeOrTag spec = http->request->header.getTimeOrTag(Http::HdrType::IF_RANGE);
-    /* check for parsing falure */
-
-    if (!spec.valid)
-        return 0;
-
-    /* got an ETag? */
-    if (spec.tag.str) {
-        ETag rep_tag = rep->header.getETag(Http::HdrType::ETAG);
-        debugs(33, 3, "clientIfRangeMatch: ETags: " << spec.tag.str << " and " <<
-               (rep_tag.str ? rep_tag.str : "<none>"));
-
-        if (!rep_tag.str)
-            return 0;       /* entity has no etag to compare with! */
-
-        if (spec.tag.weak || rep_tag.weak) {
-            debugs(33, DBG_IMPORTANT, "clientIfRangeMatch: Weak ETags are not allowed in If-Range: " << spec.tag.str << " ? " << rep_tag.str);
-            return 0;       /* must use strong validator for sub-range requests */
-        }
-
-        return etagIsStrongEqual(rep_tag, spec.tag);
-    }
-
-    /* got modification time? */
-    if (spec.time >= 0) {
-        return http->storeEntry()->lastmod <= spec.time;
-    }
-
-    assert(0);          /* should not happen */
-    return 0;
-}
-
 /**
  * generates a "unique" boundary string for multipart responses
  * the caller is responsible for cleaning the string */
@@ -1090,165 +780,6 @@ ClientHttpRequest::rangeBoundaryStr() const
     return b;
 }
 
-/** adds appropriate Range headers if needed */
-void
-Http::StreamContext::buildRangeHeader(HttpReply * rep)
-{
-    HttpHeader *hdr = rep ? &rep->header : 0;
-    const char *range_err = NULL;
-    HttpRequest *request = http->request;
-    assert(request->range);
-    /* check if we still want to do ranges */
-
-    int64_t roffLimit = request->getRangeOffsetLimit();
-
-    if (!rep)
-        range_err = "no [parse-able] reply";
-    else if ((rep->sline.status() != Http::scOkay) && (rep->sline.status() != Http::scPartialContent))
-        range_err = "wrong status code";
-    else if (hdr->has(Http::HdrType::CONTENT_RANGE))
-        range_err = "origin server does ranges";
-    else if (rep->content_length < 0)
-        range_err = "unknown length";
-    else if (rep->content_length != http->memObject()->getReply()->content_length)
-        range_err = "INCONSISTENT length";  /* a bug? */
-
-    /* hits only - upstream CachePeer determines correct behaviour on misses, and client_side_reply determines
-     * hits candidates
-     */
-    else if (http->logType.isTcpHit() && http->request->header.has(Http::HdrType::IF_RANGE) && !clientIfRangeMatch(http, rep))
-        range_err = "If-Range match failed";
-    else if (!http->request->range->canonize(rep))
-        range_err = "canonization failed";
-    else if (http->request->range->isComplex())
-        range_err = "too complex range header";
-    else if (!http->logType.isTcpHit() && http->request->range->offsetLimitExceeded(roffLimit))
-        range_err = "range outside range_offset_limit";
-
-    /* get rid of our range specs on error */
-    if (range_err) {
-        /* XXX We do this here because we need canonisation etc. However, this current
-         * code will lead to incorrect store offset requests - the store will have the
-         * offset data, but we won't be requesting it.
-         * So, we can either re-request, or generate an error
-         */
-        http->request->ignoreRange(range_err);
-    } else {
-        /* XXX: TODO: Review, this unconditional set may be wrong. */
-        rep->sline.set(rep->sline.version, Http::scPartialContent);
-        // web server responded with a valid, but unexpected range.
-        // will (try-to) forward as-is.
-        //TODO: we should cope with multirange request/responses
-        bool replyMatchRequest = rep->content_range != NULL ?
-                                 request->range->contains(rep->content_range->spec) :
-                                 true;
-        const int spec_count = http->request->range->specs.size();
-        int64_t actual_clen = -1;
-
-        debugs(33, 3, "clientBuildRangeHeader: range spec count: " <<
-               spec_count << " virgin clen: " << rep->content_length);
-        assert(spec_count > 0);
-        /* append appropriate header(s) */
-
-        if (spec_count == 1) {
-            if (!replyMatchRequest) {
-                hdr->delById(Http::HdrType::CONTENT_RANGE);
-                hdr->putContRange(rep->content_range);
-                actual_clen = rep->content_length;
-                //http->range_iter.pos = rep->content_range->spec.begin();
-                (*http->range_iter.pos)->offset = rep->content_range->spec.offset;
-                (*http->range_iter.pos)->length = rep->content_range->spec.length;
-
-            } else {
-                HttpHdrRange::iterator pos = http->request->range->begin();
-                assert(*pos);
-                /* append Content-Range */
-
-                if (!hdr->has(Http::HdrType::CONTENT_RANGE)) {
-                    /* No content range, so this was a full object we are
-                     * sending parts of.
-                     */
-                    httpHeaderAddContRange(hdr, **pos, rep->content_length);
-                }
-
-                /* set new Content-Length to the actual number of bytes
-                 * transmitted in the message-body */
-                actual_clen = (*pos)->length;
-            }
-        } else {
-            /* multipart! */
-            /* generate boundary string */
-            http->range_iter.boundary = http->rangeBoundaryStr();
-            /* delete old Content-Type, add ours */
-            hdr->delById(Http::HdrType::CONTENT_TYPE);
-            httpHeaderPutStrf(hdr, Http::HdrType::CONTENT_TYPE,
-                              "multipart/byteranges; boundary=\"" SQUIDSTRINGPH "\"",
-                              SQUIDSTRINGPRINT(http->range_iter.boundary));
-            /* Content-Length is not required in multipart responses
-             * but it is always nice to have one */
-            actual_clen = http->mRangeCLen();
-            /* http->out needs to start where we want data at */
-            http->out.offset = http->range_iter.currentSpec()->offset;
-        }
-
-        /* replace Content-Length header */
-        assert(actual_clen >= 0);
-
-        hdr->delById(Http::HdrType::CONTENT_LENGTH);
-
-        hdr->putInt64(Http::HdrType::CONTENT_LENGTH, actual_clen);
-
-        debugs(33, 3, "clientBuildRangeHeader: actual content length: " << actual_clen);
-
-        /* And start the range iter off */
-        http->range_iter.updateSpec();
-    }
-}
-
-void
-Http::StreamContext::prepareReply(HttpReply * rep)
-{
-    reply = rep;
-
-    if (http->request->range)
-        buildRangeHeader(rep);
-}
-
-void
-Http::StreamContext::sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData)
-{
-    prepareReply(rep);
-    assert (rep);
-    MemBuf *mb = rep->pack();
-
-    // dump now, so we dont output any body.
-    debugs(11, 2, "HTTP Client " << clientConnection);
-    debugs(11, 2, "HTTP Client REPLY:\n---------\n" << mb->buf << "\n----------");
-
-    /* Save length of headers for persistent conn checks */
-    http->out.headers_sz = mb->contentSize();
-#if HEADERS_LOG
-
-    headersLog(0, 0, http->request->method, rep);
-#endif
-
-    if (bodyData.data && bodyData.length) {
-        if (multipartRangeRequest())
-            packRange(bodyData, mb);
-        else if (http->request->flags.chunkedReply) {
-            packChunk(bodyData, *mb);
-        } else {
-            size_t length = lengthToSend(bodyData.range());
-            noteSentBodyBytes (length);
-
-            mb->append(bodyData.data, length);
-        }
-    }
-
-    getConn()->write(mb);
-    delete mb;
-}
-
 /**
  * Write a chunk of data to a client socket. If the reply is present,
  * send the reply headers down the wire too, and clean them up when
@@ -1437,190 +968,6 @@ ConnStateData::kick()
     }
 }
 
-void
-clientUpdateSocketStats(const LogTags &logType, size_t size)
-{
-    if (size == 0)
-        return;
-
-    statCounter.client_http.kbytes_out += size;
-
-    if (logType.isTcpHit())
-        statCounter.client_http.hit_kbytes_out += size;
-}
-
-/**
- * increments iterator "i"
- * used by clientPackMoreRanges
- *
- \retval true    there is still data available to pack more ranges
- \retval false
- */
-bool
-Http::StreamContext::canPackMoreRanges() const
-{
-    /** first update iterator "i" if needed */
-
-    if (!http->range_iter.debt()) {
-        debugs(33, 5, HERE << "At end of current range spec for " << clientConnection);
-
-        if (http->range_iter.pos != http->range_iter.end)
-            ++http->range_iter.pos;
-
-        http->range_iter.updateSpec();
-    }
-
-    assert(!http->range_iter.debt() == !http->range_iter.currentSpec());
-
-    /* paranoid sync condition */
-    /* continue condition: need_more_data */
-    debugs(33, 5, "Http::StreamContext::canPackMoreRanges: returning " << (http->range_iter.currentSpec() ? true : false));
-    return http->range_iter.currentSpec() ? true : false;
-}
-
-int64_t
-Http::StreamContext::getNextRangeOffset() const
-{
-    debugs (33, 5, "range: " << http->request->range <<
-            "; http offset " << http->out.offset <<
-            "; reply " << reply);
-
-    // XXX: This method is called from many places, including pullData() which
-    // may be called before prepareReply() [on some Squid-generated errors].
-    // Hence, we may not even know yet whether we should honor/do ranges.
-
-    if (http->request->range) {
-        /* offset in range specs does not count the prefix of an http msg */
-        /* check: reply was parsed and range iterator was initialized */
-        assert(http->range_iter.valid);
-        /* filter out data according to range specs */
-        assert (canPackMoreRanges());
-        {
-            int64_t start;      /* offset of still missing data */
-            assert(http->range_iter.currentSpec());
-            start = http->range_iter.currentSpec()->offset + http->range_iter.currentSpec()->length - http->range_iter.debt();
-            debugs(33, 3, "clientPackMoreRanges: in:  offset: " << http->out.offset);
-            debugs(33, 3, "clientPackMoreRanges: out:"
-                   " start: " << start <<
-                   " spec[" << http->range_iter.pos - http->request->range->begin() << "]:" <<
-                   " [" << http->range_iter.currentSpec()->offset <<
-                   ", " << http->range_iter.currentSpec()->offset + http->range_iter.currentSpec()->length << "),"
-                   " len: " << http->range_iter.currentSpec()->length <<
-                   " debt: " << http->range_iter.debt());
-            if (http->range_iter.currentSpec()->length != -1)
-                assert(http->out.offset <= start);  /* we did not miss it */
-
-            return start;
-        }
-
-    } else if (reply && reply->content_range) {
-        /* request does not have ranges, but reply does */
-        /** \todo FIXME: should use range_iter_pos on reply, as soon as reply->content_range
-         *        becomes HttpHdrRange rather than HttpHdrRangeSpec.
-         */
-        return http->out.offset + reply->content_range->spec.offset;
-    }
-
-    return http->out.offset;
-}
-
-void
-Http::StreamContext::pullData()
-{
-    debugs(33, 5, reply << " written " << http->out.size << " into " << clientConnection);
-
-    /* More data will be coming from the stream. */
-    StoreIOBuffer readBuffer;
-    /* XXX: Next requested byte in the range sequence */
-    /* XXX: length = getmaximumrangelenfgth */
-    readBuffer.offset = getNextRangeOffset();
-    readBuffer.length = HTTP_REQBUF_SZ;
-    readBuffer.data = reqbuf;
-    /* we may note we have reached the end of the wanted ranges */
-    clientStreamRead(getTail(), http, readBuffer);
-}
-
-/** Adapt stream status to account for Range cases
- *
- */
-clientStream_status_t
-Http::StreamContext::socketState()
-{
-    switch (clientStreamStatus(getTail(), http)) {
-
-    case STREAM_NONE:
-        /* check for range support ending */
-
-        if (http->request->range) {
-            /* check: reply was parsed and range iterator was initialized */
-            assert(http->range_iter.valid);
-            /* filter out data according to range specs */
-
-            if (!canPackMoreRanges()) {
-                debugs(33, 5, HERE << "Range request at end of returnable " <<
-                       "range sequence on " << clientConnection);
-                // we got everything we wanted from the store
-                return STREAM_COMPLETE;
-            }
-        } else if (reply && reply->content_range) {
-            /* reply has content-range, but Squid is not managing ranges */
-            const int64_t &bytesSent = http->out.offset;
-            const int64_t &bytesExpected = reply->content_range->spec.length;
-
-            debugs(33, 7, HERE << "body bytes sent vs. expected: " <<
-                   bytesSent << " ? " << bytesExpected << " (+" <<
-                   reply->content_range->spec.offset << ")");
-
-            // did we get at least what we expected, based on range specs?
-
-            if (bytesSent == bytesExpected) // got everything
-                return STREAM_COMPLETE;
-
-            if (bytesSent > bytesExpected) // Error: Sent more than expected
-                return STREAM_UNPLANNED_COMPLETE;
-        }
-
-        return STREAM_NONE;
-
-    case STREAM_COMPLETE:
-        return STREAM_COMPLETE;
-
-    case STREAM_UNPLANNED_COMPLETE:
-        return STREAM_UNPLANNED_COMPLETE;
-
-    case STREAM_FAILED:
-        return STREAM_FAILED;
-    }
-
-    fatal ("unreachable code\n");
-    return STREAM_NONE;
-}
-
-/// remembers the abnormal connection termination for logging purposes
-void
-Http::StreamContext::noteIoError(const int xerrno)
-{
-    if (http) {
-        http->logType.err.timedout = (xerrno == ETIMEDOUT);
-        // aborted even if xerrno is zero (which means read abort/eof)
-        http->logType.err.aborted = (xerrno != ETIMEDOUT);
-    }
-}
-
-void
-Http::StreamContext::doClose()
-{
-    clientConnection->close();
-}
-
-/// called when we encounter a response-related error
-void
-Http::StreamContext::initiateClose(const char *reason)
-{
-    debugs(33, 4, clientConnection << " because " << reason);
-    http->getConn()->stopSending(reason); // closes ASAP
-}
-
 void
 ConnStateData::stopSending(const char *error)
 {
@@ -1651,54 +998,13 @@ ConnStateData::afterClientWrite(size_t size)
     if (pipeline.empty())
         return;
 
-    pipeline.front()->writeComplete(size);
-}
-
-// TODO: make this only need size parameter, ConnStateData handles the rest
-void
-Http::StreamContext::writeComplete(size_t size)
-{
-    const StoreEntry *entry = http->storeEntry();
-    debugs(33, 5, clientConnection << ", sz " << size <<
-           ", off " << (http->out.size + size) << ", len " <<
-           (entry ? entry->objectLen() : 0));
-
-    http->out.size += size;
-    clientUpdateSocketStats(http->logType, size);
-
-    if (clientHttpRequestStatus(clientConnection->fd, http)) {
-        initiateClose("failure or true request status");
-        /* Do we leak here ? */
-        return;
-    }
-
-    switch (socketState()) {
-
-    case STREAM_NONE:
-        pullData();
-        break;
-
-    case STREAM_COMPLETE: {
-        debugs(33, 5, clientConnection << " Stream complete, keepalive is " << http->request->flags.proxyKeepalive);
-        ConnStateData *c = http->getConn();
-        if (!http->request->flags.proxyKeepalive)
-            clientConnection->close();
-        finished();
-        c->kick();
-    }
-    return;
-
-    case STREAM_UNPLANNED_COMPLETE:
-        initiateClose("STREAM_UNPLANNED_COMPLETE");
-        return;
-
-    case STREAM_FAILED:
-        initiateClose("STREAM_FAILED");
-        return;
-
-    default:
-        fatal("Hit unreachable code in Http::StreamContext::writeComplete\n");
+    auto ctx = pipeline.front();
+    if (size) {
+        statCounter.client_http.kbytes_out += size;
+        if (ctx->http->logType.isTcpHit())
+            statCounter.client_http.hit_kbytes_out += size;
     }
+    ctx->writeComplete(size);
 }
 
 Http::StreamContext *
index a837f6a6de0aee810b72f7a49ccecf09f9c3e84d..d176e2bc2de81eaaf4bef8391dced17e30a8dcf5 100644 (file)
 #include "ssl/support.h"
 #endif
 
-class ConnStateData;
 class ClientHttpRequest;
-class clientStreamNode;
+class HttpHdrRangeSpec;
 
-class ConnectionDetail;
 #if USE_OPENSSL
 namespace Ssl
 {
@@ -387,6 +385,12 @@ void httpRequestFree(void *);
 /// decide whether to expect multiple requests on the corresponding connection
 void clientSetKeepaliveFlag(ClientHttpRequest *http);
 
+/// append a "part" HTTP header (as in a multi-part/range reply) to the buffer
+void clientPackRangeHdr(const HttpReply *, const HttpHdrRangeSpec *, String boundary, MemBuf *);
+
+/// put terminating boundary for multiparts to the buffer
+void clientPackTermBound(String boundary, MemBuf *);
+
 /* misplaced declaratrions of Stream callbacks provided/used by client side */
 SQUIDCEXTERN CSR clientGetMoreData;
 SQUIDCEXTERN CSS clientReplyStatus;
index 36bff2d97e8c0d854765ff396cd469d3a615522b..6999eb4ea0fd88f8a33aa5f71a6bd9a2aeb23860 100644 (file)
@@ -27,6 +27,7 @@ libsquid_http_la_SOURCES = \
        StatusCode.h \
        StatusLine.cc \
        StatusLine.h \
+       StreamContext.cc \
        StreamContext.h
 
 libsquid_http_la_LIBADD= one/libhttp1.la
diff --git a/src/http/StreamContext.cc b/src/http/StreamContext.cc
new file mode 100644 (file)
index 0000000..adf21a8
--- /dev/null
@@ -0,0 +1,666 @@
+/*
+ * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+#include "client_side_request.h"
+#include "http/StreamContext.h"
+#include "HttpHdrContRange.h"
+#include "HttpHeaderTools.h"
+#include "TimeOrTag.h"
+
+Http::StreamContext::StreamContext(uint32_t anId, const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) :
+    id(anId),
+    clientConnection(aConn),
+    http(aReq),
+    reply(nullptr),
+    writtenToSocket(0),
+    mayUseConnection_(false),
+    connRegistered_(false)
+{
+    assert(http != nullptr);
+    memset(reqbuf, '\0', sizeof (reqbuf));
+    flags.deferred = 0;
+    flags.parsed_ok = 0;
+    deferredparams.node = nullptr;
+    deferredparams.rep = nullptr;
+}
+
+Http::StreamContext::~StreamContext()
+{
+    if (auto node = getTail()) {
+        if (auto ctx = dynamic_cast<Http::StreamContext *>(node->data.getRaw())) {
+            /* We are *always* the tail - prevent recursive free */
+            assert(this == ctx);
+            node->data = nullptr;
+        }
+    }
+    httpRequestFree(http);
+}
+
+void
+Http::StreamContext::registerWithConn()
+{
+    assert(!connRegistered_);
+    assert(getConn());
+    connRegistered_ = true;
+    getConn()->pipeline.add(Http::StreamContextPointer(this));
+}
+
+bool
+Http::StreamContext::startOfOutput() const
+{
+    return http->out.size == 0;
+}
+
+void
+Http::StreamContext::writeComplete(size_t size)
+{
+    const StoreEntry *entry = http->storeEntry();
+    debugs(33, 5, clientConnection << ", sz " << size <<
+           ", off " << (http->out.size + size) << ", len " <<
+           (entry ? entry->objectLen() : 0));
+
+    http->out.size += size;
+
+    if (clientHttpRequestStatus(clientConnection->fd, http)) {
+        initiateClose("failure or true request status");
+        /* Do we leak here ? */
+        return;
+    }
+
+    switch (socketState()) {
+
+    case STREAM_NONE:
+        pullData();
+        break;
+
+    case STREAM_COMPLETE: {
+        debugs(33, 5, clientConnection << " Stream complete, keepalive is " <<
+                      http->request->flags.proxyKeepalive);
+        ConnStateData *c = getConn();
+        if (!http->request->flags.proxyKeepalive)
+            clientConnection->close();
+        finished();
+        c->kick();
+    }
+    return;
+
+    case STREAM_UNPLANNED_COMPLETE:
+        initiateClose("STREAM_UNPLANNED_COMPLETE");
+        return;
+
+    case STREAM_FAILED:
+        initiateClose("STREAM_FAILED");
+        return;
+
+    default:
+        fatal("Hit unreachable code in Http::StreamContext::writeComplete\n");
+    }
+}
+
+void
+Http::StreamContext::pullData()
+{
+    debugs(33, 5, reply << " written " << http->out.size << " into " << clientConnection);
+
+    /* More data will be coming from the stream. */
+    StoreIOBuffer readBuffer;
+    /* XXX: Next requested byte in the range sequence */
+    /* XXX: length = getmaximumrangelenfgth */
+    readBuffer.offset = getNextRangeOffset();
+    readBuffer.length = HTTP_REQBUF_SZ;
+    readBuffer.data = reqbuf;
+    /* we may note we have reached the end of the wanted ranges */
+    clientStreamRead(getTail(), http, readBuffer);
+}
+
+bool
+Http::StreamContext::multipartRangeRequest() const
+{
+    return http->multipartRangeRequest();
+}
+
+int64_t
+Http::StreamContext::getNextRangeOffset() const
+{
+    debugs (33, 5, "range: " << http->request->range <<
+            "; http offset " << http->out.offset <<
+            "; reply " << reply);
+
+    // XXX: This method is called from many places, including pullData() which
+    // may be called before prepareReply() [on some Squid-generated errors].
+    // Hence, we may not even know yet whether we should honor/do ranges.
+
+    if (http->request->range) {
+        /* offset in range specs does not count the prefix of an http msg */
+        /* check: reply was parsed and range iterator was initialized */
+        assert(http->range_iter.valid);
+        /* filter out data according to range specs */
+        assert(canPackMoreRanges());
+        {
+            assert(http->range_iter.currentSpec());
+            /* offset of still missing data */
+            int64_t start = http->range_iter.currentSpec()->offset +
+                            http->range_iter.currentSpec()->length -
+                            http->range_iter.debt();
+            debugs(33, 3, "clientPackMoreRanges: in:  offset: " << http->out.offset);
+            debugs(33, 3, "clientPackMoreRanges: out:"
+                   " start: " << start <<
+                   " spec[" << http->range_iter.pos - http->request->range->begin() << "]:" <<
+                   " [" << http->range_iter.currentSpec()->offset <<
+                   ", " << http->range_iter.currentSpec()->offset +
+                   http->range_iter.currentSpec()->length << "),"
+                   " len: " << http->range_iter.currentSpec()->length <<
+                   " debt: " << http->range_iter.debt());
+            if (http->range_iter.currentSpec()->length != -1)
+                assert(http->out.offset <= start);  /* we did not miss it */
+
+            return start;
+        }
+
+    } else if (reply && reply->content_range) {
+        /* request does not have ranges, but reply does */
+        /** \todo FIXME: should use range_iter_pos on reply, as soon as reply->content_range
+         *        becomes HttpHdrRange rather than HttpHdrRangeSpec.
+         */
+        return http->out.offset + reply->content_range->spec.offset;
+    }
+
+    return http->out.offset;
+}
+
+/**
+ * increments iterator "i"
+ * used by clientPackMoreRanges
+ *
+ * \retval true    there is still data available to pack more ranges
+ * \retval false
+ */
+bool
+Http::StreamContext::canPackMoreRanges() const
+{
+    /** first update iterator "i" if needed */
+    if (!http->range_iter.debt()) {
+        debugs(33, 5, "At end of current range spec for " << clientConnection);
+
+        if (http->range_iter.pos != http->range_iter.end)
+            ++http->range_iter.pos;
+
+        http->range_iter.updateSpec();
+    }
+
+    assert(!http->range_iter.debt() == !http->range_iter.currentSpec());
+
+    /* paranoid sync condition */
+    /* continue condition: need_more_data */
+    debugs(33, 5, "returning " << (http->range_iter.currentSpec() ? true : false));
+    return http->range_iter.currentSpec() ? true : false;
+}
+
+/// Adapt stream status to account for Range cases
+clientStream_status_t
+Http::StreamContext::socketState()
+{
+    switch (clientStreamStatus(getTail(), http)) {
+
+    case STREAM_NONE:
+        /* check for range support ending */
+        if (http->request->range) {
+            /* check: reply was parsed and range iterator was initialized */
+            assert(http->range_iter.valid);
+            /* filter out data according to range specs */
+
+            if (!canPackMoreRanges()) {
+                debugs(33, 5, "Range request at end of returnable " <<
+                       "range sequence on " << clientConnection);
+                // we got everything we wanted from the store
+                return STREAM_COMPLETE;
+            }
+        } else if (reply && reply->content_range) {
+            /* reply has content-range, but Squid is not managing ranges */
+            const int64_t &bytesSent = http->out.offset;
+            const int64_t &bytesExpected = reply->content_range->spec.length;
+
+            debugs(33, 7, "body bytes sent vs. expected: " <<
+                   bytesSent << " ? " << bytesExpected << " (+" <<
+                   reply->content_range->spec.offset << ")");
+
+            // did we get at least what we expected, based on range specs?
+
+            if (bytesSent == bytesExpected) // got everything
+                return STREAM_COMPLETE;
+
+            if (bytesSent > bytesExpected) // Error: Sent more than expected
+                return STREAM_UNPLANNED_COMPLETE;
+        }
+
+        return STREAM_NONE;
+
+    case STREAM_COMPLETE:
+        return STREAM_COMPLETE;
+
+    case STREAM_UNPLANNED_COMPLETE:
+        return STREAM_UNPLANNED_COMPLETE;
+
+    case STREAM_FAILED:
+        return STREAM_FAILED;
+    }
+
+    fatal ("unreachable code\n");
+    return STREAM_NONE;
+}
+
+void
+Http::StreamContext::sendStartOfMessage(HttpReply *rep, StoreIOBuffer bodyData)
+{
+    prepareReply(rep);
+    assert(rep);
+    MemBuf *mb = rep->pack();
+
+    // dump now, so we dont output any body.
+    debugs(11, 2, "HTTP Client " << clientConnection);
+    debugs(11, 2, "HTTP Client REPLY:\n---------\n" << mb->buf << "\n----------");
+
+    /* Save length of headers for persistent conn checks */
+    http->out.headers_sz = mb->contentSize();
+#if HEADERS_LOG
+    headersLog(0, 0, http->request->method, rep);
+#endif
+
+    if (bodyData.data && bodyData.length) {
+        if (multipartRangeRequest())
+            packRange(bodyData, mb);
+        else if (http->request->flags.chunkedReply) {
+            packChunk(bodyData, *mb);
+        } else {
+            size_t length = lengthToSend(bodyData.range());
+            noteSentBodyBytes(length);
+            mb->append(bodyData.data, length);
+        }
+    }
+
+    getConn()->write(mb);
+    delete mb;
+}
+
+void
+Http::StreamContext::sendBody(StoreIOBuffer bodyData)
+{
+    if (!multipartRangeRequest() && !http->request->flags.chunkedReply) {
+        size_t length = lengthToSend(bodyData.range());
+        noteSentBodyBytes(length);
+        getConn()->write(bodyData.data, length);
+        return;
+    }
+
+    MemBuf mb;
+    mb.init();
+    if (multipartRangeRequest())
+        packRange(bodyData, &mb);
+    else
+        packChunk(bodyData, mb);
+
+    if (mb.contentSize())
+        getConn()->write(&mb);
+    else
+        writeComplete(0);
+}
+
+size_t
+Http::StreamContext::lengthToSend(Range<int64_t> const &available) const
+{
+    // the size of available range can always fit into a size_t type
+    size_t maximum = available.size();
+
+    if (!http->request->range)
+        return maximum;
+
+    assert(canPackMoreRanges());
+
+    if (http->range_iter.debt() == -1)
+        return maximum;
+
+    assert(http->range_iter.debt() > 0);
+
+    /* TODO this + the last line could be a range intersection calculation */
+    if (available.start < http->range_iter.currentSpec()->offset)
+        return 0;
+
+    return min(http->range_iter.debt(), static_cast<int64_t>(maximum));
+}
+
+void
+Http::StreamContext::noteSentBodyBytes(size_t bytes)
+{
+    debugs(33, 7, bytes << " body bytes");
+    http->out.offset += bytes;
+
+    if (!http->request->range)
+        return;
+
+    if (http->range_iter.debt() != -1) {
+        http->range_iter.debt(http->range_iter.debt() - bytes);
+        assert (http->range_iter.debt() >= 0);
+    }
+
+    /* debt() always stops at -1, below that is a bug */
+    assert(http->range_iter.debt() >= -1);
+}
+
+/// \return true when If-Range specs match reply, false otherwise
+static bool
+clientIfRangeMatch(ClientHttpRequest * http, HttpReply * rep)
+{
+    const TimeOrTag spec = http->request->header.getTimeOrTag(Http::HdrType::IF_RANGE);
+
+    /* check for parsing falure */
+    if (!spec.valid)
+        return false;
+
+    /* got an ETag? */
+    if (spec.tag.str) {
+        ETag rep_tag = rep->header.getETag(Http::HdrType::ETAG);
+        debugs(33, 3, "ETags: " << spec.tag.str << " and " <<
+               (rep_tag.str ? rep_tag.str : "<none>"));
+
+        if (!rep_tag.str)
+            return false; // entity has no etag to compare with!
+
+        if (spec.tag.weak || rep_tag.weak) {
+            debugs(33, DBG_IMPORTANT, "Weak ETags are not allowed in If-Range: " <<
+                   spec.tag.str << " ? " << rep_tag.str);
+            return false; // must use strong validator for sub-range requests
+        }
+
+        return etagIsStrongEqual(rep_tag, spec.tag);
+    }
+
+    /* got modification time? */
+    if (spec.time >= 0)
+        return http->storeEntry()->lastmod <= spec.time;
+
+    assert(0);          /* should not happen */
+    return false;
+}
+
+// seems to be something better suited to Server logic
+/** adds appropriate Range headers if needed */
+void
+Http::StreamContext::buildRangeHeader(HttpReply *rep)
+{
+    HttpHeader *hdr = rep ? &rep->header : nullptr;
+    const char *range_err = nullptr;
+    HttpRequest *request = http->request;
+    assert(request->range);
+    /* check if we still want to do ranges */
+    int64_t roffLimit = request->getRangeOffsetLimit();
+
+    if (!rep)
+        range_err = "no [parse-able] reply";
+    else if ((rep->sline.status() != Http::scOkay) && (rep->sline.status() != Http::scPartialContent))
+        range_err = "wrong status code";
+    else if (hdr->has(Http::HdrType::CONTENT_RANGE))
+        range_err = "origin server does ranges";
+    else if (rep->content_length < 0)
+        range_err = "unknown length";
+    else if (rep->content_length != http->memObject()->getReply()->content_length)
+        range_err = "INCONSISTENT length";  /* a bug? */
+
+    /* hits only - upstream CachePeer determines correct behaviour on misses,
+     * and client_side_reply determines hits candidates
+     */
+    else if (http->logType.isTcpHit() &&
+             http->request->header.has(Http::HdrType::IF_RANGE) &&
+             !clientIfRangeMatch(http, rep))
+        range_err = "If-Range match failed";
+
+    else if (!http->request->range->canonize(rep))
+        range_err = "canonization failed";
+    else if (http->request->range->isComplex())
+        range_err = "too complex range header";
+    else if (!http->logType.isTcpHit() && http->request->range->offsetLimitExceeded(roffLimit))
+        range_err = "range outside range_offset_limit";
+
+    /* get rid of our range specs on error */
+    if (range_err) {
+        /* XXX We do this here because we need canonisation etc. However, this current
+         * code will lead to incorrect store offset requests - the store will have the
+         * offset data, but we won't be requesting it.
+         * So, we can either re-request, or generate an error
+         */
+        http->request->ignoreRange(range_err);
+    } else {
+        /* XXX: TODO: Review, this unconditional set may be wrong. */
+        rep->sline.set(rep->sline.version, Http::scPartialContent);
+        // web server responded with a valid, but unexpected range.
+        // will (try-to) forward as-is.
+        //TODO: we should cope with multirange request/responses
+        bool replyMatchRequest = rep->content_range != nullptr ?
+                                 request->range->contains(rep->content_range->spec) :
+                                 true;
+        const int spec_count = http->request->range->specs.size();
+        int64_t actual_clen = -1;
+
+        debugs(33, 3, "range spec count: " << spec_count <<
+               " virgin clen: " << rep->content_length);
+        assert(spec_count > 0);
+        /* append appropriate header(s) */
+        if (spec_count == 1) {
+            if (!replyMatchRequest) {
+                hdr->delById(Http::HdrType::CONTENT_RANGE);
+                hdr->putContRange(rep->content_range);
+                actual_clen = rep->content_length;
+                //http->range_iter.pos = rep->content_range->spec.begin();
+                (*http->range_iter.pos)->offset = rep->content_range->spec.offset;
+                (*http->range_iter.pos)->length = rep->content_range->spec.length;
+
+            } else {
+                HttpHdrRange::iterator pos = http->request->range->begin();
+                assert(*pos);
+                /* append Content-Range */
+
+                if (!hdr->has(Http::HdrType::CONTENT_RANGE)) {
+                    /* No content range, so this was a full object we are
+                     * sending parts of.
+                     */
+                    httpHeaderAddContRange(hdr, **pos, rep->content_length);
+                }
+
+                /* set new Content-Length to the actual number of bytes
+                 * transmitted in the message-body */
+                actual_clen = (*pos)->length;
+            }
+        } else {
+            /* multipart! */
+            /* generate boundary string */
+            http->range_iter.boundary = http->rangeBoundaryStr();
+            /* delete old Content-Type, add ours */
+            hdr->delById(Http::HdrType::CONTENT_TYPE);
+            httpHeaderPutStrf(hdr, Http::HdrType::CONTENT_TYPE,
+                              "multipart/byteranges; boundary=\"" SQUIDSTRINGPH "\"",
+                              SQUIDSTRINGPRINT(http->range_iter.boundary));
+            /* Content-Length is not required in multipart responses
+             * but it is always nice to have one */
+            actual_clen = http->mRangeCLen();
+
+            /* http->out needs to start where we want data at */
+            http->out.offset = http->range_iter.currentSpec()->offset;
+        }
+
+        /* replace Content-Length header */
+        assert(actual_clen >= 0);
+        hdr->delById(Http::HdrType::CONTENT_LENGTH);
+        hdr->putInt64(Http::HdrType::CONTENT_LENGTH, actual_clen);
+        debugs(33, 3, "actual content length: " << actual_clen);
+
+        /* And start the range iter off */
+        http->range_iter.updateSpec();
+    }
+}
+
+clientStreamNode *
+Http::StreamContext::getTail() const
+{
+    if (http->client_stream.tail)
+        return static_cast<clientStreamNode *>(http->client_stream.tail->data);
+
+    return nullptr;
+}
+
+clientStreamNode *
+Http::StreamContext::getClientReplyContext() const
+{
+    return static_cast<clientStreamNode *>(http->client_stream.tail->prev->data);
+}
+
+ConnStateData *
+Http::StreamContext::getConn() const
+{
+    assert(http && http->getConn());
+    return http->getConn();
+}
+
+/// remembers the abnormal connection termination for logging purposes
+void
+Http::StreamContext::noteIoError(const int xerrno)
+{
+    if (http) {
+        http->logType.err.timedout = (xerrno == ETIMEDOUT);
+        // aborted even if xerrno is zero (which means read abort/eof)
+        http->logType.err.aborted = (xerrno != ETIMEDOUT);
+    }
+}
+
+void
+Http::StreamContext::finished()
+{
+    ConnStateData *conn = getConn();
+
+    /* we can't handle any more stream data - detach */
+    clientStreamDetach(getTail(), http);
+
+    assert(connRegistered_);
+    connRegistered_ = false;
+    conn->pipeline.popById(id);
+}
+
+/// called when we encounter a response-related error
+void
+Http::StreamContext::initiateClose(const char *reason)
+{
+    debugs(33, 4, clientConnection << " because " << reason);
+    getConn()->stopSending(reason); // closes ASAP
+}
+
+void
+Http::StreamContext::deferRecipientForLater(clientStreamNode *node, HttpReply *rep, StoreIOBuffer receivedData)
+{
+    debugs(33, 2, "Deferring request " << http->uri);
+    assert(flags.deferred == 0);
+    flags.deferred = 1;
+    deferredparams.node = node;
+    deferredparams.rep = rep;
+    deferredparams.queuedBuffer = receivedData;
+}
+
+void
+Http::StreamContext::prepareReply(HttpReply *rep)
+{
+    reply = rep;
+    if (http->request->range)
+        buildRangeHeader(rep);
+}
+
+/**
+ * Packs bodyData into mb using chunked encoding.
+ * Packs the last-chunk if bodyData is empty.
+ */
+void
+Http::StreamContext::packChunk(const StoreIOBuffer &bodyData, MemBuf &mb)
+{
+    const uint64_t length =
+        static_cast<uint64_t>(lengthToSend(bodyData.range()));
+    noteSentBodyBytes(length);
+
+    mb.appendf("%" PRIX64 "\r\n", length);
+    mb.append(bodyData.data, length);
+    mb.append("\r\n", 2);
+}
+
+/**
+ * extracts a "range" from *buf and appends them to mb, updating
+ * all offsets and such.
+ */
+void
+Http::StreamContext::packRange(StoreIOBuffer const &source, MemBuf *mb)
+{
+    HttpHdrRangeIter * i = &http->range_iter;
+    Range<int64_t> available(source.range());
+    char const *buf = source.data;
+
+    while (i->currentSpec() && available.size()) {
+        const size_t copy_sz = lengthToSend(available);
+        if (copy_sz) {
+            // intersection of "have" and "need" ranges must not be empty
+            assert(http->out.offset < i->currentSpec()->offset + i->currentSpec()->length);
+            assert(http->out.offset + (int64_t)available.size() > i->currentSpec()->offset);
+
+            /*
+             * put boundary and headers at the beginning of a range in a
+             * multi-range
+             */
+            if (http->multipartRangeRequest() && i->debt() == i->currentSpec()->length) {
+                assert(http->memObject());
+                clientPackRangeHdr(
+                    http->memObject()->getReply(),  /* original reply */
+                    i->currentSpec(),       /* current range */
+                    i->boundary,    /* boundary, the same for all */
+                    mb);
+            }
+
+            // append content
+            debugs(33, 3, "appending " << copy_sz << " bytes");
+            noteSentBodyBytes(copy_sz);
+            mb->append(buf, copy_sz);
+
+            // update offsets
+            available.start += copy_sz;
+            buf += copy_sz;
+        }
+
+        if (!canPackMoreRanges()) {
+            debugs(33, 3, "Returning because !canPackMoreRanges.");
+            if (i->debt() == 0)
+                // put terminating boundary for multiparts
+                clientPackTermBound(i->boundary, mb);
+            return;
+        }
+
+        int64_t nextOffset = getNextRangeOffset();
+        assert(nextOffset >= http->out.offset);
+        int64_t skip = nextOffset - http->out.offset;
+        /* adjust for not to be transmitted bytes */
+        http->out.offset = nextOffset;
+
+        if (available.size() <= (uint64_t)skip)
+            return;
+
+        available.start += skip;
+        buf += skip;
+
+        if (copy_sz == 0)
+            return;
+    }
+}
+
+void
+Http::StreamContext::doClose()
+{
+    clientConnection->close();
+}
+
index 2d6ed6497bd0afb1c152432897e785d77161571c..714402119cc6eaa79689bfca5fdc9cab289d300d 100644 (file)
@@ -71,9 +71,54 @@ public:
     StreamContext(uint32_t id, const Comm::ConnectionPointer &, ClientHttpRequest *);
     ~StreamContext();
 
+    /// register this stream with the Server
+    void registerWithConn();
+
+    /// whether the reply has started being sent
     bool startOfOutput() const;
+
+    /// update stream state after a write, may initiate more I/O
     void writeComplete(size_t size);
 
+    /// get more data to send
+    void pullData();
+
+    /// \return true if the HTTP request is for multiple ranges
+    bool multipartRangeRequest() const;
+
+    int64_t getNextRangeOffset() const;
+    bool canPackMoreRanges() const;
+    size_t lengthToSend(Range<int64_t> const &available) const;
+
+    clientStream_status_t socketState();
+
+    /// send an HTTP reply message headers and maybe some initial payload
+    void sendStartOfMessage(HttpReply *, StoreIOBuffer bodyData);
+    /// send some HTTP reply message payload
+    void sendBody(StoreIOBuffer bodyData);
+    /// update stream state when N bytes are being sent.
+    /// NP: Http1Server bytes actually not sent yet, just packed into a MemBuf ready
+    void noteSentBodyBytes(size_t);
+
+    /// add Range headers (if any) to the given HTTP reply message
+    void buildRangeHeader(HttpReply *);
+
+    clientStreamNode * getTail() const;
+    clientStreamNode * getClientReplyContext() const;
+
+    ConnStateData *getConn() const;
+
+    /// update state to reflect I/O error
+    void noteIoError(const int xerrno);
+
+    /// cleanup when the transaction has finished. may destroy 'this'
+    void finished();
+
+    /// terminate due to a send/write error (may continue reading)
+    void initiateClose(const char *reason);
+
+    void deferRecipientForLater(clientStreamNode *, HttpReply *, StoreIOBuffer receivedData);
+
 public:
     // NP: stream ID is relative to the connection, not global.
     uint32_t id; ///< stream ID within the client connection.
@@ -85,16 +130,15 @@ public: // HTTP/1.x state data
     HttpReply *reply;
     char reqbuf[HTTP_REQBUF_SZ];
     struct {
-
-        unsigned deferred:1; /* This is a pipelined request waiting for the current object to complete */
-
-        unsigned parsed_ok:1; /* Was this parsed correctly? */
+        unsigned deferred:1; ///< This is a pipelined request waiting for the current object to complete
+        unsigned parsed_ok:1; ///< Was this parsed correctly?
     } flags;
+
     bool mayUseConnection() const {return mayUseConnection_;}
 
     void mayUseConnection(bool aBool) {
         mayUseConnection_ = aBool;
-        debugs(33,3, HERE << "This " << this << " marked " << aBool);
+        debugs(33, 3, "This " << this << " marked " << aBool);
     }
 
     class DeferredParams
@@ -109,32 +153,12 @@ public: // HTTP/1.x state data
     DeferredParams deferredparams;
     int64_t writtenToSocket;
 
-    void pullData();
-    int64_t getNextRangeOffset() const;
-    bool canPackMoreRanges() const;
-    clientStream_status_t socketState();
-    void sendBody(HttpReply * rep, StoreIOBuffer bodyData);
-    void sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData);
-    size_t lengthToSend(Range<int64_t> const &available);
-    void noteSentBodyBytes(size_t);
-    void buildRangeHeader(HttpReply * rep);
-    clientStreamNode * getTail() const;
-    clientStreamNode * getClientReplyContext() const;
-    ConnStateData *getConn() const;
-    void finished(); ///< cleanup when the transaction has finished. may destroy 'this'
-    void deferRecipientForLater(clientStreamNode * node, HttpReply * rep, StoreIOBuffer receivedData);
-    bool multipartRangeRequest() const;
-    void registerWithConn();
-    void noteIoError(const int xerrno); ///< update state to reflect I/O error
-    void initiateClose(const char *reason); ///< terminate due to a send/write error (may continue reading)
-
 private:
-    void prepareReply(HttpReply * rep);
-    void packChunk(const StoreIOBuffer &bodyData, MemBuf &mb);
-    void packRange(StoreIOBuffer const &, MemBuf * mb);
+    void prepareReply(HttpReply *);
+    void packChunk(const StoreIOBuffer &bodyData, MemBuf &);
+    void packRange(StoreIOBuffer const &, MemBuf *);
     void doClose();
 
-private:
     bool mayUseConnection_; /* This request may use the connection. Don't read anymore requests for now */
     bool connRegistered_;
 };
index e36a38444785e8f227b0a079a2363aeceb749e09..1d77ca15f6e2f386e5587224221a01c6ecba02e5 100644 (file)
@@ -263,7 +263,7 @@ Http::One::Server::handleReply(HttpReply *rep, StoreIOBuffer receivedData)
     }
 
     if (!context->startOfOutput()) {
-        context->sendBody(rep, receivedData);
+        context->sendBody(receivedData);
         return;
     }
 
index 9c71e002561cce9a67a5f6fbcfe080b9b65f67a8..246ca3ffdc45f9dc95fecbba74999629d233a7a9 100644 (file)
 //Http::StreamContext::Http::StreamContext(const ConnectionPointer&, ClientHttpRequest*) STUB
 //Http::StreamContext::~Http::StreamContext() STUB
 bool Http::StreamContext::startOfOutput() const STUB_RETVAL(false)
-void Http::StreamContext::writeComplete(size_t size) STUB
+void Http::StreamContext::writeComplete(size_t) STUB
 void Http::StreamContext::pullData() STUB
 int64_t Http::StreamContext::getNextRangeOffset() const STUB_RETVAL(0)
 bool Http::StreamContext::canPackMoreRanges() const STUB_RETVAL(false)
 clientStream_status_t Http::StreamContext::socketState() STUB_RETVAL(STREAM_NONE)
-void Http::StreamContext::sendBody(HttpReply * rep, StoreIOBuffer bodyData) STUB
+void Http::StreamContext::sendBody(StoreIOBuffer) STUB
 void Http::StreamContext::sendStartOfMessage(HttpReply * rep, StoreIOBuffer bodyData) STUB
-size_t Http::StreamContext::lengthToSend(Range<int64_t> const &available) STUB_RETVAL(0)
+size_t Http::StreamContext::lengthToSend(Range<int64_t> const &available) const STUB_RETVAL(0)
 void Http::StreamContext::noteSentBodyBytes(size_t) STUB
 void Http::StreamContext::buildRangeHeader(HttpReply * rep) STUB
 clientStreamNode * Http::StreamContext::getTail() const STUB_RETVAL(NULL)