From f7a92791b826478b75225cae5520621968b190ec Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Thu, 11 Feb 2021 19:02:03 +0100 Subject: [PATCH] dnsdist: Clean up the Downstream TCP code by using a TCPIOHandler --- pdns/dnsdist.hh | 1 + pdns/dnsdistdist/dnsdist-tcp-downstream.cc | 126 ++++++--------------- pdns/dnsdistdist/dnsdist-tcp-downstream.hh | 10 +- pdns/sdig.cc | 1 - pdns/tcpiohandler.hh | 71 ++++++++++-- 5 files changed, 106 insertions(+), 103 deletions(-) diff --git a/pdns/dnsdist.hh b/pdns/dnsdist.hh index 9e7634fc74..0c00d2c831 100644 --- a/pdns/dnsdist.hh +++ b/pdns/dnsdist.hh @@ -879,6 +879,7 @@ struct DownstreamState std::mutex socketsLock; std::mutex connectLock; std::unique_ptr mplexer{nullptr}; + std::shared_ptr d_tlsCtx{nullptr}; std::thread tid; const ComboAddress remote; QPSLimiter qps; diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc index 645abbd299..e93d6f6a34 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.cc +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.cc @@ -16,7 +16,7 @@ void TCPConnectionToBackend::assignToClientConnection(std::shared_ptr(clientConn->getIOMPlexer(), d_socket->getHandle()); + d_ioState = make_unique(clientConn->getIOMPlexer(), d_handler->getDescriptor()); } else if (d_clientConn != clientConn) { throw std::runtime_error("Assigning a query from a different client to an existing backend connection with pending queries"); @@ -48,88 +48,42 @@ IOState TCPConnectionToBackend::queueNextQuery(std::shared_ptr& conn, const struct timeval& now) { - if (buffer.size() < (pos + toRead)) { - throw std::out_of_range("Calling tryRead() with a too small buffer (" + std::to_string(buffer.size()) + ") for a read of " + std::to_string(toRead) + " bytes starting at " + std::to_string(pos)); - } + DEBUGLOG("sending query to backend "<getDS()->getName()<<" over FD "<d_handler->getDescriptor()); - size_t got = 0; - do { - ssize_t res = ::read(fd, reinterpret_cast(&buffer.at(pos)), toRead - got); - if (res == 0) { - throw runtime_error("EOF while reading message"); - } - if (res < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOTCONN) { - return IOState::NeedRead; - } - else { - throw std::runtime_error(std::string("Error while reading message: ") + stringerror()); - } - } +#warning FIXME: TODO: this drops 1/ source selection other than SO_BINDTODEVICE, perhaps we should look into IP_SENDIF? + IOState state = conn->d_handler->tryWrite(conn->d_currentQuery.d_buffer, conn->d_currentPos, conn->d_currentQuery.d_buffer.size()); - pos += static_cast(res); - got += static_cast(res); + if (state != IOState::Done) { + return state; } - while (got < toRead); - - return IOState::Done; -} -IOState TCPConnectionToBackend::sendQuery(std::shared_ptr& conn, const struct timeval& now) -{ - int fd = conn->d_socket->getHandle(); - DEBUGLOG("sending query to backend "<getDS()->getName()<<" over FD "<isFastOpenEnabled()) { - socketFlags |= MSG_FASTOPEN; - } -#endif /* MSG_FASTOPEN */ - - size_t sent = sendMsgWithOptions(fd, reinterpret_cast(&conn->d_currentQuery.d_buffer.at(conn->d_currentPos)), conn->d_currentQuery.d_buffer.size() - conn->d_currentPos, &conn->d_ds->remote, &conn->d_ds->sourceAddr, conn->d_ds->sourceItf, socketFlags); - if (sent == conn->d_currentQuery.d_buffer.size()) { - DEBUGLOG("query sent to backend"); - /* request sent ! */ - conn->incQueries(); - conn->d_currentPos = 0; - - DEBUGLOG("adding a pending response for ID "<d_currentQuery.d_idstate.origID<<" and QNAME "<d_currentQuery.d_idstate.qname); - conn->d_pendingResponses[conn->d_currentQuery.d_idstate.origID] = std::move(conn->d_currentQuery); - conn->d_currentQuery.d_buffer.clear(); + DEBUGLOG("query sent to backend"); + /* request sent ! */ + conn->incQueries(); + conn->d_currentPos = 0; - if (!conn->d_usedForXFR) { - ++conn->d_ds->outstanding; - } + DEBUGLOG("adding a pending response for ID "<d_currentQuery.d_idstate.origID<<" and QNAME "<d_currentQuery.d_idstate.qname); + conn->d_pendingResponses[conn->d_currentQuery.d_idstate.origID] = std::move(conn->d_currentQuery); + conn->d_currentQuery.d_buffer.clear(); - return IOState::Done; - } - else { - conn->d_currentPos += sent; - /* disable fast open on partial write */ - conn->disableFastOpen(); - return IOState::NeedWrite; + if (!conn->d_usedForXFR) { + ++conn->d_ds->outstanding; } + + return state; } void TCPConnectionToBackend::handleIO(std::shared_ptr& conn, const struct timeval& now) { - if (conn->d_socket == nullptr) { + if (conn->d_handler == nullptr) { throw std::runtime_error("No downstream socket in " + std::string(__PRETTY_FUNCTION__) + "!"); } bool connectionDied = false; IOState iostate = IOState::Done; IOStateGuard ioGuard(conn->d_ioState); - int fd = conn->d_socket->getHandle(); try { if (conn->d_state == State::sendingQueryToBackend) { @@ -153,9 +107,8 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c // then we need to allocate a new buffer (new because we might need to re-send the query if the // backend dies on us) // We also might need to read and send to the client more than one response in case of XFR (yeah!) - // should very likely be a TCPIOHandler conn->d_responseBuffer.resize(sizeof(uint16_t)); - iostate = tryRead(fd, conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t) - conn->d_currentPos); + iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, sizeof(uint16_t) - conn->d_currentPos); if (iostate == IOState::Done) { DEBUGLOG("got response size from backend"); conn->d_state = State::readingResponseFromBackend; @@ -168,7 +121,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c if (conn->d_state == State::readingResponseFromBackend) { DEBUGLOG("reading response from backend"); - iostate = tryRead(fd, conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize - conn->d_currentPos); + iostate = conn->d_handler->tryRead(conn->d_responseBuffer, conn->d_currentPos, conn->d_responseSize - conn->d_currentPos); if (iostate == IOState::Done) { DEBUGLOG("got response from backend"); try { @@ -225,7 +178,7 @@ void TCPConnectionToBackend::handleIO(std::shared_ptr& c try { if (conn->reconnect()) { - conn->d_ioState = make_unique(conn->d_clientConn->getIOMPlexer(), conn->d_socket->getHandle()); + conn->d_ioState = make_unique(conn->d_clientConn->getIOMPlexer(), conn->d_handler->getDescriptor()); /* we need to resend the queries that were in flight, if any */ for (auto& pending : conn->d_pendingResponses) { @@ -328,12 +281,9 @@ void TCPConnectionToBackend::queueQuery(TCPQuery&& query, std::shared_ptr result; - - if (d_socket) { - DEBUGLOG("closing socket "<getHandle()); - shutdown(d_socket->getHandle(), SHUT_RDWR); - d_socket.reset(); + if (d_handler) { + DEBUGLOG("closing socket "<getDescriptor()); + d_handler->close(); d_ioState.reset(); --d_ds->tcpCurrentConnections; } @@ -344,36 +294,32 @@ bool TCPConnectionToBackend::reconnect() vinfolog("TCP connecting to downstream %s (%d)", d_ds->getNameWithAddr(), d_downstreamFailures); DEBUGLOG("Opening TCP connection to backend "<getNameWithAddr()); try { - result = std::unique_ptr(new Socket(d_ds->remote.sin4.sin_family, SOCK_STREAM, 0)); - DEBUGLOG("result of connect is "<getHandle()); + auto socket = std::make_unique(d_ds->remote.sin4.sin_family, SOCK_STREAM, 0); + DEBUGLOG("result of socket() is "<getHandle()); if (!IsAnyAddress(d_ds->sourceAddr)) { - SSetsockopt(result->getHandle(), SOL_SOCKET, SO_REUSEADDR, 1); + SSetsockopt(socket->getHandle(), SOL_SOCKET, SO_REUSEADDR, 1); #ifdef IP_BIND_ADDRESS_NO_PORT if (d_ds->ipBindAddrNoPort) { - SSetsockopt(result->getHandle(), SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1); + SSetsockopt(socket->getHandle(), SOL_IP, IP_BIND_ADDRESS_NO_PORT, 1); } #endif #ifdef SO_BINDTODEVICE if (!d_ds->sourceItfName.empty()) { - int res = setsockopt(result->getHandle(), SOL_SOCKET, SO_BINDTODEVICE, d_ds->sourceItfName.c_str(), d_ds->sourceItfName.length()); + int res = setsockopt(socket->getHandle(), SOL_SOCKET, SO_BINDTODEVICE, d_ds->sourceItfName.c_str(), d_ds->sourceItfName.length()); if (res != 0) { vinfolog("Error setting up the interface on backend TCP socket '%s': %s", d_ds->getNameWithAddr(), stringerror()); } } #endif - result->bind(d_ds->sourceAddr, false); + socket->bind(d_ds->sourceAddr, false); } - result->setNonBlocking(); -#ifdef MSG_FASTOPEN - if (!d_ds->tcpFastOpen || !isFastOpenEnabled()) { - SConnectWithTimeout(result->getHandle(), d_ds->remote, /* no timeout, we will handle it ourselves */ 0); - } -#else - SConnectWithTimeout(result->getHandle(), d_ds->remote, /* no timeout, we will handle it ourselves */ 0); -#endif /* MSG_FASTOPEN */ + socket->setNonBlocking(); + + auto handler = std::make_unique("", socket->releaseHandle(), 0, d_ds->d_tlsCtx, time(nullptr)); + handler->tryConnect(d_ds->tcpFastOpen && isFastOpenEnabled(), d_ds->remote); - d_socket = std::move(result); + d_handler = std::move(handler); ++d_ds->tcpCurrentConnections; return true; } diff --git a/pdns/dnsdistdist/dnsdist-tcp-downstream.hh b/pdns/dnsdistdist/dnsdist-tcp-downstream.hh index 46b53a70ac..a26b29ca06 100644 --- a/pdns/dnsdistdist/dnsdist-tcp-downstream.hh +++ b/pdns/dnsdistdist/dnsdist-tcp-downstream.hh @@ -52,7 +52,7 @@ public: ~TCPConnectionToBackend() { - if (d_ds && d_socket) { + if (d_ds && d_handler) { --d_ds->tcpCurrentConnections; struct timeval now; gettimeofday(&now, nullptr); @@ -66,11 +66,11 @@ public: int getHandle() const { - if (!d_socket) { + if (!d_handler) { throw std::runtime_error("Attempt to get the socket handle from a non-established TCP connection"); } - return d_socket->getHandle(); + return d_handler->getDescriptor(); } const std::shared_ptr& getDS() const @@ -172,7 +172,7 @@ public: std::string toString() const { ostringstream o; - o << "TCP connection to backend "<<(d_ds ? d_ds->getName() : "empty")<<" over FD "<<(d_socket ? std::to_string(d_socket->getHandle()) : "no socket")<<", state is "<<(int)d_state<<", io state is "<<(d_ioState ? std::to_string((int)d_ioState->getState()) : "empty")<<", queries count is "<getName() : "empty")<<" over FD "<<(d_handler ? std::to_string(d_handler->getDescriptor()) : "no socket")<<", state is "<<(int)d_state<<", io state is "<<(d_ioState ? std::to_string((int)d_ioState->getState()) : "empty")<<", queries count is "< d_pendingQueries; std::unordered_map d_pendingResponses; std::unique_ptr> d_proxyProtocolValuesSent{nullptr}; - std::unique_ptr d_socket{nullptr}; + std::unique_ptr d_handler{nullptr}; std::unique_ptr d_ioState{nullptr}; std::shared_ptr d_ds{nullptr}; std::shared_ptr d_clientConn; diff --git a/pdns/sdig.cc b/pdns/sdig.cc index d9e2d7da30..c4432a2008 100644 --- a/pdns/sdig.cc +++ b/pdns/sdig.cc @@ -409,7 +409,6 @@ try { } uint16_t counter = 0; Socket sock(dest.sin4.sin_family, SOCK_STREAM); - SConnectWithTimeout(sock.getHandle(), dest, timeout); TCPIOHandler handler(subjectName, sock.releaseHandle(), timeout, tlsCtx, time(nullptr)); handler.connect(fastOpen, dest, timeout); // we are writing the proxyheader inside the TLS connection. Is that right? diff --git a/pdns/tcpiohandler.hh b/pdns/tcpiohandler.hh index e6c4983aa9..b9b271e384 100644 --- a/pdns/tcpiohandler.hh +++ b/pdns/tcpiohandler.hh @@ -230,24 +230,64 @@ public: IOState tryConnect(bool fastOpen, const ComboAddress& remote) { - /* yes, this is only the TLS connect not the socket one, - sorry about that */ + d_remote = remote; + +#ifdef TCP_FASTOPEN_CONNECT /* Linux >= 4.11 */ + if (fastOpen) { + int value = 1; + int res = setsockopt(d_socket, IPPROTO_TCP, TCP_FASTOPEN_CONNECT, &value, sizeof(value)); + if (res == 0) { + fastOpen = false; + } + } +#endif /* TCP_FASTOPEN_CONNECT */ + +#ifdef MSG_FASTOPEN + if (!d_conn && fastOpen) { + d_fastOpen = true; + } + else { + SConnectWithTimeout(d_socket, remote, /* no timeout, we will handle it ourselves */ 0); + } +#else + SConnectWithTimeout(d_socket, d_ds->remote, /* no timeout, we will handle it ourselves */ 0); +#endif /* MSG_FASTOPEN */ + if (d_conn) { return d_conn->tryConnect(fastOpen, remote); } - d_fastOpen = fastOpen; return IOState::Done; } void connect(bool fastOpen, const ComboAddress& remote, unsigned int timeout) { - /* yes, this is only the TLS connect not the socket one, - sorry about that */ + d_remote = remote; + +#ifdef TCP_FASTOPEN_CONNECT /* Linux >= 4.11 */ + if (fastOpen) { + int value = 1; + int res = setsockopt(d_socket, IPPROTO_TCP, TCP_FASTOPEN_CONNECT, &value, sizeof(value)); + if (res == 0) { + fastOpen = false; + } + } +#endif /* TCP_FASTOPEN_CONNECT */ + +#ifdef MSG_FASTOPEN + if (!d_conn && fastOpen) { + d_fastOpen = true; + } + else { + SConnectWithTimeout(d_socket, remote, timeout); + } +#else + SConnectWithTimeout(d_socket, d_ds->remote, timeout); +#endif /* MSG_FASTOPEN */ + if (d_conn) { d_conn->connect(fastOpen, remote, timeout); } - d_fastOpen = fastOpen; } IOState tryHandshake() @@ -319,8 +359,24 @@ public: return d_conn->tryWrite(buffer, pos, toWrite); } + if (d_fastOpen) { + int socketFlags = MSG_FASTOPEN; + size_t sent = sendMsgWithOptions(d_socket, reinterpret_cast(&buffer.at(pos)), toWrite - pos, &d_remote, nullptr, 0, socketFlags); + if (sent > 0) { + d_fastOpen = false; + pos += sent; + } + + if (pos < toWrite) { + return IOState::NeedWrite; + } + + return IOState::Done; + } + do { ssize_t res = ::write(d_socket, reinterpret_cast(&buffer.at(pos)), toWrite - pos); + if (res == 0) { throw runtime_error("EOF while sending message"); } @@ -389,13 +445,14 @@ public: return d_conn && d_conn->getResumedFromInactiveTicketKey(); } - bool getUnknownTicketKey() const + bool getUnknownTicketKey() const { return d_conn && d_conn->getUnknownTicketKey(); } private: std::unique_ptr d_conn{nullptr}; + ComboAddress d_remote; int d_socket{-1}; bool d_fastOpen{false}; }; -- 2.47.2