From: bert hubert Date: Tue, 29 Jan 2019 11:15:21 +0000 (+0100) Subject: Reduce systemcall usage in protobuf logging X-Git-Tag: auth-4.2.0-beta1~44^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=da71b63b1e67d89575ffc47ec1336928e7d903eb;p=thirdparty%2Fpdns.git Reduce systemcall usage in protobuf logging Since Spectre/Meltdown, system calls have become more expensive. In addition, relevant versions of glibc turn out to implement pthread_cond_wait and pthread_cond_signal in such a way that they use multiple system calls always. There is an optimization in glibc to improve this but it is disabled. This new setup changes our protobuf logging so it amortizes system calls so we perform far less than one call per message. Note that our previous RemoteLogger was configured in terms of how many *messages* it would buffer. Our new code is configured in terms of how many *bytes*. I have multiplied the configured numbers by 100 elsewhere (recursor config, dnsdist config) to sort of maintain parity. In addition, the old RemoteLogger would buffer messages while there was no connection available. We no longer do this. Finally new, every 'reconnectTimeout' seconds we will flush our buffers opportunistically to not keep people waiting. --- diff --git a/pdns/dnsdist-lua-bindings.cc b/pdns/dnsdist-lua-bindings.cc index 1138c75e0d..97f85006e0 100644 --- a/pdns/dnsdist-lua-bindings.cc +++ b/pdns/dnsdist-lua-bindings.cc @@ -390,7 +390,7 @@ void setupLuaBindings(bool client) /* RemoteLogger */ g_lua.writeFunction("newRemoteLogger", [client](const std::string& remote, boost::optional timeout, boost::optional maxQueuedEntries, boost::optional reconnectWaitTime) { - return std::shared_ptr(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1, client)); + return std::shared_ptr(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries*100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client)); }); g_lua.writeFunction("newFrameStreamUnixLogger", [client](const std::string& address) { diff --git a/pdns/namespaces.hh b/pdns/namespaces.hh index 2d5cd8c61c..565b67d9b1 100644 --- a/pdns/namespaces.hh +++ b/pdns/namespaces.hh @@ -22,7 +22,6 @@ #ifndef PDNS_NAMESPACES_HH #define PDNS_NAMESPACES_HH #include - #include #include #include @@ -78,4 +77,5 @@ using boost::trim_right_copy_if; using boost::equals; using boost::ends_with; using boost::iends_with; + #endif diff --git a/pdns/pdns_recursor.cc b/pdns/pdns_recursor.cc index 4bb10c96e0..f4842e9e7c 100644 --- a/pdns/pdns_recursor.cc +++ b/pdns/pdns_recursor.cc @@ -878,7 +878,7 @@ static std::shared_ptr>> startProtobuf for (const auto& server : config.servers) { try { - result->emplace_back(new RemoteLogger(server, config.timeout, config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect)); + result->emplace_back(new RemoteLogger(server, config.timeout, 100*config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect)); } catch(const std::exception& e) { g_log< #include "threadname.hh" #include "remote_logger.hh" +#include #ifdef HAVE_CONFIG_H #include "config.h" #endif @@ -11,13 +12,71 @@ #include "dolog.hh" #endif +void CircularWriteBuffer::write(const std::string& str) +{ + if(d_buffer.size() + 2 + str.size() > d_buffer.capacity()) + flush(); + + if(d_buffer.size() + 2 + str.size() > d_buffer.capacity()) + throw std::runtime_error("Full!"); + + uint16_t len = htons(str.size()); + char* ptr = (char*)&len; + d_buffer.insert(d_buffer.end(), ptr, ptr + 2); + d_buffer.insert(d_buffer.end(), str.begin(), str.end()); +} + +void CircularWriteBuffer::flush() +{ + if(d_buffer.empty()) // not optional, we report EOF otherwise + return; + + auto arr1 = d_buffer.array_one(); + auto arr2 = d_buffer.array_two(); + + struct iovec iov[2]; + int pos=0; + size_t total=0; + for(const auto& arr : {arr1, arr2}) { + if(arr.second) { + iov[pos].iov_base = arr.first; + iov[pos].iov_len = arr.second; + total += arr.second; + ++pos; + } + } + + int res = writev(d_fd, iov, pos); + if(res < 0) { + throw std::runtime_error("Couldn't flush a thing: "+string(strerror(errno))); + } + if(!res) { + throw std::runtime_error("EOF"); + } + // cout<<"Flushed "<(d_socket, d_maxQueuedBytes); + } + d_thread = std::thread(&RemoteLogger::maintenanceThread, this); +} + bool RemoteLogger::reconnect() { if (d_socket >= 0) { close(d_socket); d_socket = -1; } - d_connected = false; try { d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0); setNonBlocking(d_socket); @@ -31,18 +90,33 @@ bool RemoteLogger::reconnect() #endif return false; } - d_connected = true; return true; } -void RemoteLogger::busyReconnectLoop() +void RemoteLogger::queueData(const std::string& data) { - while (!reconnect()) { - sleep(d_reconnectWaitTime); + if(!d_writer) { + d_drops++; + return; + } + std::unique_lock lock(d_mutex); + if(d_writer) { + try { + d_writer->write(data); + } + catch(std::exception& e) { + // cout << "Got exception writing: "< lock(d_writeMutex); - d_queueCond.wait(lock, [this]{return (!d_writeQueue.empty()) || d_exiting;}); - if (d_exiting) { - return; - } - data = d_writeQueue.front(); - d_writeQueue.pop(); - } - if (!d_connected) { - busyReconnectLoop(); - } + for(;;) { + if(d_exiting) + break; - try { - uint16_t len = static_cast(data.length()); - sendSizeAndMsgWithTimeout(d_socket, len, data.c_str(), static_cast(d_timeout), nullptr, nullptr, 0, 0, 0); - } - catch(const std::runtime_error& e) { -#ifdef WE_ARE_RECURSOR - g_log< lock(d_mutex); + if(d_writer) { // check if it is still set + // cout<<"Flush"<flush(); + } + catch(std::exception& e) { + // cout<<"Flush failed!"< lock(d_writeMutex); - if (d_writeQueue.size() >= d_maxQueuedEntries) { - d_writeQueue.pop(); + else if(reconnect()) { // if it was zero, it will remain zero, we are the only ones setting it! + std::unique_lock lock(d_mutex); + d_writer = make_unique(d_socket, d_maxQueuedBytes); } - d_writeQueue.push(data); + sleep(d_reconnectWaitTime); } - d_queueCond.notify_one(); } - -RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedEntries, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedEntries(maxQueuedEntries), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_thread(&RemoteLogger::worker, this) +catch(std::exception& e) { - if (!d_asyncConnect) { - reconnect(); - } + cerr<<"Thead died on: "<= 0) { close(d_socket); d_socket = -1; - d_connected = false; } - d_queueCond.notify_one(); + d_thread.join(); } diff --git a/pdns/remote_logger.hh b/pdns/remote_logger.hh index 3eb2aa5757..95fd634def 100644 --- a/pdns/remote_logger.hh +++ b/pdns/remote_logger.hh @@ -30,6 +30,31 @@ #include #include "iputils.hh" +#include + +/* Writes can be submitted and they are atomically accepted. Either the whole write + ends up in the buffer or nothing ends up in the buffer. + In case nothing ends up in the buffer, an exception is thrown. + Similarly, EOF leads to this treatment + + The filedescriptor can be in non-blocking mode. + + This class is not threadsafe. +*/ + +class CircularWriteBuffer +{ +public: + explicit CircularWriteBuffer(int fd, size_t size) : d_fd(fd), d_buffer(size) + { + } + + void write(const std::string& str); + void flush(); +private: + int d_fd; + boost::circular_buffer d_buffer; +}; class RemoteLoggerInterface { @@ -39,35 +64,42 @@ public: virtual std::string toString() const = 0; }; +/* Thread safe. Will connect asynchronously on request. + Runs a reconnection thread that also periodicall flushes. + Note that the buffer only runs as long as there is a connection. + If there is no connection we don't buffer a thing +*/ class RemoteLogger : public RemoteLoggerInterface { public: - RemoteLogger(const ComboAddress& remote, uint16_t timeout=2, uint64_t maxQueuedEntries=100, uint8_t reconnectWaitTime=1, bool asyncConnect=false); - virtual ~RemoteLogger(); - virtual void queueData(const std::string& data) override; - virtual std::string toString() const override + RemoteLogger(const ComboAddress& remote, uint16_t timeout=2, + uint64_t maxQueuedBytes=100000, + uint8_t reconnectWaitTime=1, + bool asyncConnect=false); + ~RemoteLogger(); + void queueData(const std::string& data) override; + std::string toString() const override { - return "RemoteLogger to " + d_remote.toStringWithPort(); + return d_remote.toStringWithPort(); } void stop() { d_exiting = true; } + std::atomic d_drops{0}; private: - void busyReconnectLoop(); bool reconnect(); - void worker(); + void maintenanceThread(); - std::queue d_writeQueue; - std::mutex d_writeMutex; - std::condition_variable d_queueCond; ComboAddress d_remote; - uint64_t d_maxQueuedEntries; + uint64_t d_maxQueuedBytes; int d_socket{-1}; + std::unique_ptr d_writer; uint16_t d_timeout; uint8_t d_reconnectWaitTime; std::atomic d_exiting{false}; + bool d_asyncConnect{false}; - bool d_connected{false}; std::thread d_thread; + std::mutex d_mutex; };