]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Support forwarding intercepted but not bumped connections to cache_peers.
authorAlex Rousskov <rousskov@measurement-factory.com>
Mon, 10 Jun 2013 20:46:08 +0000 (14:46 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Mon, 10 Jun 2013 20:46:08 +0000 (14:46 -0600)
When talking to a cache_peer (i.e., sending a CONNECT request before tunneling
the transaction), tunnel code is using a clever hack: Squid does not parse
the CONNECT response from peer but blindly forwards it to the client. This
works great and simplifies code a lot, except when the client connection
was intercepted and, hence, the client did not send a CONNECT request and
is not expecting a CONNECT response.

In those situations, we now accumulate, parse, and strip the peer CONNECT
response (or close connection on errors).

The existing tunnel I/O code is too simple to accommodate that task -- it
cannot accumulate read data (its I/O buffers work in lockstep fashion, writing
everything it reads before reading again). Instead of rewriting the entire
tunnel code to use more complex buffers, I added a temporary accumulation
buffer for the CONNECT response. That buffer is not allocated unless it is
needed and does not grow beyond SQUID_TCP_SO_RCVBUF size, just like the
simple buffers.

src/tunnel.cc

index 93b9d727e230c338308039b4a807a28b319e79a0..60265e33a67480cd8654aed25a7a973424a88a97 100644 (file)
@@ -91,6 +91,12 @@ public:
     static void WriteClientDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data);
     static void WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data);
 
+    /// Starts reading peer response to our CONNECT request.
+    void readConnectResponse();
+
+    /// Called when we may be done handling a CONNECT exchange with the peer.
+    void connectExchangeCheckpoint();
+
     bool noConnections() const;
     char *url;
     HttpRequest::Pointer request;
@@ -100,6 +106,17 @@ public:
         return (server.conn != NULL && server.conn->getPeer() ? server.conn->getPeer()->host : request->GetHost());
     };
 
+    /// Whether we are writing a CONNECT request to a peer.
+    bool waitingForConnectRequest() const { return connectReqWriting; }
+    /// Whether we are reading a CONNECT response from a peer.
+    bool waitingForConnectResponse() const { return connectRespBuf; }
+    /// Whether we are waiting for the CONNECT request/response exchange with the peer.
+    bool waitingForConnectExchange() const { return waitingForConnectRequest() || waitingForConnectResponse(); }
+
+    /// Whether the client sent a CONNECT request to us.
+    bool clientExpectsConnectResponse() const { return !(request != NULL &&
+        (request->flags.interceptTproxy || request->flags.intercepted)); }
+
     class Connection
     {
 
@@ -117,6 +134,8 @@ public:
 
         void error(int const xerrno);
         int debugLevelForError(int const xerrno) const;
+        /// handles a non-I/O error associated with this Connection
+        void logicError(const char *errMsg);
         void closeIfOpen();
         void dataSent (size_t amount);
         int len;
@@ -135,15 +154,23 @@ public:
 
     Connection client, server;
     int *status_ptr;           /* pointer to status for logging */
+    MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we need it
+    bool connectReqWriting; ///< whether we are writing a CONNECT request to a peer
+
     void copyRead(Connection &from, IOCB *completion);
 
 private:
     CBDATA_CLASS2(TunnelStateData);
-    void copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *);
+    bool keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to);
+    void copy(size_t len, Connection &from, Connection &to, IOCB *);
+    void handleConnectResponse(const size_t chunkSize);
     void readServer(char *buf, size_t len, comm_err_t errcode, int xerrno);
     void readClient(char *buf, size_t len, comm_err_t errcode, int xerrno);
     void writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno);
     void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno);
+
+    static void ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data);
+    void readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno);
 };
 
 static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n";
@@ -196,7 +223,9 @@ tunnelClientClosed(const CommCloseCbParams &params)
 TunnelStateData::TunnelStateData() :
         url(NULL),
         request(NULL),
-        status_ptr(NULL)
+        status_ptr(NULL),
+        connectRespBuf(NULL),
+        connectReqWriting(false)
 {
     debugs(26, 3, "TunnelStateData constructed this=" << this);
 }
@@ -207,6 +236,7 @@ TunnelStateData::~TunnelStateData()
     assert(noConnections());
     xfree(url);
     serverDestinations.clean();
+    delete connectRespBuf;
 }
 
 TunnelStateData::Connection::~Connection()
@@ -282,7 +312,108 @@ TunnelStateData::readServer(char *buf, size_t len, comm_err_t errcode, int xerrn
         kb_incr(&(statCounter.server.other.kbytes_in), len);
     }
 
-    copy (len, errcode, xerrno, server, client, WriteClientDone);
+    if (keepGoingAfterRead(len, errcode, xerrno, server, client))
+        copy(len, server, client, WriteClientDone);
+}
+
+/// Called when we read [a part of] CONNECT response from the peer
+void
+TunnelStateData::readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno)
+{
+    debugs(26, 3, server.conn << ", read " << len << " bytes, err=" << errcode);
+    assert(waitingForConnectResponse());
+
+    if (errcode == COMM_ERR_CLOSING)
+        return;
+
+    if (len > 0) {
+        connectRespBuf->appended(len);
+        server.bytesIn(len);
+        kb_incr(&(statCounter.server.all.kbytes_in), len);
+        kb_incr(&(statCounter.server.other.kbytes_in), len);
+    }
+
+    if (keepGoingAfterRead(len, errcode, xerrno, server, client))
+        handleConnectResponse(len);
+}
+
+/* Read from client side and queue it for writing to the server */
+void
+TunnelStateData::ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
+{
+    TunnelStateData *tunnelState = (TunnelStateData *)data;
+    assert (cbdataReferenceValid (tunnelState));
+
+    tunnelState->readConnectResponseDone(buf, len, errcode, xerrno);
+}
+
+/// Parses [possibly incomplete] CONNECT response and reacts to it.
+/// If the tunnel is being closed or more response data is needed, returns false.
+/// Otherwise, the caller should handle the remaining read data, if any.
+void
+TunnelStateData::handleConnectResponse(const size_t chunkSize)
+{
+    assert(waitingForConnectResponse());
+
+    // Ideally, client and server should use MemBuf or better, but current code
+    // never accumulates more than one read when shoveling data (XXX) so it does
+    // not need to deal with MemBuf complexity. To keep it simple, we use a 
+    // dedicated MemBuf for accumulating CONNECT responses. TODO: When shoveling
+    // is optimized, reuse server.buf for CONNEC response accumulation instead.
+
+    /* mimic the basic parts of HttpStateData::processReplyHeader() */
+    HttpReply rep;
+    Http::StatusCode parseErr = Http::scNone;
+    const bool eof = !chunkSize;
+    const bool parsed = rep.parse(connectRespBuf, eof, &parseErr);
+    if (!parsed) {
+        if (parseErr > 0) { // unrecoverable parsing error
+            server.logicError("malformed CONNECT response from peer");
+            return;
+        }
+
+        // need more data
+        assert(!eof);
+        assert(!parseErr);
+
+        if (!connectRespBuf->hasSpace()) {
+            server.logicError("huge CONNECT response from peer");
+            return;
+        }
+
+        // keep reading
+        readConnectResponse();
+        return;
+    }
+
+    // CONNECT response was successfully parsed
+    *status_ptr = rep.sline.status();
+
+    // bail if we did not get an HTTP 200 (Connection Established) response
+    if (rep.sline.status() != Http::scOkay) {
+        server.logicError("unsupported CONNECT response status code");
+        return;
+    }
+
+    if (rep.hdr_sz < connectRespBuf->contentSize()) {
+        // preserve bytes that the server already sent after the CONNECT response
+        server.len = connectRespBuf->contentSize() - rep.hdr_sz;
+        memcpy(server.buf, connectRespBuf->content()+rep.hdr_sz, server.len);
+    } else {
+        // reset; delay pools were using this field to throttle CONNECT response
+        server.len = 0;
+    }
+
+    delete connectRespBuf;
+    connectRespBuf = NULL;
+    connectExchangeCheckpoint();
+}
+
+void
+TunnelStateData::Connection::logicError(const char *errMsg)
+{
+    debugs(50, 3, conn << " closing on error: " << errMsg);
+    conn->close();
 }
 
 void
@@ -325,11 +456,14 @@ TunnelStateData::readClient(char *buf, size_t len, comm_err_t errcode, int xerrn
         kb_incr(&(statCounter.client_http.kbytes_in), len);
     }
 
-    copy (len, errcode, xerrno, client, server, WriteServerDone);
+    if (keepGoingAfterRead(len, errcode, xerrno, client, server))
+        copy(len, client, server, WriteServerDone);
 }
 
-void
-TunnelStateData::copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *completion)
+/// Updates state after reading from client or server.
+/// Returns whether the caller should use the data just read.
+bool
+TunnelStateData::keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to)
 {
     debugs(26, 3, HERE << "from={" << from.conn << "}, to={" << to.conn << "}");
 
@@ -365,11 +499,19 @@ TunnelStateData::copy (size_t len, comm_err_t errcode, int xerrno, Connection &f
             to.conn->close();
         }
     } else if (cbdataReferenceValid(this)) {
+        return true;
+    }
+
+    return false;
+}
+
+void
+TunnelStateData::copy(size_t len, Connection &from, Connection &to, IOCB *completion)
+{
         debugs(26, 3, HERE << "Schedule Write");
         AsyncCall::Pointer call = commCbCall(5,5, "TunnelBlindCopyWriteHandler",
                                              CommIoCbPtrFun(completion, this));
         Comm::Write(to.conn, from.buf, len, call, NULL);
-    }
 }
 
 /* Writes data from the client buffer to the server side */
@@ -509,6 +651,17 @@ TunnelStateData::copyRead(Connection &from, IOCB *completion)
     comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), call);
 }
 
+void
+TunnelStateData::readConnectResponse()
+{
+    assert(waitingForConnectResponse());
+
+    AsyncCall::Pointer call = commCbCall(5,4, "readConnectResponseDone",
+                                         CommIoCbPtrFun(ReadConnectResponseDone, this));
+    comm_read(server.conn, connectRespBuf->space(),
+              server.bytesWanted(1, connectRespBuf->spaceSize()), call);
+}
+
 /**
  * Set the HTTP status for this request and sets the read handlers for client
  * and server side connections.
@@ -516,9 +669,13 @@ TunnelStateData::copyRead(Connection &from, IOCB *completion)
 static void
 tunnelStartShoveling(TunnelStateData *tunnelState)
 {
+    assert(!tunnelState->waitingForConnectExchange());
     *tunnelState->status_ptr = Http::scOkay;
     if (cbdataReferenceValid(tunnelState)) {
+        if (!tunnelState->server.len)
         tunnelState->copyRead(tunnelState->server, TunnelStateData::ReadServer);
+        else
+            tunnelState->copy(tunnelState->server.len, tunnelState->server, tunnelState->client, TunnelStateData::WriteClientDone);
         tunnelState->copyRead(tunnelState->client, TunnelStateData::ReadClient);
     }
 }
@@ -543,6 +700,38 @@ tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t
     tunnelStartShoveling(tunnelState);
 }
 
+/// Called when we are done writing CONNECT request to a peer.
+static void
+tunnelConnectReqWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
+{
+    TunnelStateData *tunnelState = (TunnelStateData *)data;
+    debugs(26, 3, conn << ", flag=" << flag);
+    assert(tunnelState->waitingForConnectRequest());
+
+    if (flag != COMM_OK) {
+        *tunnelState->status_ptr = Http::scInternalServerError;
+        tunnelErrorComplete(conn->fd, data, 0);
+        return;
+    }
+
+    tunnelState->connectReqWriting = false;
+    tunnelState->connectExchangeCheckpoint();
+}
+
+void
+TunnelStateData::connectExchangeCheckpoint()
+{
+    if (waitingForConnectResponse()) {
+        debugs(26, 5, "still reading CONNECT response on " << server.conn);
+    } else if (waitingForConnectRequest()) {
+        debugs(26, 5, "still writing CONNECT request on " << server.conn);
+    } else {
+        assert(!waitingForConnectExchange());
+        debugs(26, 3, "done with CONNECT exchange on " << server.conn);
+        tunnelConnected(server.conn, this);
+    }
+}
+
 /*
  * handle the write completion from a proxy request to an upstream origin
  */
@@ -552,7 +741,7 @@ tunnelConnected(const Comm::ConnectionPointer &server, void *data)
     TunnelStateData *tunnelState = (TunnelStateData *)data;
     debugs(26, 3, HERE << server << ", tunnelState=" << tunnelState);
 
-    if (tunnelState->request != NULL && (tunnelState->request->flags.interceptTproxy || tunnelState->request->flags.intercepted))
+    if (!tunnelState->clientExpectsConnectResponse())
         tunnelStartShoveling(tunnelState); // ssl-bumped connection, be quiet
     else {
         AsyncCall::Pointer call = commCbCall(5,5, "tunnelConnectedWriteDone",
@@ -718,6 +907,7 @@ static void
 tunnelRelayConnectRequest(const Comm::ConnectionPointer &srv, void *data)
 {
     TunnelStateData *tunnelState = (TunnelStateData *)data;
+    assert(!tunnelState->waitingForConnectExchange());
     HttpHeader hdr_out(hoRequest);
     Packer p;
     HttpStateFlags flags;
@@ -738,9 +928,29 @@ tunnelRelayConnectRequest(const Comm::ConnectionPointer &srv, void *data)
     packerClean(&p);
     mb.append("\r\n", 2);
 
+    if (tunnelState->clientExpectsConnectResponse()) {
+    // hack: blindly tunnel peer response (to our CONNECT request) to the client as ours.
     AsyncCall::Pointer writeCall = commCbCall(5,5, "tunnelConnectedWriteDone",
                                    CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState));
     Comm::Write(srv, &mb, writeCall);
+    } else {
+        // we have to eat the connect response from the peer (so that the client
+        // does not see it) and only then start shoveling data to the client
+        AsyncCall::Pointer writeCall = commCbCall(5,5, "tunnelConnectReqWriteDone",
+                                       CommIoCbPtrFun(tunnelConnectReqWriteDone,
+                                       tunnelState));
+        Comm::Write(srv, &mb, writeCall);
+        tunnelState->connectReqWriting = true;
+
+        tunnelState->connectRespBuf = new MemBuf;
+        // SQUID_TCP_SO_RCVBUF: we should not accumulate more than regular I/O buffer
+        // can hold since any CONNECT response leftovers have to fit into server.buf.
+        // 2*SQUID_TCP_SO_RCVBUF: HttpMsg::parse() zero-terminates, which uses space.
+        tunnelState->connectRespBuf->init(SQUID_TCP_SO_RCVBUF, 2*SQUID_TCP_SO_RCVBUF);
+        tunnelState->readConnectResponse();
+
+        assert(tunnelState->waitingForConnectExchange());
+    }
 
     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
                                      CommTimeoutCbPtrFun(tunnelTimeout, tunnelState));