From ebf995258fbe844ba5af457c4e83ec1b9a09a0bf Mon Sep 17 00:00:00 2001 From: Remi Gacogne Date: Tue, 3 Mar 2020 12:24:34 +0100 Subject: [PATCH] Better handling of reconnections in Remote Logger (dnsdist, rec) - Do not lock while trying to reconnect ; - Try to reconnect right away if the disconnection was detected in the maintenance thread ; - Keep queueing messages when the connection has been lost, until the buffer gets full. --- pdns/remote_logger.cc | 180 ++++++++++++++++++++++++++++-------------- pdns/remote_logger.hh | 13 +-- 2 files changed, 126 insertions(+), 67 deletions(-) diff --git a/pdns/remote_logger.cc b/pdns/remote_logger.cc index b277c7f191..956d06bb19 100644 --- a/pdns/remote_logger.cc +++ b/pdns/remote_logger.cc @@ -12,31 +12,42 @@ #include "dolog.hh" #endif -void CircularWriteBuffer::write(const std::string& str) +bool CircularWriteBuffer::hasRoomFor(const std::string& str) const { - if(d_buffer.size() + 2 + str.size() > d_buffer.capacity()) - flush(); + if (d_buffer.size() + 2 + str.size() > d_buffer.capacity()) { + return false; + } - if(d_buffer.size() + 2 + str.size() > d_buffer.capacity()) - throw std::runtime_error("Full!"); + return true; +} + +bool CircularWriteBuffer::write(const std::string& str) +{ + if (!hasRoomFor(str)) { + return false; + } uint16_t len = htons(str.size()); - char* ptr = (char*)&len; + const char* ptr = reinterpret_cast(&len); d_buffer.insert(d_buffer.end(), ptr, ptr + 2); d_buffer.insert(d_buffer.end(), str.begin(), str.end()); + + return true; } -void CircularWriteBuffer::flush() +bool CircularWriteBuffer::flush(int fd) { - if(d_buffer.empty()) // not optional, we report EOF otherwise - return; + if (d_buffer.empty()) { + // not optional, we report EOF otherwise + return false; + } auto arr1 = d_buffer.array_one(); auto arr2 = d_buffer.array_two(); struct iovec iov[2]; - int pos=0; - size_t total=0; + int pos = 0; + size_t total = 0; for(const auto& arr : {arr1, arr2}) { if(arr.second) { iov[pos].iov_base = arr.first; @@ -46,50 +57,76 @@ void CircularWriteBuffer::flush() } } - int res = writev(d_fd, iov, pos); - if(res < 0) { - throw std::runtime_error("Couldn't flush a thing: "+stringerror()); - } - if(!res) { - throw std::runtime_error("EOF"); + ssize_t res = 0; + do { + res = writev(fd, iov, pos); + + if (res < 0) { + if (errno == EINTR) { + continue; + } + + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return false; + } + + /* we can't be sure we haven't sent a partial message, + and we don't want to send the remaining part after reconnecting */ + d_buffer.clear(); + throw std::runtime_error("Couldn't flush a thing: " + stringerror()); + } + else if (!res) { + /* we can't be sure we haven't sent a partial message, + and we don't want to send the remaining part after reconnecting */ + d_buffer.clear(); + throw std::runtime_error("EOF"); + } } + while (res < 0); + // cout<<"Flushed "<(res) == d_buffer.size()) { d_buffer.clear(); } else { - while(res--) { + while (res--) { d_buffer.pop_front(); } } + + return true; } -RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedBytes(maxQueuedBytes), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect) +RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_writer(maxQueuedBytes), d_remote(remote), d_maxQueuedBytes(maxQueuedBytes), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect) { if (!d_asyncConnect) { - if(reconnect()) - d_writer = make_unique(d_socket, d_maxQueuedBytes); + reconnect(); } + d_thread = std::thread(&RemoteLogger::maintenanceThread, this); } bool RemoteLogger::reconnect() { - if (d_socket >= 0) { - close(d_socket); - d_socket = -1; - } try { - d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0); - setNonBlocking(d_socket); - SConnectWithTimeout(d_socket, d_remote, d_timeout); + auto newSock = make_unique(d_remote.sin4.sin_family, SOCK_STREAM, 0); + newSock->setNonBlocking(); + newSock->connect(d_remote, d_timeout); + + { + /* we are now successfully connected, time to take the lock and update the + socket */ + std::unique_lock lock(d_mutex); + d_socket = std::move(newSock); + } } - catch(const std::exception& e) { + catch (const std::exception& e) { #ifdef WE_ARE_RECURSOR g_log< lock(d_mutex); - if(d_writer) { + + if (!d_writer.hasRoomFor(data)) { + /* not connected, queue is full, just drop */ + if (!d_socket) { + ++d_drops; + return; + } try { - d_writer->write(data); - ++d_queued; + /* we try to flush some data */ + if (!d_writer.flush(d_socket->getHandle())) { + /* but failed, let's just drop */ + ++d_drops; + return; + } + + /* see if we freed enough data */ + if (!d_writer.hasRoomFor(data)) { + /* we didn't */ + ++d_drops; + return; + } } catch(const std::exception& e) { // cout << "Got exception writing: "< lock(d_mutex); - if(d_writer) { // check if it is still set - // cout<<"Flush"<flush(); + bool connected = true; + if (d_socket == nullptr) { + // if it was unset, it will remain so, we are the only ones setting it! + connected = reconnect(); + } + + /* we will just go to sleep if the reconnection just failed */ + if (connected) { + try { + /* we don't want to take the lock while trying to reconnect */ + std::unique_lock lock(d_mutex); + if (d_socket) { // check if it is set + d_writer.flush(d_socket->getHandle()); } - catch(std::exception& e) { - // cout<<"Flush failed!"< lock(d_mutex); - d_writer = make_unique(d_socket, d_maxQueuedBytes); - } + sleep(d_reconnectWaitTime); } } @@ -162,10 +224,6 @@ catch(const std::exception& e) RemoteLogger::~RemoteLogger() { d_exiting = true; - if (d_socket >= 0) { - close(d_socket); - d_socket = -1; - } d_thread.join(); } diff --git a/pdns/remote_logger.hh b/pdns/remote_logger.hh index 8bcf6b0528..af48f6ed66 100644 --- a/pdns/remote_logger.hh +++ b/pdns/remote_logger.hh @@ -31,6 +31,7 @@ #include "iputils.hh" #include "circular_buffer.hh" +#include "sstuff.hh" /* Writes can be submitted and they are atomically accepted. Either the whole write ends up in the buffer or nothing ends up in the buffer. @@ -45,14 +46,14 @@ class CircularWriteBuffer { public: - explicit CircularWriteBuffer(int fd, size_t size) : d_fd(fd), d_buffer(size) + explicit CircularWriteBuffer(size_t size) : d_buffer(size) { } - void write(const std::string& str); - void flush(); + bool hasRoomFor(const std::string& str) const; + bool write(const std::string& str); + bool flush(int fd); private: - int d_fd; boost::circular_buffer d_buffer; }; @@ -100,12 +101,12 @@ private: bool reconnect(); void maintenanceThread(); - std::unique_ptr d_writer; + CircularWriteBuffer d_writer; ComboAddress d_remote; std::atomic d_drops{0}; std::atomic d_queued{0}; uint64_t d_maxQueuedBytes; - int d_socket{-1}; + std::unique_ptr d_socket{nullptr}; uint16_t d_timeout; uint8_t d_reconnectWaitTime; std::atomic d_exiting{false}; -- 2.47.2