From: Remi Gacogne Date: Tue, 22 Mar 2016 15:24:55 +0000 (+0100) Subject: dnsdist: Use a separate thread and a queue for remote logging X-Git-Tag: dnsdist-1.0.0-beta1~67^2~3^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=cb99b9c3e90c2d9f65e77da0696c03a227648b5e;p=thirdparty%2Fpdns.git dnsdist: Use a separate thread and a queue for remote logging --- diff --git a/pdns/dnsdist-remotelogger.cc b/pdns/dnsdist-remotelogger.cc index 2370105676..b8710bd68a 100644 --- a/pdns/dnsdist-remotelogger.cc +++ b/pdns/dnsdist-remotelogger.cc @@ -10,7 +10,7 @@ #include "dnsmessage.pb.h" #endif -void RemoteLogger::reconnect() +bool RemoteLogger::reconnect() { if (d_socket >= 0) { close(d_socket); @@ -22,8 +22,10 @@ void RemoteLogger::reconnect() setNonBlocking(d_socket); } catch(const std::exception& e) { - infolog("Error connecting to %s: %s", d_remote.toStringWithPort(), e.what()); + infolog("Error connecting to remote logger (%s): %s", d_remote.toStringWithPort(), e.what()); + return false; } + return true; } bool RemoteLogger::sendData(const char* buffer, size_t bufferSize) @@ -55,6 +57,59 @@ bool RemoteLogger::sendData(const char* buffer, size_t bufferSize) return true; } +void RemoteLogger::worker() +{ + while(true) { + std::string data; + { + std::unique_lock lock(d_writeMutex); + d_queueCond.wait(lock, [this]{return !d_writeQueue.empty();}); + data = d_writeQueue.front(); + d_writeQueue.pop(); + } + + try { + uint32_t len = htonl(data.length()); + writen2WithTimeout(d_socket, &len, sizeof(len), (int) d_timeout); + writen2WithTimeout(d_socket, data.c_str(), data.length(), (int) d_timeout); + } + catch(const std::runtime_error& e) { + vinfolog("Error sending data to remote logger (%s): %s", d_remote.toStringWithPort(), e.what()); + + while (!reconnect()) { + sleep(d_reconnectWaitTime); + } + } + } +} + +void RemoteLogger::queueData(const std::string& data) +{ + { + std::unique_lock lock(d_writeMutex); + if (d_writeQueue.size() >= d_maxQueuedEntries) { + d_writeQueue.pop(); + } + d_writeQueue.push(data); + } + d_queueCond.notify_one(); +} + +RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedEntries, uint8_t reconnectWaitTime): d_remote(remote), d_maxQueuedEntries(maxQueuedEntries), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_thread(&RemoteLogger::worker, this) +{ +#ifdef HAVE_PROTOBUF + reconnect(); +#else + throw new std::runtime_error("Remote logging requires protobuf support, which is not enabled."); +#endif /* HAVE_PROTOBUF */ +} + +RemoteLogger::~RemoteLogger() +{ + if (d_socket >= 0) + close(d_socket); +} + void RemoteLogger::logQuery(const DNSQuestion& dq) { #ifdef HAVE_PROTOBUF @@ -91,9 +146,7 @@ void RemoteLogger::logQuery(const DNSQuestion& dq) //cerr < +#include + class RemoteLogger { public: - RemoteLogger(const ComboAddress& remote): d_remote(remote) - { -#ifdef HAVE_PROTOBUF - reconnect(); -#else - throw new std::runtime_error("Remote logging requires protobuf support, which is not enabled."); -#endif /* HAVE_PROTOBUF */ - } - ~RemoteLogger() - { - if (d_socket >= 0) - close(d_socket); - } + RemoteLogger(const ComboAddress& remote, uint16_t timeout=2, uint64_t maxQueuedEntries=100, uint8_t reconnectWaitTime=1); + ~RemoteLogger(); void logQuery(const DNSQuestion& dq); void logResponse(const DNSQuestion& dr); std::string toString() @@ -24,10 +16,19 @@ public: return d_remote.toStringWithPort(); } private: - void reconnect(); + bool reconnect(); bool sendData(const char* buffer, size_t bufferSize); + void worker(); + void queueData(const std::string& data); + std::queue d_writeQueue; + std::mutex d_writeMutex; + std::condition_variable d_queueCond; ComboAddress d_remote; + uint64_t d_maxQueuedEntries; int d_socket{-1}; + uint16_t d_timeout; + uint8_t d_reconnectWaitTime; + std::thread d_thread; };