From 3ed5793b4f8b919aaa698567d0334c94034ad9f2 Mon Sep 17 00:00:00 2001 From: Alex Rousskov Date: Mon, 10 Jun 2013 14:46:08 -0600 Subject: [PATCH] Support forwarding intercepted but not bumped connections to cache_peers. When talking to a cache_peer (i.e., sending a CONNECT request before tunneling the transaction), tunnel code is using a clever hack: Squid does not parse the CONNECT response from peer but blindly forwards it to the client. This works great and simplifies code a lot, except when the client connection was intercepted and, hence, the client did not send a CONNECT request and is not expecting a CONNECT response. In those situations, we now accumulate, parse, and strip the peer CONNECT response (or close connection on errors). The existing tunnel I/O code is too simple to accommodate that task -- it cannot accumulate read data (its I/O buffers work in lockstep fashion, writing everything it reads before reading again). Instead of rewriting the entire tunnel code to use more complex buffers, I added a temporary accumulation buffer for the CONNECT response. That buffer is not allocated unless it is needed and does not grow beyond SQUID_TCP_SO_RCVBUF size, just like the simple buffers. --- src/tunnel.cc | 226 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 218 insertions(+), 8 deletions(-) diff --git a/src/tunnel.cc b/src/tunnel.cc index 93b9d727e2..60265e33a6 100644 --- a/src/tunnel.cc +++ b/src/tunnel.cc @@ -91,6 +91,12 @@ public: static void WriteClientDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data); static void WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data); + /// Starts reading peer response to our CONNECT request. + void readConnectResponse(); + + /// Called when we may be done handling a CONNECT exchange with the peer. + void connectExchangeCheckpoint(); + bool noConnections() const; char *url; HttpRequest::Pointer request; @@ -100,6 +106,17 @@ public: return (server.conn != NULL && server.conn->getPeer() ? server.conn->getPeer()->host : request->GetHost()); }; + /// Whether we are writing a CONNECT request to a peer. + bool waitingForConnectRequest() const { return connectReqWriting; } + /// Whether we are reading a CONNECT response from a peer. + bool waitingForConnectResponse() const { return connectRespBuf; } + /// Whether we are waiting for the CONNECT request/response exchange with the peer. + bool waitingForConnectExchange() const { return waitingForConnectRequest() || waitingForConnectResponse(); } + + /// Whether the client sent a CONNECT request to us. + bool clientExpectsConnectResponse() const { return !(request != NULL && + (request->flags.interceptTproxy || request->flags.intercepted)); } + class Connection { @@ -117,6 +134,8 @@ public: void error(int const xerrno); int debugLevelForError(int const xerrno) const; + /// handles a non-I/O error associated with this Connection + void logicError(const char *errMsg); void closeIfOpen(); void dataSent (size_t amount); int len; @@ -135,15 +154,23 @@ public: Connection client, server; int *status_ptr; /* pointer to status for logging */ + MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we need it + bool connectReqWriting; ///< whether we are writing a CONNECT request to a peer + void copyRead(Connection &from, IOCB *completion); private: CBDATA_CLASS2(TunnelStateData); - void copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *); + bool keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to); + void copy(size_t len, Connection &from, Connection &to, IOCB *); + void handleConnectResponse(const size_t chunkSize); void readServer(char *buf, size_t len, comm_err_t errcode, int xerrno); void readClient(char *buf, size_t len, comm_err_t errcode, int xerrno); void writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno); void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno); + + static void ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data); + void readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno); }; static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n"; @@ -196,7 +223,9 @@ tunnelClientClosed(const CommCloseCbParams ¶ms) TunnelStateData::TunnelStateData() : url(NULL), request(NULL), - status_ptr(NULL) + status_ptr(NULL), + connectRespBuf(NULL), + connectReqWriting(false) { debugs(26, 3, "TunnelStateData constructed this=" << this); } @@ -207,6 +236,7 @@ TunnelStateData::~TunnelStateData() assert(noConnections()); xfree(url); serverDestinations.clean(); + delete connectRespBuf; } TunnelStateData::Connection::~Connection() @@ -282,7 +312,108 @@ TunnelStateData::readServer(char *buf, size_t len, comm_err_t errcode, int xerrn kb_incr(&(statCounter.server.other.kbytes_in), len); } - copy (len, errcode, xerrno, server, client, WriteClientDone); + if (keepGoingAfterRead(len, errcode, xerrno, server, client)) + copy(len, server, client, WriteClientDone); +} + +/// Called when we read [a part of] CONNECT response from the peer +void +TunnelStateData::readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno) +{ + debugs(26, 3, server.conn << ", read " << len << " bytes, err=" << errcode); + assert(waitingForConnectResponse()); + + if (errcode == COMM_ERR_CLOSING) + return; + + if (len > 0) { + connectRespBuf->appended(len); + server.bytesIn(len); + kb_incr(&(statCounter.server.all.kbytes_in), len); + kb_incr(&(statCounter.server.other.kbytes_in), len); + } + + if (keepGoingAfterRead(len, errcode, xerrno, server, client)) + handleConnectResponse(len); +} + +/* Read from client side and queue it for writing to the server */ +void +TunnelStateData::ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data) +{ + TunnelStateData *tunnelState = (TunnelStateData *)data; + assert (cbdataReferenceValid (tunnelState)); + + tunnelState->readConnectResponseDone(buf, len, errcode, xerrno); +} + +/// Parses [possibly incomplete] CONNECT response and reacts to it. +/// If the tunnel is being closed or more response data is needed, returns false. +/// Otherwise, the caller should handle the remaining read data, if any. +void +TunnelStateData::handleConnectResponse(const size_t chunkSize) +{ + assert(waitingForConnectResponse()); + + // Ideally, client and server should use MemBuf or better, but current code + // never accumulates more than one read when shoveling data (XXX) so it does + // not need to deal with MemBuf complexity. To keep it simple, we use a + // dedicated MemBuf for accumulating CONNECT responses. TODO: When shoveling + // is optimized, reuse server.buf for CONNEC response accumulation instead. + + /* mimic the basic parts of HttpStateData::processReplyHeader() */ + HttpReply rep; + Http::StatusCode parseErr = Http::scNone; + const bool eof = !chunkSize; + const bool parsed = rep.parse(connectRespBuf, eof, &parseErr); + if (!parsed) { + if (parseErr > 0) { // unrecoverable parsing error + server.logicError("malformed CONNECT response from peer"); + return; + } + + // need more data + assert(!eof); + assert(!parseErr); + + if (!connectRespBuf->hasSpace()) { + server.logicError("huge CONNECT response from peer"); + return; + } + + // keep reading + readConnectResponse(); + return; + } + + // CONNECT response was successfully parsed + *status_ptr = rep.sline.status(); + + // bail if we did not get an HTTP 200 (Connection Established) response + if (rep.sline.status() != Http::scOkay) { + server.logicError("unsupported CONNECT response status code"); + return; + } + + if (rep.hdr_sz < connectRespBuf->contentSize()) { + // preserve bytes that the server already sent after the CONNECT response + server.len = connectRespBuf->contentSize() - rep.hdr_sz; + memcpy(server.buf, connectRespBuf->content()+rep.hdr_sz, server.len); + } else { + // reset; delay pools were using this field to throttle CONNECT response + server.len = 0; + } + + delete connectRespBuf; + connectRespBuf = NULL; + connectExchangeCheckpoint(); +} + +void +TunnelStateData::Connection::logicError(const char *errMsg) +{ + debugs(50, 3, conn << " closing on error: " << errMsg); + conn->close(); } void @@ -325,11 +456,14 @@ TunnelStateData::readClient(char *buf, size_t len, comm_err_t errcode, int xerrn kb_incr(&(statCounter.client_http.kbytes_in), len); } - copy (len, errcode, xerrno, client, server, WriteServerDone); + if (keepGoingAfterRead(len, errcode, xerrno, client, server)) + copy(len, client, server, WriteServerDone); } -void -TunnelStateData::copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *completion) +/// Updates state after reading from client or server. +/// Returns whether the caller should use the data just read. +bool +TunnelStateData::keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to) { debugs(26, 3, HERE << "from={" << from.conn << "}, to={" << to.conn << "}"); @@ -365,11 +499,19 @@ TunnelStateData::copy (size_t len, comm_err_t errcode, int xerrno, Connection &f to.conn->close(); } } else if (cbdataReferenceValid(this)) { + return true; + } + + return false; +} + +void +TunnelStateData::copy(size_t len, Connection &from, Connection &to, IOCB *completion) +{ debugs(26, 3, HERE << "Schedule Write"); AsyncCall::Pointer call = commCbCall(5,5, "TunnelBlindCopyWriteHandler", CommIoCbPtrFun(completion, this)); Comm::Write(to.conn, from.buf, len, call, NULL); - } } /* Writes data from the client buffer to the server side */ @@ -509,6 +651,17 @@ TunnelStateData::copyRead(Connection &from, IOCB *completion) comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), call); } +void +TunnelStateData::readConnectResponse() +{ + assert(waitingForConnectResponse()); + + AsyncCall::Pointer call = commCbCall(5,4, "readConnectResponseDone", + CommIoCbPtrFun(ReadConnectResponseDone, this)); + comm_read(server.conn, connectRespBuf->space(), + server.bytesWanted(1, connectRespBuf->spaceSize()), call); +} + /** * Set the HTTP status for this request and sets the read handlers for client * and server side connections. @@ -516,9 +669,13 @@ TunnelStateData::copyRead(Connection &from, IOCB *completion) static void tunnelStartShoveling(TunnelStateData *tunnelState) { + assert(!tunnelState->waitingForConnectExchange()); *tunnelState->status_ptr = Http::scOkay; if (cbdataReferenceValid(tunnelState)) { + if (!tunnelState->server.len) tunnelState->copyRead(tunnelState->server, TunnelStateData::ReadServer); + else + tunnelState->copy(tunnelState->server.len, tunnelState->server, tunnelState->client, TunnelStateData::WriteClientDone); tunnelState->copyRead(tunnelState->client, TunnelStateData::ReadClient); } } @@ -543,6 +700,38 @@ tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t tunnelStartShoveling(tunnelState); } +/// Called when we are done writing CONNECT request to a peer. +static void +tunnelConnectReqWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data) +{ + TunnelStateData *tunnelState = (TunnelStateData *)data; + debugs(26, 3, conn << ", flag=" << flag); + assert(tunnelState->waitingForConnectRequest()); + + if (flag != COMM_OK) { + *tunnelState->status_ptr = Http::scInternalServerError; + tunnelErrorComplete(conn->fd, data, 0); + return; + } + + tunnelState->connectReqWriting = false; + tunnelState->connectExchangeCheckpoint(); +} + +void +TunnelStateData::connectExchangeCheckpoint() +{ + if (waitingForConnectResponse()) { + debugs(26, 5, "still reading CONNECT response on " << server.conn); + } else if (waitingForConnectRequest()) { + debugs(26, 5, "still writing CONNECT request on " << server.conn); + } else { + assert(!waitingForConnectExchange()); + debugs(26, 3, "done with CONNECT exchange on " << server.conn); + tunnelConnected(server.conn, this); + } +} + /* * handle the write completion from a proxy request to an upstream origin */ @@ -552,7 +741,7 @@ tunnelConnected(const Comm::ConnectionPointer &server, void *data) TunnelStateData *tunnelState = (TunnelStateData *)data; debugs(26, 3, HERE << server << ", tunnelState=" << tunnelState); - if (tunnelState->request != NULL && (tunnelState->request->flags.interceptTproxy || tunnelState->request->flags.intercepted)) + if (!tunnelState->clientExpectsConnectResponse()) tunnelStartShoveling(tunnelState); // ssl-bumped connection, be quiet else { AsyncCall::Pointer call = commCbCall(5,5, "tunnelConnectedWriteDone", @@ -718,6 +907,7 @@ static void tunnelRelayConnectRequest(const Comm::ConnectionPointer &srv, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; + assert(!tunnelState->waitingForConnectExchange()); HttpHeader hdr_out(hoRequest); Packer p; HttpStateFlags flags; @@ -738,9 +928,29 @@ tunnelRelayConnectRequest(const Comm::ConnectionPointer &srv, void *data) packerClean(&p); mb.append("\r\n", 2); + if (tunnelState->clientExpectsConnectResponse()) { + // hack: blindly tunnel peer response (to our CONNECT request) to the client as ours. AsyncCall::Pointer writeCall = commCbCall(5,5, "tunnelConnectedWriteDone", CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState)); Comm::Write(srv, &mb, writeCall); + } else { + // we have to eat the connect response from the peer (so that the client + // does not see it) and only then start shoveling data to the client + AsyncCall::Pointer writeCall = commCbCall(5,5, "tunnelConnectReqWriteDone", + CommIoCbPtrFun(tunnelConnectReqWriteDone, + tunnelState)); + Comm::Write(srv, &mb, writeCall); + tunnelState->connectReqWriting = true; + + tunnelState->connectRespBuf = new MemBuf; + // SQUID_TCP_SO_RCVBUF: we should not accumulate more than regular I/O buffer + // can hold since any CONNECT response leftovers have to fit into server.buf. + // 2*SQUID_TCP_SO_RCVBUF: HttpMsg::parse() zero-terminates, which uses space. + tunnelState->connectRespBuf->init(SQUID_TCP_SO_RCVBUF, 2*SQUID_TCP_SO_RCVBUF); + tunnelState->readConnectResponse(); + + assert(tunnelState->waitingForConnectExchange()); + } AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", CommTimeoutCbPtrFun(tunnelTimeout, tunnelState)); -- 2.47.2