]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Convert HttpStateData (aka. Http::Client) response buffer to use SBuf
authorAmos Jeffries <squid3@treenet.co.nz>
Tue, 11 Nov 2014 13:55:41 +0000 (05:55 -0800)
committerAmos Jeffries <squid3@treenet.co.nz>
Tue, 11 Nov 2014 13:55:41 +0000 (05:55 -0800)
This replaces the MemBuf used to receive server responses with an SBuf in
preparation for use by an Http1::ResponseParser.

As part of this convert the I/O read processing to Comm::Read / ReadNow
API with read(2) operation happening in the read handler instead of
buried in the comm code. A side effect of this is detaching the read
operation from StoreEntry::delayAwareRead().

Two temporary performance regressions are necessarily introduced with
these changes:
* The worst is a data copy from SBuf input buffer to a temporary MemBuf
  so that the chunked body decoder can process the content. This will
  only be able to be removed when teh chunked decoder is itself converted
  to process an SBuf.

* Also, until the new Http1::ResponseParser is implemented we need to
  copy the response header contents into a temporary MemBuf for the old
  HttpReply::parse() method to process.

src/http.cc
src/http.h

index 3426a15f3c659bbd1493c4d62f66663e36102e89..cf841d8105e138a6c7fd9a0c47844f0b3ddb9769 100644 (file)
@@ -22,6 +22,7 @@
 #include "ChunkedCodingParser.h"
 #include "client_side.h"
 #include "comm/Connection.h"
+#include "comm/Read.h"
 #include "comm/Write.h"
 #include "err_detail_type.h"
 #include "errorpage.h"
@@ -90,8 +91,7 @@ HttpStateData::HttpStateData(FwdState *theFwdState) : AsyncJob("HttpStateData"),
     ignoreCacheControl = false;
     surrogateNoStore = false;
     serverConnection = fwd->serverConnection();
-    readBuf = new MemBuf;
-    readBuf->init(16*1024, 256*1024);
+    inBuf.reserveSpace(16*1024);
 
     // reset peer response time stats for %<pt
     request->hier.peer_http_request_sent.tv_sec = 0;
@@ -129,11 +129,6 @@ HttpStateData::~HttpStateData()
      * don't forget that ~Client() gets called automatically
      */
 
-    if (!readBuf->isNull())
-        readBuf->clean();
-
-    delete readBuf;
-
     if (httpChunkDecoder)
         delete httpChunkDecoder;
 
@@ -694,17 +689,24 @@ HttpStateData::processReplyHeader()
 
     assert(!flags.headers_parsed);
 
-    if (!readBuf->hasContent()) {
+    if (!inBuf.length()) {
         ctx_exit(ctx);
         return;
     }
 
     Http::StatusCode error = Http::scNone;
 
+    // XXX: performance regression. Convert to Http1::ResponseParser
+    MemBuf tmp;
+    tmp.init();
+    tmp.append(inBuf.rawContent(), inBuf.length());
+
     HttpReply *newrep = new HttpReply;
-    const bool parsed = newrep->parse(readBuf, eof, &error);
+    const bool parsed = newrep->parse(&tmp, eof, &error);
 
-    if (!parsed && readBuf->contentSize() > 5 && strncmp(readBuf->content(), "HTTP/", 5) != 0 && strncmp(readBuf->content(), "ICY", 3) != 0) {
+    static const SBuf httpMagic("HTTP/");
+    static const SBuf icyMagic("ICY");
+    if (!parsed && inBuf.length() > 5 && !inBuf.startsWith(httpMagic) && !inBuf.startsWith(icyMagic)) {
         MemBuf *mb;
         HttpReply *tmprep = new HttpReply;
         tmprep->setHeaders(Http::scOkay, "Gatewaying", NULL, -1, -1, -1);
@@ -715,7 +717,7 @@ HttpStateData::processReplyHeader()
         delete tmprep;
     } else {
         if (!parsed && error > 0) { // unrecoverable parsing error
-            debugs(11, 3, "processReplyHeader: Non-HTTP-compliant header: '" <<  readBuf->content() << "'");
+            debugs(11, 3, "processReplyHeader: Non-HTTP-compliant header:\n---------\n" << inBuf << "\n----------");
             flags.headers_parsed = true;
             // XXX: when sanityCheck is gone and Http::StatusLine is used to parse,
             //   the sline should be already set the appropriate values during that parser stage
@@ -735,10 +737,10 @@ HttpStateData::processReplyHeader()
         }
 
         debugs(11, 2, "HTTP Server " << serverConnection);
-        debugs(11, 2, "HTTP Server REPLY:\n---------\n" << readBuf->content() << "\n----------");
+        debugs(11, 2, "HTTP Server REPLY:\n---------\n" << inBuf << "\n----------");
 
-        header_bytes_read = headersEnd(readBuf->content(), readBuf->contentSize());
-        readBuf->consume(header_bytes_read);
+        header_bytes_read = headersEnd(inBuf.rawContent(), inBuf.length());
+        inBuf.consume(header_bytes_read);
     }
 
     newrep->removeStaleWarnings();
@@ -1100,17 +1102,12 @@ HttpStateData::persistentConnStatus() const
     return statusIfComplete();
 }
 
-/* XXX this function is too long! */
 void
 HttpStateData::readReply(const CommIoCbParams &io)
 {
-    int bin;
-    int clen;
-    int len = io.size;
-
     flags.do_next_read = false;
 
-    debugs(11, 5, HERE << io.conn << ": len " << len << ".");
+    debugs(11, 5, io.conn);
 
     // Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us
     if (io.flag == Comm::ERR_CLOSING) {
@@ -1123,37 +1120,60 @@ HttpStateData::readReply(const CommIoCbParams &io)
         return;
     }
 
-    // handle I/O errors
-    if (io.flag != Comm::OK || len < 0) {
-        debugs(11, 2, HERE << io.conn << ": read failure: " << xstrerror() << ".");
+    assert(Comm::IsConnOpen(serverConnection));
+    assert(io.conn->fd == serverConnection->fd);
 
-        if (ignoreErrno(io.xerrno)) {
-            flags.do_next_read = true;
-        } else {
-            ErrorState *err = new ErrorState(ERR_READ_ERROR, Http::scBadGateway, fwd->request);
-            err->xerrno = io.xerrno;
-            fwd->fail(err);
-            flags.do_next_read = false;
-            serverConnection->close();
+    /*
+     * Don't reset the timeout value here. The value should be
+     * counting Config.Timeout.request and applies to the request
+     * as a whole, not individual read() calls.
+     * Plus, it breaks our lame *HalfClosed() detection
+     */
+
+    CommIoCbParams rd(this); // will be expanded with ReadNow results
+    rd.conn = io.conn;
+    rd.size = entry->bytesWanted(Range<size_t>(0, inBuf.spaceSize()));
+#if USE_DELAY_POOLS
+    if (rd.size < 1) {
+        assert(entry->mem_obj);
+
+        typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
+        AsyncCall::Pointer call = JobCallback(11, 5, Dialer, this, HttpStateData::readReply);
+
+        /* read ahead limit */
+        /* Perhaps these two calls should both live in MemObject */
+        if (!entry->mem_obj->readAheadPolicyCanRead()) {
+            entry->mem_obj->delayRead(DeferredRead(DeferReader, this, CommRead(io.conn, NULL, 0, call)));
+            return;
         }
 
+        /* delay id limit */
+        entry->mem_obj->mostBytesAllowed().delayRead(DeferredRead(DeferReader, this, CommRead(io.conn, NULL, 0, call)));
         return;
     }
+#endif
 
-    // update I/O stats
-    if (len > 0) {
-        readBuf->appended(len);
-        reply_bytes_read += len;
+    switch (Comm::ReadNow(rd, inBuf)) {
+    case Comm::INPROGRESS:
+        if (inBuf.isEmpty())
+            debugs(33, 2, io.conn << ": no data to process, " << xstrerr(rd.xerrno));
+        maybeReadVirginBody();
+        return;
+
+    case Comm::OK:
+    {
+        reply_bytes_read += rd.size;
 #if USE_DELAY_POOLS
         DelayId delayId = entry->mem_obj->mostBytesAllowed();
-        delayId.bytesIn(len);
+        delayId.bytesIn(rd.size);
 #endif
 
-        kb_incr(&(statCounter.server.all.kbytes_in), len);
-        kb_incr(&(statCounter.server.http.kbytes_in), len);
+        kb_incr(&(statCounter.server.all.kbytes_in), rd.size);
+        kb_incr(&(statCounter.server.http.kbytes_in), rd.size);
         ++ IOStats.Http.reads;
 
-        for (clen = len - 1, bin = 0; clen; ++bin)
+        int bin = 0;
+        for (int clen = rd.size - 1; clen; ++bin)
             clen >>= 1;
 
         ++ IOStats.Http.read_hist[bin];
@@ -1164,17 +1184,10 @@ HttpStateData::readReply(const CommIoCbParams &io)
             sent.tv_sec ? tvSubMsec(sent, current_time) : -1;
     }
 
-    /** \par
-     * Here the RFC says we should ignore whitespace between replies, but we can't as
-     * doing so breaks HTTP/0.9 replies beginning with witespace, and in addition
-     * the response splitting countermeasures is extremely likely to trigger on this,
-     * not allowing connection reuse in the first place.
-     *
-     * 2012-02-10: which RFC? not 2068 or 2616,
-     *     tolerance there is all about whitespace between requests and header tokens.
-     */
+        /* Continue to process previously read data */
+        break;
 
-    if (len == 0) { // reached EOF?
+    case Comm::ENDFILE: // close detected by 0-byte read
         eof = 1;
         flags.do_next_read = false;
 
@@ -1182,7 +1195,7 @@ HttpStateData::readReply(const CommIoCbParams &io)
          * Ensure here that we have at minimum two \r\n when EOF is seen.
          * TODO: Add eof parameter to headersEnd() and move this hack there.
          */
-        if (readBuf->contentSize() && !flags.headers_parsed) {
+        if (inBuf.length() && !flags.headers_parsed) {
             /*
              * Yes Henrik, there is a point to doing this.  When we
              * called httpProcessReplyHeader() before, we didn't find
@@ -1190,10 +1203,30 @@ HttpStateData::readReply(const CommIoCbParams &io)
              * we want to process the reply headers.
              */
             /* Fake an "end-of-headers" to work around such broken servers */
-            readBuf->append("\r\n", 2);
+            inBuf.append("\r\n", 2);
+        }
+
+        /* Continue to process previously read data */
+        break;
+
+        // case Comm::COMM_ERROR:
+    default: // no other flags should ever occur
+        debugs(11, 2, io.conn << ": read failure: " << xstrerr(rd.xerrno));
+
+        if (ignoreErrno(rd.xerrno)) {
+            flags.do_next_read = true;
+        } else {
+            ErrorState *err = new ErrorState(ERR_READ_ERROR, Http::scBadGateway, fwd->request);
+            err->xerrno = rd.xerrno;
+            fwd->fail(err);
+            flags.do_next_read = false;
+            io.conn->close();
         }
+
+        return;
     }
 
+    /* Process next response from buffer */
     processReply();
 }
 
@@ -1239,7 +1272,7 @@ HttpStateData::continueAfterParsingHeader()
     }
 
     if (!flags.headers_parsed && !eof) {
-        debugs(11, 9, HERE << "needs more at " << readBuf->contentSize());
+        debugs(11, 9, "needs more at " << inBuf.length());
         flags.do_next_read = true;
         /** \retval false If we have not finished parsing the headers and may get more data.
          *                Schedules more reads to retrieve the missing data.
@@ -1273,7 +1306,7 @@ HttpStateData::continueAfterParsingHeader()
         }
     } else {
         assert(eof);
-        if (readBuf->hasContent()) {
+        if (inBuf.length()) {
             error = ERR_INVALID_RESP;
             debugs(11, DBG_IMPORTANT, "WARNING: HTTP: Invalid Response: Headers did not parse at all for " << entry->url() << " AKA " << request->GetHost() << request->urlpath.termedBuf() );
         } else {
@@ -1313,7 +1346,7 @@ HttpStateData::truncateVirginBody()
                " clen=" << clen << '/' << vrep->content_length <<
                " body_bytes_truncated=" << body_bytes_truncated << '+' << extras);
 
-        readBuf->truncate(extras);
+        inBuf.chop(0, inBuf.length() - extras);
         body_bytes_truncated += extras;
     }
 }
@@ -1326,10 +1359,10 @@ void
 HttpStateData::writeReplyBody()
 {
     truncateVirginBody(); // if needed
-    const char *data = readBuf->content();
-    int len = readBuf->contentSize();
+    const char *data = inBuf.rawContent();
+    int len = inBuf.length();
     addVirginReplyBody(data, len);
-    readBuf->consume(len);
+    inBuf.consume(len);
 }
 
 bool
@@ -1343,7 +1376,13 @@ HttpStateData::decodeAndWriteReplyBody()
     SQUID_ENTER_THROWING_CODE();
     MemBuf decodedData;
     decodedData.init();
-    const bool doneParsing = httpChunkDecoder->parse(readBuf,&decodedData);
+    // XXX: performance regression. SBuf-convert (or Parser-convert?) the chunked decoder.
+    MemBuf encodedData;
+    encodedData.init();
+    // NP: we must do this instead of pointing encodedData at the SBuf::rawContent
+    // because chunked decoder uses MemBuf::consume, which shuffles buffer bytes around.
+    encodedData.append(inBuf.rawContent(), inBuf.length());
+    const bool doneParsing = httpChunkDecoder->parse(&encodedData,&decodedData);
     len = decodedData.contentSize();
     data=decodedData.content();
     addVirginReplyBody(data, len);
@@ -1474,28 +1513,23 @@ HttpStateData::maybeReadVirginBody()
 
     // we may need to grow the buffer if headers do not fit
     const int minRead = flags.headers_parsed ? 0 :1024;
-    const int read_size = replyBodySpace(*readBuf, minRead);
+    const int read_size = needBufferSpace(inBuf, minRead);
 
-    debugs(11,9, HERE << (flags.do_next_read ? "may" : "wont") <<
+    debugs(11,9, (flags.do_next_read ? "may" : "wont") <<
            " read up to " << read_size << " bytes from " << serverConnection);
 
-    /*
-     * why <2? Because delayAwareRead() won't actually read if
-     * you ask it to read 1 byte.  The delayed read request
-     * just gets re-queued until the client side drains, then
-     * the I/O thread hangs.  Better to not register any read
-     * handler until we get a notification from someone that
-     * its okay to read again.
-     */
-    if (read_size < 2)
+    if (!flags.do_next_read)
         return;
 
-    if (flags.do_next_read) {
-        flags.do_next_read = false;
-        typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
-        entry->delayAwareRead(serverConnection, readBuf->space(read_size), read_size,
-                              JobCallback(11, 5, Dialer, this,  HttpStateData::readReply));
-    }
+    flags.do_next_read = false;
+
+    // must not already be waiting for read(2) ...
+    assert(!Comm::MonitorsRead(serverConnection->fd));
+
+    // wait for read(2) to be possible.
+    typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
+    AsyncCall::Pointer call = JobCallback(11, 5, Dialer, this, HttpStateData::readReply);
+    Comm::Read(serverConnection, call);
 }
 
 /// called after writing the very last request byte (body, last-chunk, etc)
index e4606e5e9ac3b89f7223e7c16f8cd5e4288ca7c6..bded4027c40980b8d8b9636aa5d1c31e23b3fe83 100644 (file)
@@ -50,7 +50,7 @@ public:
     int header_bytes_read;     // to find end of response,
     int64_t reply_bytes_read;  // without relying on StoreEntry
     int body_bytes_truncated; // positive when we read more than we wanted
-    MemBuf *readBuf;
+    SBuf inBuf;                ///< I/O buffer for receiving server responses
     bool ignoreCacheControl;
     bool surrogateNoStore;