]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Better handling of reconnections in Remote Logger (dnsdist, rec)
authorRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 3 Mar 2020 11:24:34 +0000 (12:24 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 6 Mar 2020 16:11:44 +0000 (17:11 +0100)
- Do not lock while trying to reconnect ;
- Try to reconnect right away if the disconnection was detected in
  the maintenance thread ;
- Keep queueing messages when the connection has been lost, until
  the buffer gets full.

pdns/remote_logger.cc
pdns/remote_logger.hh

index b277c7f191077fda271eb81ef0a2998ff7b69ba4..956d06bb19474dd44890fb20373d0ad357c8dc45 100644 (file)
 #include "dolog.hh"
 #endif
 
-void CircularWriteBuffer::write(const std::string& str)
+bool CircularWriteBuffer::hasRoomFor(const std::string& str) const
 {
-  if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
-    flush();
+  if (d_buffer.size() + 2 + str.size() > d_buffer.capacity()) {
+    return false;
+  }
 
-  if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
-    throw std::runtime_error("Full!");
+  return true;
+}
+
+bool CircularWriteBuffer::write(const std::string& str)
+{
+  if (!hasRoomFor(str)) {
+    return false;
+  }
 
   uint16_t len = htons(str.size());
-  char* ptr = (char*)&len;
+  const char* ptr = reinterpret_cast<const char*>(&len);
   d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
   d_buffer.insert(d_buffer.end(), str.begin(), str.end());
+
+  return true;
 }
 
-void CircularWriteBuffer::flush()
+bool CircularWriteBuffer::flush(int fd)
 {
-  if(d_buffer.empty()) // not optional, we report EOF otherwise
-    return;
+  if (d_buffer.empty()) {
+    // not optional, we report EOF otherwise
+    return false;
+  }
 
   auto arr1 = d_buffer.array_one();
   auto arr2 = d_buffer.array_two();
 
   struct iovec iov[2];
-  int pos=0;
-  size_t total=0;
+  int pos = 0;
+  size_t total = 0;
   for(const auto& arr : {arr1, arr2}) {
     if(arr.second) {
       iov[pos].iov_base = arr.first;
@@ -46,50 +57,76 @@ void CircularWriteBuffer::flush()
     }
   }
 
-  int res = writev(d_fd, iov, pos);
-  if(res < 0) {
-    throw std::runtime_error("Couldn't flush a thing: "+stringerror());
-  }
-  if(!res) {
-    throw std::runtime_error("EOF");
+  ssize_t res = 0;
+  do {
+    res = writev(fd, iov, pos);
+
+    if (res < 0) {
+      if (errno == EINTR) {
+        continue;
+      }
+
+      if (errno == EAGAIN || errno == EWOULDBLOCK) {
+        return false;
+      }
+
+      /* we can't be sure we haven't sent a partial message,
+         and we don't want to send the remaining part after reconnecting */
+      d_buffer.clear();
+      throw std::runtime_error("Couldn't flush a thing: " + stringerror());
+    }
+    else if (!res) {
+      /* we can't be sure we haven't sent a partial message,
+         and we don't want to send the remaining part after reconnecting */
+      d_buffer.clear();
+      throw std::runtime_error("EOF");
+    }
   }
+  while (res < 0);
+
   //  cout<<"Flushed "<<res<<" bytes out of " << total <<endl;
   if (static_cast<size_t>(res) == d_buffer.size()) {
     d_buffer.clear();
   }
   else {
-    while(res--) {
+    while (res--) {
       d_buffer.pop_front();
     }
   }
+
+  return true;
 }
 
-RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedBytes(maxQueuedBytes), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect)
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_writer(maxQueuedBytes), d_remote(remote), d_maxQueuedBytes(maxQueuedBytes), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect)
 {
   if (!d_asyncConnect) {
-    if(reconnect())
-      d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
+    reconnect();
   }
+
   d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
 }
 
 bool RemoteLogger::reconnect()
 {
-  if (d_socket >= 0) {
-    close(d_socket);
-    d_socket = -1;
-  }
   try {
-    d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0);
-    setNonBlocking(d_socket);
-    SConnectWithTimeout(d_socket, d_remote, d_timeout);
+    auto newSock = make_unique<Socket>(d_remote.sin4.sin_family, SOCK_STREAM, 0);
+    newSock->setNonBlocking();
+    newSock->connect(d_remote, d_timeout);
+
+    {
+      /* we are now successfully connected, time to take the lock and update the
+         socket */
+      std::unique_lock<std::mutex> lock(d_mutex);
+      d_socket = std::move(newSock);
+    }
   }
-  catch(const std::exception& e) {
+  catch (const std::exception& e) {
 #ifdef WE_ARE_RECURSOR
     g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl;
 #else
     warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
 #endif
+
     return false;
   }
   return true;
@@ -97,26 +134,40 @@ bool RemoteLogger::reconnect()
 
 void RemoteLogger::queueData(const std::string& data)
 {
-  if(!d_writer) {
-    ++d_drops;
-    return;
-  }
   std::unique_lock<std::mutex> lock(d_mutex);
-  if(d_writer) {
+
+  if (!d_writer.hasRoomFor(data)) {
+    /* not connected, queue is full, just drop */
+    if (!d_socket) {
+      ++d_drops;
+      return;
+    }
     try {
-      d_writer->write(data);
-      ++d_queued;
+      /* we try to flush some data */
+      if (!d_writer.flush(d_socket->getHandle())) {
+        /* but failed, let's just drop */
+        ++d_drops;
+        return;
+      }
+
+      /* see if we freed enough data */
+      if (!d_writer.hasRoomFor(data)) {
+        /* we didn't */
+        ++d_drops;
+        return;
+      }
     }
     catch(const std::exception& e) {
       //      cout << "Got exception writing: "<<e.what()<<endl;
       ++d_drops;
-      d_writer.reset();
-      close(d_socket);
-      d_socket = -1;
+      d_socket.reset();
+      return;
     }
   }
-}
 
+  d_writer.write(data);
+  ++d_queued;
+}
 
 void RemoteLogger::maintenanceThread()
 try
@@ -128,29 +179,40 @@ try
 #endif
   setThreadName(threadName);
 
-  for(;;) {
-    if(d_exiting)
+  for (;;) {
+    if (d_exiting) {
       break;
+    }
 
-    if(d_writer) {
-      std::unique_lock<std::mutex> lock(d_mutex);
-      if(d_writer) { // check if it is still set
-        //        cout<<"Flush"<<endl;
-        try {
-          d_writer->flush();
+    bool connected = true;
+    if (d_socket == nullptr) {
+      // if it was unset, it will remain so, we are the only ones setting it!
+      connected = reconnect();
+    }
+
+    /* we will just go to sleep if the reconnection just failed */
+    if (connected) {
+      try {
+        /* we don't want to take the lock while trying to reconnect */
+        std::unique_lock<std::mutex> lock(d_mutex);
+        if (d_socket) { // check if it is set
+          d_writer.flush(d_socket->getHandle());
         }
-        catch(std::exception& e) {
-          //          cout<<"Flush failed!"<<endl;
-          d_writer.reset();
-          close(d_socket);
-          d_socket = -1;
+        else {
+          connected = false;
         }
       }
+      catch(const std::exception& e) {
+        d_socket.reset();
+        connected = false;
+      }
+
+      if (!connected) {
+        /* let's try to reconnect right away, we are about to sleep anyway */
+        reconnect();
+      }
     }
-    else if(reconnect()) { // if it was zero, it will remain zero, we are the only ones setting it!
-      std::unique_lock<std::mutex> lock(d_mutex);
-      d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
-    }
+
     sleep(d_reconnectWaitTime);
   }
 }
@@ -162,10 +224,6 @@ catch(const std::exception& e)
 RemoteLogger::~RemoteLogger()
 {
   d_exiting = true;
-  if (d_socket >= 0) {
-    close(d_socket);
-    d_socket = -1;
-  }
 
   d_thread.join();
 }
index 8bcf6b05288e5f70021041780a5cd9c4e285aa2e..af48f6ed66579c5766a95647665a5fb361d05827 100644 (file)
@@ -31,6 +31,7 @@
 
 #include "iputils.hh"
 #include "circular_buffer.hh"
+#include "sstuff.hh"
 
 /* Writes can be submitted and they are atomically accepted. Either the whole write
    ends up in the buffer or nothing ends up in the buffer.
 class CircularWriteBuffer
 {
 public:
-  explicit CircularWriteBuffer(int fd, size_t size) : d_fd(fd), d_buffer(size)
+  explicit CircularWriteBuffer(size_t size) : d_buffer(size)
   {
   }
 
-  void write(const std::string& str);
-  void flush();
+  bool hasRoomFor(const std::string& str) const;
+  bool write(const std::string& str);
+  bool flush(int fd);
 private:
-  int d_fd;
   boost::circular_buffer<char> d_buffer;
 };
 
@@ -100,12 +101,12 @@ private:
   bool reconnect();
   void maintenanceThread();
 
-  std::unique_ptr<CircularWriteBuffer> d_writer;
+  CircularWriteBuffer d_writer;
   ComboAddress d_remote;
   std::atomic<uint64_t> d_drops{0};
   std::atomic<uint64_t> d_queued{0};
   uint64_t d_maxQueuedBytes;
-  int d_socket{-1};
+  std::unique_ptr<Socket> d_socket{nullptr};
   uint16_t d_timeout;
   uint8_t d_reconnectWaitTime;
   std::atomic<bool> d_exiting{false};