]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/remote_logger.cc
Merge pull request #8223 from PowerDNS/omoerbeek-patch-1
[thirdparty/pdns.git] / pdns / remote_logger.cc
index 6c472e7745cd3fb85058db5c5a7c51fa8e605b5d..8863fa1954faa27db7c57a418387a232086dd682 100644 (file)
@@ -1,6 +1,10 @@
 #include <unistd.h>
+#include "threadname.hh"
 #include "remote_logger.hh"
+#include <sys/uio.h>
+#ifdef HAVE_CONFIG_H
 #include "config.h"
+#endif
 #ifdef PDNS_CONFIG_ARGS
 #include "logger.hh"
 #define WE_ARE_RECURSOR
 #include "dolog.hh"
 #endif
 
+void CircularWriteBuffer::write(const std::string& str)
+{
+  if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
+    flush();
+
+  if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
+    throw std::runtime_error("Full!");
+
+  uint16_t len = htons(str.size());
+  char* ptr = (char*)&len;
+  d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
+  d_buffer.insert(d_buffer.end(), str.begin(), str.end());
+}
+
+void CircularWriteBuffer::flush()
+{
+  if(d_buffer.empty()) // not optional, we report EOF otherwise
+    return;
+
+  auto arr1 = d_buffer.array_one();
+  auto arr2 = d_buffer.array_two();
+
+  struct iovec iov[2];
+  int pos=0;
+  size_t total=0;
+  for(const auto& arr : {arr1, arr2}) {
+    if(arr.second) {
+      iov[pos].iov_base = arr.first;
+      iov[pos].iov_len = arr.second;
+      total += arr.second;
+      ++pos;
+    }
+  }
+
+  int res = writev(d_fd, iov, pos);
+  if(res < 0) {
+    throw std::runtime_error("Couldn't flush a thing: "+string(strerror(errno)));
+  }
+  if(!res) {
+    throw std::runtime_error("EOF");
+  }
+  //  cout<<"Flushed "<<res<<" bytes out of " << total <<endl;
+  if((size_t)res == d_buffer.size())
+    d_buffer.clear();
+  else {
+    while(res--)
+      d_buffer.pop_front();
+  }
+}
+
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedBytes(maxQueuedBytes), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect)
+{
+  if (!d_asyncConnect) {
+    if(reconnect())
+      d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
+  }
+  d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
+}
+
 bool RemoteLogger::reconnect()
 {
   if (d_socket >= 0) {
     close(d_socket);
     d_socket = -1;
   }
-  d_connected = false;
   try {
     d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0);
     setNonBlocking(d_socket);
@@ -22,73 +84,76 @@ bool RemoteLogger::reconnect()
   }
   catch(const std::exception& e) {
 #ifdef WE_ARE_RECURSOR
-    L<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl;
+    g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl;
 #else
     warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
 #endif
     return false;
   }
-  d_connected = true;
   return true;
 }
 
-void RemoteLogger::busyReconnectLoop()
+void RemoteLogger::queueData(const std::string& data)
 {
-  while (!reconnect()) {
-    sleep(d_reconnectWaitTime);
+  if(!d_writer) {
+    d_drops++;
+    return;
   }
-}
-
-void RemoteLogger::worker()
-{
-  while(true) {
-    std::string data;
-    {
-      std::unique_lock<std::mutex> lock(d_writeMutex);
-      d_queueCond.wait(lock, [this]{return (!d_writeQueue.empty()) || d_exiting;});
-      if (d_exiting) {
-        return;
-      }
-      data = d_writeQueue.front();
-      d_writeQueue.pop();
-    }
-
-    if (!d_connected) {
-      busyReconnectLoop();
-    }
-
+  std::unique_lock<std::mutex> lock(d_mutex);
+  if(d_writer) {
     try {
-      uint16_t len = static_cast<uint16_t>(data.length());
-      sendSizeAndMsgWithTimeout(d_socket, len, data.c_str(), static_cast<int>(d_timeout), nullptr, nullptr, 0, 0, 0);
+      d_writer->write(data);
     }
-    catch(const std::runtime_error& e) {
-#ifdef WE_ARE_RECURSOR
-      L<<Logger::Info<<"Error sending data to remote logger "<<d_remote.toStringWithPort()<<": "<< e.what()<<endl;
-#else
-      vinfolog("Error sending data to remote logger (%s): %s", d_remote.toStringWithPort(), e.what());
-#endif
-      busyReconnectLoop();
+    catch(std::exception& e) {
+      //      cout << "Got exception writing: "<<e.what()<<endl;
+      d_drops++;
+      d_writer.reset();
+      close(d_socket);
+      d_socket = -1;
     }
   }
 }
 
-void RemoteLogger::queueData(const std::string& data)
+
+void RemoteLogger::maintenanceThread()
+try
 {
-  {
-    std::unique_lock<std::mutex> lock(d_writeMutex);
-    if (d_writeQueue.size() >= d_maxQueuedEntries) {
-      d_writeQueue.pop();
+#ifdef WE_ARE_RECURSOR
+  string threadName = "pdns-r/remLog";
+#else
+  string threadName = "dnsdist/remLog";
+#endif
+  setThreadName(threadName);
+
+  for(;;) {
+    if(d_exiting)
+      break;
+
+    if(d_writer) {
+      std::unique_lock<std::mutex> lock(d_mutex);
+      if(d_writer) { // check if it is still set
+        //        cout<<"Flush"<<endl;
+        try {
+          d_writer->flush();
+        }
+        catch(std::exception& e) {
+          //          cout<<"Flush failed!"<<endl;
+          d_writer.reset();
+          close(d_socket);
+          d_socket = -1;
+        }
+      }
     }
-    d_writeQueue.push(data);
+    else if(reconnect()) { // if it was zero, it will remain zero, we are the only ones setting it!
+      std::unique_lock<std::mutex> lock(d_mutex);
+      d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
+    }
+    sleep(d_reconnectWaitTime);
   }
-  d_queueCond.notify_one();
 }
-
-RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedEntries, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedEntries(maxQueuedEntries), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_thread(&RemoteLogger::worker, this)
+catch(std::exception& e)
 {
-  if (!d_asyncConnect) {
-    reconnect();
-  }
+  cerr<<"Thead died on: "<<e.what()<<endl;
 }
 
 RemoteLogger::~RemoteLogger()
@@ -97,8 +162,7 @@ RemoteLogger::~RemoteLogger()
   if (d_socket >= 0) {
     close(d_socket);
     d_socket = -1;
-    d_connected = false;
   }
-  d_queueCond.notify_one();
+
   d_thread.join();
 }