From: Amos Jeffries Date: Tue, 11 Nov 2014 13:55:41 +0000 (-0800) Subject: Convert HttpStateData (aka. Http::Client) response buffer to use SBuf X-Git-Tag: merge-candidate-3-v1~240^2~17 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=395a814a5b0c68847ebdfc70f8a30573b4fea8d6;p=thirdparty%2Fsquid.git Convert HttpStateData (aka. Http::Client) response buffer to use SBuf This replaces the MemBuf used to receive server responses with an SBuf in preparation for use by an Http1::ResponseParser. As part of this convert the I/O read processing to Comm::Read / ReadNow API with read(2) operation happening in the read handler instead of buried in the comm code. A side effect of this is detaching the read operation from StoreEntry::delayAwareRead(). Two temporary performance regressions are necessarily introduced with these changes: * The worst is a data copy from SBuf input buffer to a temporary MemBuf so that the chunked body decoder can process the content. This will only be able to be removed when teh chunked decoder is itself converted to process an SBuf. * Also, until the new Http1::ResponseParser is implemented we need to copy the response header contents into a temporary MemBuf for the old HttpReply::parse() method to process. --- diff --git a/src/http.cc b/src/http.cc index 3426a15f3c..cf841d8105 100644 --- a/src/http.cc +++ b/src/http.cc @@ -22,6 +22,7 @@ #include "ChunkedCodingParser.h" #include "client_side.h" #include "comm/Connection.h" +#include "comm/Read.h" #include "comm/Write.h" #include "err_detail_type.h" #include "errorpage.h" @@ -90,8 +91,7 @@ HttpStateData::HttpStateData(FwdState *theFwdState) : AsyncJob("HttpStateData"), ignoreCacheControl = false; surrogateNoStore = false; serverConnection = fwd->serverConnection(); - readBuf = new MemBuf; - readBuf->init(16*1024, 256*1024); + inBuf.reserveSpace(16*1024); // reset peer response time stats for %hier.peer_http_request_sent.tv_sec = 0; @@ -129,11 +129,6 @@ HttpStateData::~HttpStateData() * don't forget that ~Client() gets called automatically */ - if (!readBuf->isNull()) - readBuf->clean(); - - delete readBuf; - if (httpChunkDecoder) delete httpChunkDecoder; @@ -694,17 +689,24 @@ HttpStateData::processReplyHeader() assert(!flags.headers_parsed); - if (!readBuf->hasContent()) { + if (!inBuf.length()) { ctx_exit(ctx); return; } Http::StatusCode error = Http::scNone; + // XXX: performance regression. Convert to Http1::ResponseParser + MemBuf tmp; + tmp.init(); + tmp.append(inBuf.rawContent(), inBuf.length()); + HttpReply *newrep = new HttpReply; - const bool parsed = newrep->parse(readBuf, eof, &error); + const bool parsed = newrep->parse(&tmp, eof, &error); - if (!parsed && readBuf->contentSize() > 5 && strncmp(readBuf->content(), "HTTP/", 5) != 0 && strncmp(readBuf->content(), "ICY", 3) != 0) { + static const SBuf httpMagic("HTTP/"); + static const SBuf icyMagic("ICY"); + if (!parsed && inBuf.length() > 5 && !inBuf.startsWith(httpMagic) && !inBuf.startsWith(icyMagic)) { MemBuf *mb; HttpReply *tmprep = new HttpReply; tmprep->setHeaders(Http::scOkay, "Gatewaying", NULL, -1, -1, -1); @@ -715,7 +717,7 @@ HttpStateData::processReplyHeader() delete tmprep; } else { if (!parsed && error > 0) { // unrecoverable parsing error - debugs(11, 3, "processReplyHeader: Non-HTTP-compliant header: '" << readBuf->content() << "'"); + debugs(11, 3, "processReplyHeader: Non-HTTP-compliant header:\n---------\n" << inBuf << "\n----------"); flags.headers_parsed = true; // XXX: when sanityCheck is gone and Http::StatusLine is used to parse, // the sline should be already set the appropriate values during that parser stage @@ -735,10 +737,10 @@ HttpStateData::processReplyHeader() } debugs(11, 2, "HTTP Server " << serverConnection); - debugs(11, 2, "HTTP Server REPLY:\n---------\n" << readBuf->content() << "\n----------"); + debugs(11, 2, "HTTP Server REPLY:\n---------\n" << inBuf << "\n----------"); - header_bytes_read = headersEnd(readBuf->content(), readBuf->contentSize()); - readBuf->consume(header_bytes_read); + header_bytes_read = headersEnd(inBuf.rawContent(), inBuf.length()); + inBuf.consume(header_bytes_read); } newrep->removeStaleWarnings(); @@ -1100,17 +1102,12 @@ HttpStateData::persistentConnStatus() const return statusIfComplete(); } -/* XXX this function is too long! */ void HttpStateData::readReply(const CommIoCbParams &io) { - int bin; - int clen; - int len = io.size; - flags.do_next_read = false; - debugs(11, 5, HERE << io.conn << ": len " << len << "."); + debugs(11, 5, io.conn); // Bail out early on Comm::ERR_CLOSING - close handlers will tidy up for us if (io.flag == Comm::ERR_CLOSING) { @@ -1123,37 +1120,60 @@ HttpStateData::readReply(const CommIoCbParams &io) return; } - // handle I/O errors - if (io.flag != Comm::OK || len < 0) { - debugs(11, 2, HERE << io.conn << ": read failure: " << xstrerror() << "."); + assert(Comm::IsConnOpen(serverConnection)); + assert(io.conn->fd == serverConnection->fd); - if (ignoreErrno(io.xerrno)) { - flags.do_next_read = true; - } else { - ErrorState *err = new ErrorState(ERR_READ_ERROR, Http::scBadGateway, fwd->request); - err->xerrno = io.xerrno; - fwd->fail(err); - flags.do_next_read = false; - serverConnection->close(); + /* + * Don't reset the timeout value here. The value should be + * counting Config.Timeout.request and applies to the request + * as a whole, not individual read() calls. + * Plus, it breaks our lame *HalfClosed() detection + */ + + CommIoCbParams rd(this); // will be expanded with ReadNow results + rd.conn = io.conn; + rd.size = entry->bytesWanted(Range(0, inBuf.spaceSize())); +#if USE_DELAY_POOLS + if (rd.size < 1) { + assert(entry->mem_obj); + + typedef CommCbMemFunT Dialer; + AsyncCall::Pointer call = JobCallback(11, 5, Dialer, this, HttpStateData::readReply); + + /* read ahead limit */ + /* Perhaps these two calls should both live in MemObject */ + if (!entry->mem_obj->readAheadPolicyCanRead()) { + entry->mem_obj->delayRead(DeferredRead(DeferReader, this, CommRead(io.conn, NULL, 0, call))); + return; } + /* delay id limit */ + entry->mem_obj->mostBytesAllowed().delayRead(DeferredRead(DeferReader, this, CommRead(io.conn, NULL, 0, call))); return; } +#endif - // update I/O stats - if (len > 0) { - readBuf->appended(len); - reply_bytes_read += len; + switch (Comm::ReadNow(rd, inBuf)) { + case Comm::INPROGRESS: + if (inBuf.isEmpty()) + debugs(33, 2, io.conn << ": no data to process, " << xstrerr(rd.xerrno)); + maybeReadVirginBody(); + return; + + case Comm::OK: + { + reply_bytes_read += rd.size; #if USE_DELAY_POOLS DelayId delayId = entry->mem_obj->mostBytesAllowed(); - delayId.bytesIn(len); + delayId.bytesIn(rd.size); #endif - kb_incr(&(statCounter.server.all.kbytes_in), len); - kb_incr(&(statCounter.server.http.kbytes_in), len); + kb_incr(&(statCounter.server.all.kbytes_in), rd.size); + kb_incr(&(statCounter.server.http.kbytes_in), rd.size); ++ IOStats.Http.reads; - for (clen = len - 1, bin = 0; clen; ++bin) + int bin = 0; + for (int clen = rd.size - 1; clen; ++bin) clen >>= 1; ++ IOStats.Http.read_hist[bin]; @@ -1164,17 +1184,10 @@ HttpStateData::readReply(const CommIoCbParams &io) sent.tv_sec ? tvSubMsec(sent, current_time) : -1; } - /** \par - * Here the RFC says we should ignore whitespace between replies, but we can't as - * doing so breaks HTTP/0.9 replies beginning with witespace, and in addition - * the response splitting countermeasures is extremely likely to trigger on this, - * not allowing connection reuse in the first place. - * - * 2012-02-10: which RFC? not 2068 or 2616, - * tolerance there is all about whitespace between requests and header tokens. - */ + /* Continue to process previously read data */ + break; - if (len == 0) { // reached EOF? + case Comm::ENDFILE: // close detected by 0-byte read eof = 1; flags.do_next_read = false; @@ -1182,7 +1195,7 @@ HttpStateData::readReply(const CommIoCbParams &io) * Ensure here that we have at minimum two \r\n when EOF is seen. * TODO: Add eof parameter to headersEnd() and move this hack there. */ - if (readBuf->contentSize() && !flags.headers_parsed) { + if (inBuf.length() && !flags.headers_parsed) { /* * Yes Henrik, there is a point to doing this. When we * called httpProcessReplyHeader() before, we didn't find @@ -1190,10 +1203,30 @@ HttpStateData::readReply(const CommIoCbParams &io) * we want to process the reply headers. */ /* Fake an "end-of-headers" to work around such broken servers */ - readBuf->append("\r\n", 2); + inBuf.append("\r\n", 2); + } + + /* Continue to process previously read data */ + break; + + // case Comm::COMM_ERROR: + default: // no other flags should ever occur + debugs(11, 2, io.conn << ": read failure: " << xstrerr(rd.xerrno)); + + if (ignoreErrno(rd.xerrno)) { + flags.do_next_read = true; + } else { + ErrorState *err = new ErrorState(ERR_READ_ERROR, Http::scBadGateway, fwd->request); + err->xerrno = rd.xerrno; + fwd->fail(err); + flags.do_next_read = false; + io.conn->close(); } + + return; } + /* Process next response from buffer */ processReply(); } @@ -1239,7 +1272,7 @@ HttpStateData::continueAfterParsingHeader() } if (!flags.headers_parsed && !eof) { - debugs(11, 9, HERE << "needs more at " << readBuf->contentSize()); + debugs(11, 9, "needs more at " << inBuf.length()); flags.do_next_read = true; /** \retval false If we have not finished parsing the headers and may get more data. * Schedules more reads to retrieve the missing data. @@ -1273,7 +1306,7 @@ HttpStateData::continueAfterParsingHeader() } } else { assert(eof); - if (readBuf->hasContent()) { + if (inBuf.length()) { error = ERR_INVALID_RESP; debugs(11, DBG_IMPORTANT, "WARNING: HTTP: Invalid Response: Headers did not parse at all for " << entry->url() << " AKA " << request->GetHost() << request->urlpath.termedBuf() ); } else { @@ -1313,7 +1346,7 @@ HttpStateData::truncateVirginBody() " clen=" << clen << '/' << vrep->content_length << " body_bytes_truncated=" << body_bytes_truncated << '+' << extras); - readBuf->truncate(extras); + inBuf.chop(0, inBuf.length() - extras); body_bytes_truncated += extras; } } @@ -1326,10 +1359,10 @@ void HttpStateData::writeReplyBody() { truncateVirginBody(); // if needed - const char *data = readBuf->content(); - int len = readBuf->contentSize(); + const char *data = inBuf.rawContent(); + int len = inBuf.length(); addVirginReplyBody(data, len); - readBuf->consume(len); + inBuf.consume(len); } bool @@ -1343,7 +1376,13 @@ HttpStateData::decodeAndWriteReplyBody() SQUID_ENTER_THROWING_CODE(); MemBuf decodedData; decodedData.init(); - const bool doneParsing = httpChunkDecoder->parse(readBuf,&decodedData); + // XXX: performance regression. SBuf-convert (or Parser-convert?) the chunked decoder. + MemBuf encodedData; + encodedData.init(); + // NP: we must do this instead of pointing encodedData at the SBuf::rawContent + // because chunked decoder uses MemBuf::consume, which shuffles buffer bytes around. + encodedData.append(inBuf.rawContent(), inBuf.length()); + const bool doneParsing = httpChunkDecoder->parse(&encodedData,&decodedData); len = decodedData.contentSize(); data=decodedData.content(); addVirginReplyBody(data, len); @@ -1474,28 +1513,23 @@ HttpStateData::maybeReadVirginBody() // we may need to grow the buffer if headers do not fit const int minRead = flags.headers_parsed ? 0 :1024; - const int read_size = replyBodySpace(*readBuf, minRead); + const int read_size = needBufferSpace(inBuf, minRead); - debugs(11,9, HERE << (flags.do_next_read ? "may" : "wont") << + debugs(11,9, (flags.do_next_read ? "may" : "wont") << " read up to " << read_size << " bytes from " << serverConnection); - /* - * why <2? Because delayAwareRead() won't actually read if - * you ask it to read 1 byte. The delayed read request - * just gets re-queued until the client side drains, then - * the I/O thread hangs. Better to not register any read - * handler until we get a notification from someone that - * its okay to read again. - */ - if (read_size < 2) + if (!flags.do_next_read) return; - if (flags.do_next_read) { - flags.do_next_read = false; - typedef CommCbMemFunT Dialer; - entry->delayAwareRead(serverConnection, readBuf->space(read_size), read_size, - JobCallback(11, 5, Dialer, this, HttpStateData::readReply)); - } + flags.do_next_read = false; + + // must not already be waiting for read(2) ... + assert(!Comm::MonitorsRead(serverConnection->fd)); + + // wait for read(2) to be possible. + typedef CommCbMemFunT Dialer; + AsyncCall::Pointer call = JobCallback(11, 5, Dialer, this, HttpStateData::readReply); + Comm::Read(serverConnection, call); } /// called after writing the very last request byte (body, last-chunk, etc) diff --git a/src/http.h b/src/http.h index e4606e5e9a..bded4027c4 100644 --- a/src/http.h +++ b/src/http.h @@ -50,7 +50,7 @@ public: int header_bytes_read; // to find end of response, int64_t reply_bytes_read; // without relying on StoreEntry int body_bytes_truncated; // positive when we read more than we wanted - MemBuf *readBuf; + SBuf inBuf; ///< I/O buffer for receiving server responses bool ignoreCacheControl; bool surrogateNoStore;