]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Reduce systemcall usage in protobuf logging 7428/head
authorbert hubert <bert.hubert@netherlabs.nl>
Tue, 29 Jan 2019 11:15:21 +0000 (12:15 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 29 Jan 2019 14:54:00 +0000 (15:54 +0100)
Since Spectre/Meltdown, system calls have become more expensive.  In
addition, relevant versions of glibc turn out to implement pthread_cond_wait
and pthread_cond_signal in such a way that they use multiple system calls always.
There is an optimization in glibc to improve this but it is disabled.

This new setup changes our protobuf logging so it amortizes system calls so we perform
far less than one call per message.

Note that our previous RemoteLogger was configured in terms of how many
*messages* it would buffer. Our new code is configured in terms of how many
*bytes*. I have multiplied the configured numbers by 100 elsewhere (recursor
config, dnsdist config) to sort of maintain parity.

In addition, the old RemoteLogger would buffer messages while there was no
connection available. We no longer do this.

Finally new, every 'reconnectTimeout' seconds we will flush our buffers
opportunistically to not keep people waiting.

pdns/dnsdist-lua-bindings.cc
pdns/namespaces.hh
pdns/pdns_recursor.cc
pdns/remote_logger.cc
pdns/remote_logger.hh

index 1138c75e0d9ef0d5fba639ad919f3807866d181e..97f85006e02f872fdc8520c3e5fc0a366beefef8 100644 (file)
@@ -390,7 +390,7 @@ void setupLuaBindings(bool client)
 
   /* RemoteLogger */
   g_lua.writeFunction("newRemoteLogger", [client](const std::string& remote, boost::optional<uint16_t> timeout, boost::optional<uint64_t> maxQueuedEntries, boost::optional<uint8_t> reconnectWaitTime) {
-      return std::shared_ptr<RemoteLoggerInterface>(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? *maxQueuedEntries : 100, reconnectWaitTime ? *reconnectWaitTime : 1, client));
+      return std::shared_ptr<RemoteLoggerInterface>(new RemoteLogger(ComboAddress(remote), timeout ? *timeout : 2, maxQueuedEntries ? (*maxQueuedEntries*100) : 10000, reconnectWaitTime ? *reconnectWaitTime : 1, client));
     });
 
   g_lua.writeFunction("newFrameStreamUnixLogger", [client](const std::string& address) {
index 2d5cd8c61c243b1d24749d388cd783b5077abf90..565b67d9b19c6291dfc697dd955e3d10b101a658 100644 (file)
@@ -22,7 +22,6 @@
 #ifndef PDNS_NAMESPACES_HH
 #define PDNS_NAMESPACES_HH
 #include <boost/tuple/tuple.hpp>
-
 #include <boost/shared_array.hpp>
 #include <boost/scoped_array.hpp>
 #include <boost/optional.hpp>
@@ -78,4 +77,5 @@ using boost::trim_right_copy_if;
 using boost::equals;
 using boost::ends_with;
 using boost::iends_with;
+
 #endif
index 4bb10c96e038e21650a8e2998b4e2a1eeaa203c5..f4842e9e7c8d02d104872d64e5f2aecb73e18565 100644 (file)
@@ -878,7 +878,7 @@ static std::shared_ptr<std::vector<std::unique_ptr<RemoteLogger>>> startProtobuf
 
   for (const auto& server : config.servers) {
     try {
-      result->emplace_back(new RemoteLogger(server, config.timeout, config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect));
+      result->emplace_back(new RemoteLogger(server, config.timeout, 100*config.maxQueuedEntries, config.reconnectWaitTime, config.asyncConnect));
     }
     catch(const std::exception& e) {
       g_log<<Logger::Error<<"Error while starting protobuf logger to '"<<server<<": "<<e.what()<<endl;
index 50e54248d2eb0308977ddbde202727c2d7ede92f..8863fa1954faa27db7c57a418387a232086dd682 100644 (file)
@@ -1,6 +1,7 @@
 #include <unistd.h>
 #include "threadname.hh"
 #include "remote_logger.hh"
+#include <sys/uio.h>
 #ifdef HAVE_CONFIG_H
 #include "config.h"
 #endif
 #include "dolog.hh"
 #endif
 
+void CircularWriteBuffer::write(const std::string& str)
+{
+  if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
+    flush();
+
+  if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
+    throw std::runtime_error("Full!");
+
+  uint16_t len = htons(str.size());
+  char* ptr = (char*)&len;
+  d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
+  d_buffer.insert(d_buffer.end(), str.begin(), str.end());
+}
+
+void CircularWriteBuffer::flush()
+{
+  if(d_buffer.empty()) // not optional, we report EOF otherwise
+    return;
+
+  auto arr1 = d_buffer.array_one();
+  auto arr2 = d_buffer.array_two();
+
+  struct iovec iov[2];
+  int pos=0;
+  size_t total=0;
+  for(const auto& arr : {arr1, arr2}) {
+    if(arr.second) {
+      iov[pos].iov_base = arr.first;
+      iov[pos].iov_len = arr.second;
+      total += arr.second;
+      ++pos;
+    }
+  }
+
+  int res = writev(d_fd, iov, pos);
+  if(res < 0) {
+    throw std::runtime_error("Couldn't flush a thing: "+string(strerror(errno)));
+  }
+  if(!res) {
+    throw std::runtime_error("EOF");
+  }
+  //  cout<<"Flushed "<<res<<" bytes out of " << total <<endl;
+  if((size_t)res == d_buffer.size())
+    d_buffer.clear();
+  else {
+    while(res--)
+      d_buffer.pop_front();
+  }
+}
+
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedBytes(maxQueuedBytes), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect)
+{
+  if (!d_asyncConnect) {
+    if(reconnect())
+      d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
+  }
+  d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
+}
+
 bool RemoteLogger::reconnect()
 {
   if (d_socket >= 0) {
     close(d_socket);
     d_socket = -1;
   }
-  d_connected = false;
   try {
     d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0);
     setNonBlocking(d_socket);
@@ -31,18 +90,33 @@ bool RemoteLogger::reconnect()
 #endif
     return false;
   }
-  d_connected = true;
   return true;
 }
 
-void RemoteLogger::busyReconnectLoop()
+void RemoteLogger::queueData(const std::string& data)
 {
-  while (!reconnect()) {
-    sleep(d_reconnectWaitTime);
+  if(!d_writer) {
+    d_drops++;
+    return;
+  }
+  std::unique_lock<std::mutex> lock(d_mutex);
+  if(d_writer) {
+    try {
+      d_writer->write(data);
+    }
+    catch(std::exception& e) {
+      //      cout << "Got exception writing: "<<e.what()<<endl;
+      d_drops++;
+      d_writer.reset();
+      close(d_socket);
+      d_socket = -1;
+    }
   }
 }
 
-void RemoteLogger::worker()
+
+void RemoteLogger::maintenanceThread()
+try
 {
 #ifdef WE_ARE_RECURSOR
   string threadName = "pdns-r/remLog";
@@ -50,54 +124,36 @@ void RemoteLogger::worker()
   string threadName = "dnsdist/remLog";
 #endif
   setThreadName(threadName);
-  while(true) {
-    std::string data;
-    {
-      std::unique_lock<std::mutex> lock(d_writeMutex);
-      d_queueCond.wait(lock, [this]{return (!d_writeQueue.empty()) || d_exiting;});
-      if (d_exiting) {
-        return;
-      }
-      data = d_writeQueue.front();
-      d_writeQueue.pop();
-    }
 
-    if (!d_connected) {
-      busyReconnectLoop();
-    }
+  for(;;) {
+    if(d_exiting)
+      break;
 
-    try {
-      uint16_t len = static_cast<uint16_t>(data.length());
-      sendSizeAndMsgWithTimeout(d_socket, len, data.c_str(), static_cast<int>(d_timeout), nullptr, nullptr, 0, 0, 0);
-    }
-    catch(const std::runtime_error& e) {
-#ifdef WE_ARE_RECURSOR
-      g_log<<Logger::Info<<"Error sending data to remote logger "<<d_remote.toStringWithPort()<<": "<< e.what()<<endl;
-#else
-      vinfolog("Error sending data to remote logger (%s): %s", d_remote.toStringWithPort(), e.what());
-#endif
-      busyReconnectLoop();
+    if(d_writer) {
+      std::unique_lock<std::mutex> lock(d_mutex);
+      if(d_writer) { // check if it is still set
+        //        cout<<"Flush"<<endl;
+        try {
+          d_writer->flush();
+        }
+        catch(std::exception& e) {
+          //          cout<<"Flush failed!"<<endl;
+          d_writer.reset();
+          close(d_socket);
+          d_socket = -1;
+        }
+      }
     }
-  }
-}
-
-void RemoteLogger::queueData(const std::string& data)
-{
-  {
-    std::lock_guard<std::mutex> lock(d_writeMutex);
-    if (d_writeQueue.size() >= d_maxQueuedEntries) {
-      d_writeQueue.pop();
+    else if(reconnect()) { // if it was zero, it will remain zero, we are the only ones setting it!
+      std::unique_lock<std::mutex> lock(d_mutex);
+      d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
     }
-    d_writeQueue.push(data);
+    sleep(d_reconnectWaitTime);
   }
-  d_queueCond.notify_one();
 }
-
-RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedEntries, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedEntries(maxQueuedEntries), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_thread(&RemoteLogger::worker, this)
+catch(std::exception& e)
 {
-  if (!d_asyncConnect) {
-    reconnect();
-  }
+  cerr<<"Thead died on: "<<e.what()<<endl;
 }
 
 RemoteLogger::~RemoteLogger()
@@ -106,8 +162,7 @@ RemoteLogger::~RemoteLogger()
   if (d_socket >= 0) {
     close(d_socket);
     d_socket = -1;
-    d_connected = false;
   }
-  d_queueCond.notify_one();
+
   d_thread.join();
 }
index 3eb2aa575711f0f23fa4ef645ce706a0171f8378..95fd634def3c69b2f27d305abccbd4c65631cbf7 100644 (file)
 #include <thread>
 
 #include "iputils.hh"
+#include <boost/circular_buffer.hpp>
+
+/* Writes can be submitted and they are atomically accepted. Either the whole write
+   ends up in the buffer or nothing ends up in the buffer.
+   In case nothing ends up in the buffer, an exception is thrown.
+   Similarly, EOF leads to this treatment
+
+   The filedescriptor can be in non-blocking mode.
+
+   This class is not threadsafe.
+*/
+
+class CircularWriteBuffer
+{
+public:
+  explicit CircularWriteBuffer(int fd, size_t size) : d_fd(fd), d_buffer(size)
+  {
+  }
+
+  void write(const std::string& str);
+  void flush();
+private:
+  int d_fd;
+  boost::circular_buffer<char> d_buffer;
+};
 
 class RemoteLoggerInterface
 {
@@ -39,35 +64,42 @@ public:
   virtual std::string toString() const = 0;
 };
 
+/* Thread safe. Will connect asynchronously on request.
+   Runs a reconnection thread that also periodicall flushes.
+   Note that the buffer only runs as long as there is a connection.
+   If there is no connection we don't buffer a thing
+*/
 class RemoteLogger : public RemoteLoggerInterface
 {
 public:
-  RemoteLogger(const ComboAddress& remote, uint16_t timeout=2, uint64_t maxQueuedEntries=100, uint8_t reconnectWaitTime=1, bool asyncConnect=false);
-  virtual ~RemoteLogger();
-  virtual void queueData(const std::string& data) override;
-  virtual std::string toString() const override
+  RemoteLogger(const ComboAddress& remote, uint16_t timeout=2,
+               uint64_t maxQueuedBytes=100000,
+               uint8_t reconnectWaitTime=1,
+               bool asyncConnect=false);
+  ~RemoteLogger();
+  void queueData(const std::string& data) override;
+  std::string toString() const override
   {
-    return "RemoteLogger to " + d_remote.toStringWithPort();
+    return d_remote.toStringWithPort();
   }
   void stop()
   {
     d_exiting = true;
   }
+  std::atomic<uint32_t> d_drops{0};
 private:
-  void busyReconnectLoop();
   bool reconnect();
-  void worker();
+  void maintenanceThread();
 
-  std::queue<std::string> d_writeQueue;
-  std::mutex d_writeMutex;
-  std::condition_variable d_queueCond;
   ComboAddress d_remote;
-  uint64_t d_maxQueuedEntries;
+  uint64_t d_maxQueuedBytes;
   int d_socket{-1};
+  std::unique_ptr<CircularWriteBuffer> d_writer;
   uint16_t d_timeout;
   uint8_t d_reconnectWaitTime;
   std::atomic<bool> d_exiting{false};
+
   bool d_asyncConnect{false};
-  bool d_connected{false};
   std::thread d_thread;
+  std::mutex d_mutex;
 };