static void WriteClientDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data);
static void WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data);
+ /// Starts reading peer response to our CONNECT request.
+ void readConnectResponse();
+
+ /// Called when we may be done handling a CONNECT exchange with the peer.
+ void connectExchangeCheckpoint();
+
bool noConnections() const;
char *url;
HttpRequest::Pointer request;
return (server.conn != NULL && server.conn->getPeer() ? server.conn->getPeer()->host : request->GetHost());
};
+ /// Whether we are writing a CONNECT request to a peer.
+ bool waitingForConnectRequest() const { return connectReqWriting; }
+ /// Whether we are reading a CONNECT response from a peer.
+ bool waitingForConnectResponse() const { return connectRespBuf; }
+ /// Whether we are waiting for the CONNECT request/response exchange with the peer.
+ bool waitingForConnectExchange() const { return waitingForConnectRequest() || waitingForConnectResponse(); }
+
+ /// Whether the client sent a CONNECT request to us.
+ bool clientExpectsConnectResponse() const { return !(request != NULL &&
+ (request->flags.interceptTproxy || request->flags.intercepted)); }
+
class Connection
{
void error(int const xerrno);
int debugLevelForError(int const xerrno) const;
+ /// handles a non-I/O error associated with this Connection
+ void logicError(const char *errMsg);
void closeIfOpen();
void dataSent (size_t amount);
int len;
Connection client, server;
int *status_ptr; /* pointer to status for logging */
+ MemBuf *connectRespBuf; ///< accumulates peer CONNECT response when we need it
+ bool connectReqWriting; ///< whether we are writing a CONNECT request to a peer
+
void copyRead(Connection &from, IOCB *completion);
private:
CBDATA_CLASS2(TunnelStateData);
- void copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *);
+ bool keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to);
+ void copy(size_t len, Connection &from, Connection &to, IOCB *);
+ void handleConnectResponse(const size_t chunkSize);
void readServer(char *buf, size_t len, comm_err_t errcode, int xerrno);
void readClient(char *buf, size_t len, comm_err_t errcode, int xerrno);
void writeClientDone(char *buf, size_t len, comm_err_t flag, int xerrno);
void writeServerDone(char *buf, size_t len, comm_err_t flag, int xerrno);
+
+ static void ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data);
+ void readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno);
};
static const char *const conn_established = "HTTP/1.1 200 Connection established\r\n\r\n";
TunnelStateData::TunnelStateData() :
url(NULL),
request(NULL),
- status_ptr(NULL)
+ status_ptr(NULL),
+ connectRespBuf(NULL),
+ connectReqWriting(false)
{
debugs(26, 3, "TunnelStateData constructed this=" << this);
}
assert(noConnections());
xfree(url);
serverDestinations.clean();
+ delete connectRespBuf;
}
TunnelStateData::Connection::~Connection()
kb_incr(&(statCounter.server.other.kbytes_in), len);
}
- copy (len, errcode, xerrno, server, client, WriteClientDone);
+ if (keepGoingAfterRead(len, errcode, xerrno, server, client))
+ copy(len, server, client, WriteClientDone);
+}
+
+/// Called when we read [a part of] CONNECT response from the peer
+void
+TunnelStateData::readConnectResponseDone(char *buf, size_t len, comm_err_t errcode, int xerrno)
+{
+ debugs(26, 3, server.conn << ", read " << len << " bytes, err=" << errcode);
+ assert(waitingForConnectResponse());
+
+ if (errcode == COMM_ERR_CLOSING)
+ return;
+
+ if (len > 0) {
+ connectRespBuf->appended(len);
+ server.bytesIn(len);
+ kb_incr(&(statCounter.server.all.kbytes_in), len);
+ kb_incr(&(statCounter.server.other.kbytes_in), len);
+ }
+
+ if (keepGoingAfterRead(len, errcode, xerrno, server, client))
+ handleConnectResponse(len);
+}
+
+/* Read from client side and queue it for writing to the server */
+void
+TunnelStateData::ReadConnectResponseDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
+{
+ TunnelStateData *tunnelState = (TunnelStateData *)data;
+ assert (cbdataReferenceValid (tunnelState));
+
+ tunnelState->readConnectResponseDone(buf, len, errcode, xerrno);
+}
+
+/// Parses [possibly incomplete] CONNECT response and reacts to it.
+/// If the tunnel is being closed or more response data is needed, returns false.
+/// Otherwise, the caller should handle the remaining read data, if any.
+void
+TunnelStateData::handleConnectResponse(const size_t chunkSize)
+{
+ assert(waitingForConnectResponse());
+
+ // Ideally, client and server should use MemBuf or better, but current code
+ // never accumulates more than one read when shoveling data (XXX) so it does
+ // not need to deal with MemBuf complexity. To keep it simple, we use a
+ // dedicated MemBuf for accumulating CONNECT responses. TODO: When shoveling
+ // is optimized, reuse server.buf for CONNEC response accumulation instead.
+
+ /* mimic the basic parts of HttpStateData::processReplyHeader() */
+ HttpReply rep;
+ Http::StatusCode parseErr = Http::scNone;
+ const bool eof = !chunkSize;
+ const bool parsed = rep.parse(connectRespBuf, eof, &parseErr);
+ if (!parsed) {
+ if (parseErr > 0) { // unrecoverable parsing error
+ server.logicError("malformed CONNECT response from peer");
+ return;
+ }
+
+ // need more data
+ assert(!eof);
+ assert(!parseErr);
+
+ if (!connectRespBuf->hasSpace()) {
+ server.logicError("huge CONNECT response from peer");
+ return;
+ }
+
+ // keep reading
+ readConnectResponse();
+ return;
+ }
+
+ // CONNECT response was successfully parsed
+ *status_ptr = rep.sline.status();
+
+ // bail if we did not get an HTTP 200 (Connection Established) response
+ if (rep.sline.status() != Http::scOkay) {
+ server.logicError("unsupported CONNECT response status code");
+ return;
+ }
+
+ if (rep.hdr_sz < connectRespBuf->contentSize()) {
+ // preserve bytes that the server already sent after the CONNECT response
+ server.len = connectRespBuf->contentSize() - rep.hdr_sz;
+ memcpy(server.buf, connectRespBuf->content()+rep.hdr_sz, server.len);
+ } else {
+ // reset; delay pools were using this field to throttle CONNECT response
+ server.len = 0;
+ }
+
+ delete connectRespBuf;
+ connectRespBuf = NULL;
+ connectExchangeCheckpoint();
+}
+
+void
+TunnelStateData::Connection::logicError(const char *errMsg)
+{
+ debugs(50, 3, conn << " closing on error: " << errMsg);
+ conn->close();
}
void
kb_incr(&(statCounter.client_http.kbytes_in), len);
}
- copy (len, errcode, xerrno, client, server, WriteServerDone);
+ if (keepGoingAfterRead(len, errcode, xerrno, client, server))
+ copy(len, client, server, WriteServerDone);
}
-void
-TunnelStateData::copy (size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to, IOCB *completion)
+/// Updates state after reading from client or server.
+/// Returns whether the caller should use the data just read.
+bool
+TunnelStateData::keepGoingAfterRead(size_t len, comm_err_t errcode, int xerrno, Connection &from, Connection &to)
{
debugs(26, 3, HERE << "from={" << from.conn << "}, to={" << to.conn << "}");
to.conn->close();
}
} else if (cbdataReferenceValid(this)) {
+ return true;
+ }
+
+ return false;
+}
+
+void
+TunnelStateData::copy(size_t len, Connection &from, Connection &to, IOCB *completion)
+{
debugs(26, 3, HERE << "Schedule Write");
AsyncCall::Pointer call = commCbCall(5,5, "TunnelBlindCopyWriteHandler",
CommIoCbPtrFun(completion, this));
Comm::Write(to.conn, from.buf, len, call, NULL);
- }
}
/* Writes data from the client buffer to the server side */
comm_read(from.conn, from.buf, from.bytesWanted(1, SQUID_TCP_SO_RCVBUF), call);
}
+void
+TunnelStateData::readConnectResponse()
+{
+ assert(waitingForConnectResponse());
+
+ AsyncCall::Pointer call = commCbCall(5,4, "readConnectResponseDone",
+ CommIoCbPtrFun(ReadConnectResponseDone, this));
+ comm_read(server.conn, connectRespBuf->space(),
+ server.bytesWanted(1, connectRespBuf->spaceSize()), call);
+}
+
/**
* Set the HTTP status for this request and sets the read handlers for client
* and server side connections.
static void
tunnelStartShoveling(TunnelStateData *tunnelState)
{
+ assert(!tunnelState->waitingForConnectExchange());
*tunnelState->status_ptr = Http::scOkay;
if (cbdataReferenceValid(tunnelState)) {
+ if (!tunnelState->server.len)
tunnelState->copyRead(tunnelState->server, TunnelStateData::ReadServer);
+ else
+ tunnelState->copy(tunnelState->server.len, tunnelState->server, tunnelState->client, TunnelStateData::WriteClientDone);
tunnelState->copyRead(tunnelState->client, TunnelStateData::ReadClient);
}
}
tunnelStartShoveling(tunnelState);
}
+/// Called when we are done writing CONNECT request to a peer.
+static void
+tunnelConnectReqWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
+{
+ TunnelStateData *tunnelState = (TunnelStateData *)data;
+ debugs(26, 3, conn << ", flag=" << flag);
+ assert(tunnelState->waitingForConnectRequest());
+
+ if (flag != COMM_OK) {
+ *tunnelState->status_ptr = Http::scInternalServerError;
+ tunnelErrorComplete(conn->fd, data, 0);
+ return;
+ }
+
+ tunnelState->connectReqWriting = false;
+ tunnelState->connectExchangeCheckpoint();
+}
+
+void
+TunnelStateData::connectExchangeCheckpoint()
+{
+ if (waitingForConnectResponse()) {
+ debugs(26, 5, "still reading CONNECT response on " << server.conn);
+ } else if (waitingForConnectRequest()) {
+ debugs(26, 5, "still writing CONNECT request on " << server.conn);
+ } else {
+ assert(!waitingForConnectExchange());
+ debugs(26, 3, "done with CONNECT exchange on " << server.conn);
+ tunnelConnected(server.conn, this);
+ }
+}
+
/*
* handle the write completion from a proxy request to an upstream origin
*/
TunnelStateData *tunnelState = (TunnelStateData *)data;
debugs(26, 3, HERE << server << ", tunnelState=" << tunnelState);
- if (tunnelState->request != NULL && (tunnelState->request->flags.interceptTproxy || tunnelState->request->flags.intercepted))
+ if (!tunnelState->clientExpectsConnectResponse())
tunnelStartShoveling(tunnelState); // ssl-bumped connection, be quiet
else {
AsyncCall::Pointer call = commCbCall(5,5, "tunnelConnectedWriteDone",
tunnelRelayConnectRequest(const Comm::ConnectionPointer &srv, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
+ assert(!tunnelState->waitingForConnectExchange());
HttpHeader hdr_out(hoRequest);
Packer p;
HttpStateFlags flags;
packerClean(&p);
mb.append("\r\n", 2);
+ if (tunnelState->clientExpectsConnectResponse()) {
+ // hack: blindly tunnel peer response (to our CONNECT request) to the client as ours.
AsyncCall::Pointer writeCall = commCbCall(5,5, "tunnelConnectedWriteDone",
CommIoCbPtrFun(tunnelConnectedWriteDone, tunnelState));
Comm::Write(srv, &mb, writeCall);
+ } else {
+ // we have to eat the connect response from the peer (so that the client
+ // does not see it) and only then start shoveling data to the client
+ AsyncCall::Pointer writeCall = commCbCall(5,5, "tunnelConnectReqWriteDone",
+ CommIoCbPtrFun(tunnelConnectReqWriteDone,
+ tunnelState));
+ Comm::Write(srv, &mb, writeCall);
+ tunnelState->connectReqWriting = true;
+
+ tunnelState->connectRespBuf = new MemBuf;
+ // SQUID_TCP_SO_RCVBUF: we should not accumulate more than regular I/O buffer
+ // can hold since any CONNECT response leftovers have to fit into server.buf.
+ // 2*SQUID_TCP_SO_RCVBUF: HttpMsg::parse() zero-terminates, which uses space.
+ tunnelState->connectRespBuf->init(SQUID_TCP_SO_RCVBUF, 2*SQUID_TCP_SO_RCVBUF);
+ tunnelState->readConnectResponse();
+
+ assert(tunnelState->waitingForConnectExchange());
+ }
AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
CommTimeoutCbPtrFun(tunnelTimeout, tunnelState));