state->d_currentPos = 0;
state->d_currentResponse = std::move(response);
- auto iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
- if (iostate == IOState::Done) {
- DEBUGLOG("response sent");
- if (!handleResponseSent(state, now)) {
- return IOState::Done;
+ try {
+ auto iostate = state->d_handler.tryWrite(state->d_currentResponse.d_buffer, state->d_currentPos, state->d_currentResponse.d_buffer.size());
+ if (iostate == IOState::Done) {
+ DEBUGLOG("response sent");
+ if (!handleResponseSent(state, now)) {
+ return IOState::Done;
+ }
+ return sendQueuedResponses(state, now);
+ } else {
+ return IOState::NeedWrite;
+ DEBUGLOG("partial write");
}
- return sendQueuedResponses(state, now);
- } else {
- return IOState::NeedWrite;
- DEBUGLOG("partial write");
+ }
+ catch (const std::exception& e) {
+ vinfolog("Closing TCP client connection with %s: %s", state->d_ci.remote.toStringWithPort(), e.what());
+ DEBUGLOG("Closing TCP client connection: "<<e.what());
+ ++state->d_ci.cs->tcpDiedSendingResponse;
+
+ state->terminateClientConnection();
+
+ return IOState::Done;
}
}
+void IncomingTCPConnectionState::terminateClientConnection()
+{
+ d_queuedResponses.clear();
+ /* we have already released idle connections that could be reused,
+ we don't care about the ones still waiting for responses */
+ d_activeConnectionsToBackend.clear();
+ /* meaning we will no longer be 'active' when the backend
+ response or timeout comes in */
+ d_ioState->reset();
+ d_handler.close();
+}
+
/* called when handling a response or error coming from a backend */
void IncomingTCPConnectionState::sendOrQueueResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response)
{
for (auto it = list.begin(); it != list.end(); ++it) {
if (*it == response.d_connection) {
- response.d_connection->release();
- DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it));
+ try {
+ response.d_connection->release();
+ DownstreamConnectionsManager::releaseDownstreamConnection(std::move(*it));
+ }
+ catch (const std::exception& e) {
+ vinfolog("Error releasing connection: %s", e.what());
+ }
list.erase(it);
break;
}
void IncomingTCPConnectionState::handleIOCallback(int fd, FDMultiplexer::funcparam_t& param)
{
auto conn = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(param);
- if (fd != conn->d_ci.fd) {
- throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->d_ci.fd));
+ if (fd != conn->d_handler.getDescriptor()) {
+ throw std::runtime_error("Unexpected socket descriptor " + std::to_string(fd) + " received in " + std::string(__PRETTY_FUNCTION__) + ", expected " + std::to_string(conn->d_handler.getDescriptor()));
}
struct timeval now;
TCPResponse resp = std::move(state->d_queuedResponses.front());
state->d_queuedResponses.pop_front();
state->d_state = IncomingTCPConnectionState::State::idle;
- sendOrQueueResponse(state, now, std::move(resp));
+ try {
+ sendOrQueueResponse(state, now, std::move(resp));
+ }
+ catch (const std::exception& e) {
+ vinfolog("exception in notifyIOError: %s", e.what());
+ }
}
else {
// the backend code already tried to reconnect if it was possible
for (const auto& cbData : expiredReadConns) {
if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
- if (cbData.first == state->d_ci.fd) {
+ if (cbData.first == state->d_handler.getDescriptor()) {
vinfolog("Timeout (read) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
state->handleTimeout(state, false);
}
for (const auto& cbData : expiredWriteConns) {
if (cbData.second.type() == typeid(std::shared_ptr<IncomingTCPConnectionState>)) {
auto state = boost::any_cast<std::shared_ptr<IncomingTCPConnectionState>>(cbData.second);
- if (cbData.first == state->d_ci.fd) {
+ if (cbData.first == state->d_handler.getDescriptor()) {
vinfolog("Timeout (write) from remote TCP client %s", state->d_ci.remote.toStringWithPort());
state->handleTimeout(state, true);
}
void TCPConnectionToBackend::release()
{
+ if (!d_usedForXFR) {
+ d_ds->outstanding -= d_pendingResponses.size();
+ }
+
+ d_pendingResponses.clear();
+ d_pendingQueries.clear();
+
d_clientConn.reset();
if (d_ioState) {
d_ioState.reset();
}
catch (const std::exception& e) {
vinfolog("Got an exception while handling TCP response from %s (client is %s): %s", conn->d_ds ? conn->d_ds->getName() : "unknown", conn->d_currentQuery.d_idstate.origRemote.toStringWithPort(), e.what());
+ ioGuard.release();
+ conn->release();
+ return;
}
}
}
++d_ds->tcpReadTimeouts;
}
- if (d_ioState) {
- d_ioState->reset();
+ try {
+ notifyAllQueriesFailed(now, FailureReason::timeout);
+ }
+ catch (const std::exception& e) {
+ vinfolog("Got an exception while notifying a timeout: %s", e.what());
+ }
+ catch (...) {
+ vinfolog("Got exception while notifying a timeout");
}
- notifyAllQueriesFailed(now, FailureReason::timeout);
+ release();
}
void TCPConnectionToBackend::notifyAllQueriesFailed(const struct timeval& now, FailureReason reason)
{
d_connectionDied = true;
- if (!d_usedForXFR) {
- d_ds->outstanding -= d_pendingResponses.size();
- }
-
auto& clientConn = d_clientConn;
if (!clientConn->active()) {
// a client timeout occured, or something like that */
++clientConn->d_ci.cs->tcpGaveUp;
}
- if (d_state == State::sendingQueryToBackend) {
- clientConn->notifyIOError(clientConn, std::move(d_currentQuery.d_idstate), now);
- }
+ try {
+ if (d_state == State::sendingQueryToBackend) {
+ clientConn->notifyIOError(clientConn, std::move(d_currentQuery.d_idstate), now);
+ }
- for (auto& query : d_pendingQueries) {
- clientConn->notifyIOError(clientConn, std::move(query.d_idstate), now);
- }
+ for (auto& query : d_pendingQueries) {
+ clientConn->notifyIOError(clientConn, std::move(query.d_idstate), now);
+ }
- for (auto& response : d_pendingResponses) {
- clientConn->notifyIOError(clientConn, std::move(response.second.d_idstate), now);
+ for (auto& response : d_pendingResponses) {
+ clientConn->notifyIOError(clientConn, std::move(response.second.d_idstate), now);
+ }
+ }
+ catch (const std::exception& e) {
+ vinfolog("Got an exception while notifying: %s", e.what());
+ }
+ catch (...) {
+ vinfolog("Got exception while notifying");
}
- d_pendingQueries.clear();
- d_pendingResponses.clear();
-
- d_clientConn.reset();
+ release();
}
IOState TCPConnectionToBackend::handleResponse(std::shared_ptr<TCPConnectionToBackend>& conn, const struct timeval& now)
if (!clientConn || !clientConn->active()) {
// a client timeout occured, or something like that */
d_connectionDied = true;
- d_clientConn.reset();
- if (!conn->d_usedForXFR) {
- --conn->d_ds->outstanding;
- }
+ release();
return IOState::Done;
}
close(fd);
fd = -1;
}
+
if (cs) {
--cs->tcpCurrentConnections;
}
if (getsockname(d_ci.fd, reinterpret_cast<sockaddr*>(&d_origDest), &socklen)) {
d_origDest = d_ci.cs->local;
}
+ /* belongs to the handler now */
+ d_ci.fd = -1;
d_proxiedDestination = d_origDest;
d_proxiedRemote = d_ci.remote;
}
static void handleXFRResponse(std::shared_ptr<IncomingTCPConnectionState>& state, const struct timeval& now, TCPResponse&& response);
static void handleTimeout(std::shared_ptr<IncomingTCPConnectionState>& state, bool write);
+ void terminateClientConnection();
void queueQuery(TCPQuery&& query);
bool canAcceptNewQueries() const;
std::string toString() const
{
ostringstream o;
- o << "Incoming TCP connection from "<<d_ci.remote.toStringWithPort()<<" over FD "<<d_ci.fd<<", state is "<<(int)d_state<<", io state is "<<(d_ioState ? std::to_string((int)d_ioState->getState()) : "empty")<<", queries count is "<<d_queriesCount<<", current queries count is "<<d_currentQueriesCount<<", "<<d_queuedResponses.size()<<" queued responses, "<<d_activeConnectionsToBackend.size()<<" active connections to a backend";
+ o << "Incoming TCP connection from "<<d_ci.remote.toStringWithPort()<<" over FD "<<d_handler.getDescriptor()<<", state is "<<(int)d_state<<", io state is "<<(d_ioState ? std::to_string((int)d_ioState->getState()) : "empty")<<", queries count is "<<d_queriesCount<<", current queries count is "<<d_currentQueriesCount<<", "<<d_queuedResponses.size()<<" queued responses, "<<d_activeConnectionsToBackend.size()<<" active connections to a backend";
return o.str();
}
uint16_t counter = 0;
Socket sock(dest.sin4.sin_family, SOCK_STREAM);
SConnectWithTimeout(sock.getHandle(), dest, timeout);
- TCPIOHandler handler(subjectName, sock.getHandle(), timeout, tlsCtx, time(nullptr));
+ TCPIOHandler handler(subjectName, sock.releaseHandle(), timeout, tlsCtx, time(nullptr));
handler.connect(fastOpen, dest, timeout);
// we are writing the proxyheader inside the TLS connection. Is that right?
if (proxyheader.size() > 0 && handler.write(proxyheader.data(), proxyheader.size(), timeout) != proxyheader.size()) {
{
return d_socket;
}
-
+
+ int releaseHandle()
+ {
+ int ret = d_socket;
+ d_socket = -1;
+ return ret;
+ }
+
private:
static const size_t s_buflen{4096};
std::string d_buffer;
close();
}
- /* Prepare the connection but does not close the descriptor */
void close()
{
if (d_conn) {
d_conn->close();
d_conn.reset();
}
- else if (d_socket != -1) {
+
+ if (d_socket != -1) {
shutdown(d_socket, SHUT_RDWR);
+ ::close(d_socket);
d_socket = -1;
}
}
+ int getDescriptor() const
+ {
+ return d_socket;
+ }
+
IOState tryConnect(bool fastOpen, const ComboAddress& remote)
{
/* yes, this is only the TLS connect not the socket one,