]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
Move the remote logger class to LockGuarded
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 16 Apr 2021 13:39:18 +0000 (15:39 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 17 Aug 2021 12:04:45 +0000 (14:04 +0200)
pdns/remote_logger.cc
pdns/remote_logger.hh

index f5d9e47a4e7e1f8737150bc012aa3c0858b66c04..736091118fcf9ab614a4ab66d2031e80b133e7dc 100644 (file)
@@ -97,7 +97,7 @@ bool CircularWriteBuffer::flush(int fd)
   return true;
 }
 
-RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_writer(maxQueuedBytes), d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect)
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr})
 {
   if (!d_asyncConnect) {
     reconnect();
@@ -116,8 +116,8 @@ bool RemoteLogger::reconnect()
     {
       /* we are now successfully connected, time to take the lock and update the
          socket */
-      std::lock_guard<std::mutex> lock(d_mutex);
-      d_socket = std::move(newSock);
+      auto runtime = d_runtime.lock();
+      runtime->d_socket = std::move(newSock);
     }
   }
   catch (const std::exception& e) {
@@ -138,24 +138,24 @@ void RemoteLogger::queueData(const std::string& data)
     throw std::runtime_error("Got a request to write an object of size " + std::to_string(data.size()));
   }
 
-  std::lock_guard<std::mutex> lock(d_mutex);
+  auto runtime = d_runtime.lock();
 
-  if (!d_writer.hasRoomFor(data)) {
+  if (!runtime->d_writer.hasRoomFor(data)) {
     /* not connected, queue is full, just drop */
-    if (!d_socket) {
+    if (!runtime->d_socket) {
       ++d_drops;
       return;
     }
     try {
       /* we try to flush some data */
-      if (!d_writer.flush(d_socket->getHandle())) {
+      if (!runtime->d_writer.flush(runtime->d_socket->getHandle())) {
         /* but failed, let's just drop */
         ++d_drops;
         return;
       }
 
       /* see if we freed enough data */
-      if (!d_writer.hasRoomFor(data)) {
+      if (!runtime->d_writer.hasRoomFor(data)) {
         /* we didn't */
         ++d_drops;
         return;
@@ -164,12 +164,12 @@ void RemoteLogger::queueData(const std::string& data)
     catch(const std::exception& e) {
       //      cout << "Got exception writing: "<<e.what()<<endl;
       ++d_drops;
-      d_socket.reset();
+      runtime->d_socket.reset();
       return;
     }
   }
 
-  d_writer.write(data);
+  runtime->d_writer.write(data);
   ++d_queued;
 }
 
@@ -189,7 +189,7 @@ void RemoteLogger::maintenanceThread()
       }
 
       bool connected = true;
-      if (d_socket == nullptr) {
+      if (d_runtime.lock()->d_socket == nullptr) {
         // if it was unset, it will remain so, we are the only ones setting it!
         connected = reconnect();
       }
@@ -198,19 +198,19 @@ void RemoteLogger::maintenanceThread()
       if (connected) {
         try {
           /* we don't want to take the lock while trying to reconnect */
-          std::lock_guard<std::mutex> lock(d_mutex);
-          if (d_socket) { // check if it is set
+          auto runtime = d_runtime.lock();
+          if (runtime->d_socket) { // check if it is set
             /* if flush() returns false, it means that we couldn't flush anything yet
                either because there is nothing to flush, or because the outgoing TCP
                buffer is full. That's fine by us */
-            d_writer.flush(d_socket->getHandle());
+            runtime->d_writer.flush(runtime->d_socket->getHandle());
           }
           else {
             connected = false;
           }
         }
-        catch(const std::exception& e) {
-          d_socket.reset();
+        catch (const std::exception& e) {
+          d_runtime.lock()->d_socket.reset();
           connected = false;
         }
 
index d6cf953c63b1f197d269148201b08a713f6521ff..c00e882f228f6a78b8f76bf8a373c83afbe025eb 100644 (file)
 
 #include <atomic>
 #include <queue>
-#include <mutex>
 #include <thread>
 
 #include "iputils.hh"
 #include "circular_buffer.hh"
+#include "lock.hh"
 #include "sstuff.hh"
 
 /* Writes can be submitted and they are atomically accepted. Either the whole write
@@ -101,16 +101,20 @@ private:
   bool reconnect();
   void maintenanceThread();
 
-  CircularWriteBuffer d_writer;
+  struct RuntimeData
+  {
+    CircularWriteBuffer d_writer;
+    std::unique_ptr<Socket> d_socket{nullptr};
+  };
+
   ComboAddress d_remote;
   std::atomic<uint64_t> d_drops{0};
   std::atomic<uint64_t> d_queued{0};
-  std::unique_ptr<Socket> d_socket{nullptr};
   uint16_t d_timeout;
   uint8_t d_reconnectWaitTime;
   std::atomic<bool> d_exiting{false};
   bool d_asyncConnect{false};
 
-  std::mutex d_mutex;
+  LockGuarded<RuntimeData> d_runtime;
   std::thread d_thread;
 };