From: Amos Jeffries Date: Fri, 8 Jan 2016 20:13:40 +0000 (+1300) Subject: Support HTTP/2 multiplex stream ID mechanism in pipeline management X-Git-Tag: SQUID_4_0_5~14^2~7 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=99039d374b277d4ca9f87b2ce563856e1d6e3ad8;p=thirdparty%2Fsquid.git Support HTTP/2 multiplex stream ID mechanism in pipeline management * Add stream ID member to contexts. Set by the connection Server. * Use stream ID to clear pipeline entries when contexts finish. Removing HTTP/1.x sequential processing assumptions. --- diff --git a/src/Pipeline.cc b/src/Pipeline.cc index 32155f7c79..58f2cc4448 100644 --- a/src/Pipeline.cc +++ b/src/Pipeline.cc @@ -21,6 +21,7 @@ Pipeline::add(const Http::StreamContextPointer &c) { requests.push_back(c); ++nrequests; + ++nactive; debugs(33, 3, "Pipeline " << (void*)this << " add request " << nrequests << ' ' << c); } @@ -49,15 +50,24 @@ Pipeline::terminateAll(int xerrno) } void -Pipeline::popMe(const Http::StreamContextPointer &which) +Pipeline::popById(uint32_t which) { if (requests.empty()) return; - debugs(33, 3, "Pipeline " << (void*)this << " drop " << requests.front()); - // in reality there may be multiple contexts doing processing in parallel. - // XXX: pipeline still assumes HTTP/1 FIFO semantics are obeyed. - assert(which == requests.front()); - requests.pop_front(); + debugs(33, 3, "Pipeline " << (void*)this << " drop id=" << which); + + // find the context and clear its Pointer + for (auto &&i : requests) { + if (i->id == which) { + i = nullptr; + --nactive; + break; + } + } + + // trim closed contexts from the list head (if any) + while (!requests.empty() && !requests.front()) + requests.pop_front(); } diff --git a/src/Pipeline.h b/src/Pipeline.h index 14d25d1427..7515687a68 100644 --- a/src/Pipeline.h +++ b/src/Pipeline.h @@ -37,7 +37,7 @@ class Pipeline Pipeline & operator =(const Pipeline &) = delete; public: - Pipeline() : nrequests(0) {} + Pipeline() : nrequests(0), nactive(0) {} ~Pipeline() = default; /// register a new request context to the pipeline @@ -47,7 +47,7 @@ public: Http::StreamContextPointer front() const; /// how many requests are currently pipelined - size_t count() const {return requests.size();} + size_t count() const {return nactive;} /// whether there are none or any requests currently pipelined bool empty() const {return requests.empty();} @@ -55,8 +55,8 @@ public: /// tell everybody about the err, and abort all waiting requests void terminateAll(const int xerrno); - /// deregister the front request from the pipeline - void popMe(const Http::StreamContextPointer &); + /// deregister a request from the pipeline + void popById(uint32_t); /// Number of requests seen in this pipeline (so far). /// Includes incomplete transactions. @@ -65,6 +65,10 @@ public: private: /// requests parsed from the connection but not yet completed. std::list requests; + + /// Number of still-active streams in this pipeline (so far). + /// Includes incomplete transactions. + uint32_t nactive; }; #endif /* SQUID_SRC_PIPELINE_H */ diff --git a/src/client_side.cc b/src/client_side.cc index e71a3d2e67..4deace0fd5 100644 --- a/src/client_side.cc +++ b/src/client_side.cc @@ -252,11 +252,11 @@ Http::StreamContext::finished() assert(connRegistered_); connRegistered_ = false; - assert(conn->pipeline.front() == this); // XXX: still assumes HTTP/1 semantics - conn->pipeline.popMe(Http::StreamContextPointer(this)); + conn->pipeline.popById(id); } -Http::StreamContext::StreamContext(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) : +Http::StreamContext::StreamContext(uint32_t anId, const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq) : + id(anId), clientConnection(aConn), http(aReq), reply(NULL), @@ -1708,7 +1708,7 @@ ConnStateData::abortRequestParsing(const char *const uri) http->req_sz = inBuf.length(); http->uri = xstrdup(uri); setLogUri (http, uri); - auto *context = new Http::StreamContext(clientConnection, http); + auto *context = new Http::StreamContext(nextStreamId(), clientConnection, http); StoreIOBuffer tempBuffer; tempBuffer.data = context->reqbuf; tempBuffer.length = HTTP_REQBUF_SZ; @@ -2054,7 +2054,7 @@ parseHttpRequest(ConnStateData *csd, const Http1::RequestParserPointer &hp) ClientHttpRequest *http = new ClientHttpRequest(csd); http->req_sz = hp->messageHeaderSize(); - Http::StreamContext *result = new Http::StreamContext(csd->clientConnection, http); + Http::StreamContext *result = new Http::StreamContext(csd->nextStreamId(), csd->clientConnection, http); StoreIOBuffer tempBuffer; tempBuffer.data = result->reqbuf; @@ -2271,8 +2271,7 @@ clientTunnelOnError(ConnStateData *conn, Http::StreamContext *context, HttpReque // XXX: Either the context is finished() or it should stay queued. // The below may leak client streams BodyPipe objects. BUT, we need // to check if client-streams detatch is safe to do here (finished() will detatch). - assert(conn->pipeline.front() == context); // XXX: still assumes HTTP/1 semantics - conn->pipeline.popMe(Http::StreamContextPointer(context)); + conn->pipeline.popById(context->id); } Comm::SetSelect(conn->clientConnection->fd, COMM_SELECT_READ, NULL, NULL, 0); conn->fakeAConnectRequest("unknown-protocol", conn->preservedClientData); diff --git a/src/client_side.h b/src/client_side.h index 8d375f03e2..a837f6a6de 100644 --- a/src/client_side.h +++ b/src/client_side.h @@ -71,6 +71,7 @@ public: virtual ~ConnStateData(); /* ::Server API */ + virtual uint32_t nextStreamId() {return ++nextStreamId_;} virtual void receivedFirstByte(); virtual bool handleReadData(); virtual void afterClientRead(); diff --git a/src/http/StreamContext.h b/src/http/StreamContext.h index 47a0dcf577..2d6ed6497b 100644 --- a/src/http/StreamContext.h +++ b/src/http/StreamContext.h @@ -68,12 +68,16 @@ class StreamContext : public RefCountable public: /// construct with HTTP/1.x details - StreamContext(const Comm::ConnectionPointer &aConn, ClientHttpRequest *aReq); + StreamContext(uint32_t id, const Comm::ConnectionPointer &, ClientHttpRequest *); ~StreamContext(); bool startOfOutput() const; void writeComplete(size_t size); +public: + // NP: stream ID is relative to the connection, not global. + uint32_t id; ///< stream ID within the client connection. + public: // HTTP/1.x state data Comm::ConnectionPointer clientConnection; ///< details about the client connection socket diff --git a/src/servers/FtpServer.cc b/src/servers/FtpServer.cc index 5b8f08cda2..00993a69ba 100644 --- a/src/servers/FtpServer.cc +++ b/src/servers/FtpServer.cc @@ -742,7 +742,7 @@ Ftp::Server::parseOneRequest() http->uri = newUri; Http::StreamContext *const result = - new Http::StreamContext(clientConnection, http); + new Http::StreamContext(nextStreamId(), clientConnection, http); StoreIOBuffer tempBuffer; tempBuffer.data = result->reqbuf; diff --git a/src/servers/Server.cc b/src/servers/Server.cc index 65bd30b6d4..ab4fe68e66 100644 --- a/src/servers/Server.cc +++ b/src/servers/Server.cc @@ -26,7 +26,8 @@ Server::Server(const MasterXaction::Pointer &xact) : clientConnection(xact->tcpClient), transferProtocol(xact->squidPort->transport), port(xact->squidPort), - receivedFirstByte_(false) + receivedFirstByte_(false), + nextStreamId_(0) {} bool diff --git a/src/servers/Server.h b/src/servers/Server.h index e9bfbf3146..21f8ff999a 100644 --- a/src/servers/Server.h +++ b/src/servers/Server.h @@ -35,6 +35,9 @@ public: virtual bool doneAll() const; virtual void swanSong(); + /// fetch the next available stream ID + virtual uint32_t nextStreamId() = 0; + /// ?? virtual bool connFinishedWithConn(int size) = 0; @@ -117,6 +120,7 @@ protected: void doClientRead(const CommIoCbParams &io); void clientWriteDone(const CommIoCbParams &io); + uint32_t nextStreamId_; ///< incremented as streams are initiated AsyncCall::Pointer reader; ///< set when we are reading AsyncCall::Pointer writer; ///< set when we are writing };