]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/remote_logger.hh
auth: switch circleci mssql image
[thirdparty/pdns.git] / pdns / remote_logger.hh
index 3eb2aa575711f0f23fa4ef645ce706a0171f8378..95fd634def3c69b2f27d305abccbd4c65631cbf7 100644 (file)
 #include <thread>
 
 #include "iputils.hh"
+#include <boost/circular_buffer.hpp>
+
+/* Writes can be submitted and they are atomically accepted. Either the whole write
+   ends up in the buffer or nothing ends up in the buffer.
+   In case nothing ends up in the buffer, an exception is thrown.
+   Similarly, EOF leads to this treatment
+
+   The filedescriptor can be in non-blocking mode.
+
+   This class is not threadsafe.
+*/
+
+class CircularWriteBuffer
+{
+public:
+  explicit CircularWriteBuffer(int fd, size_t size) : d_fd(fd), d_buffer(size)
+  {
+  }
+
+  void write(const std::string& str);
+  void flush();
+private:
+  int d_fd;
+  boost::circular_buffer<char> d_buffer;
+};
 
 class RemoteLoggerInterface
 {
@@ -39,35 +64,42 @@ public:
   virtual std::string toString() const = 0;
 };
 
+/* Thread safe. Will connect asynchronously on request.
+   Runs a reconnection thread that also periodicall flushes.
+   Note that the buffer only runs as long as there is a connection.
+   If there is no connection we don't buffer a thing
+*/
 class RemoteLogger : public RemoteLoggerInterface
 {
 public:
-  RemoteLogger(const ComboAddress& remote, uint16_t timeout=2, uint64_t maxQueuedEntries=100, uint8_t reconnectWaitTime=1, bool asyncConnect=false);
-  virtual ~RemoteLogger();
-  virtual void queueData(const std::string& data) override;
-  virtual std::string toString() const override
+  RemoteLogger(const ComboAddress& remote, uint16_t timeout=2,
+               uint64_t maxQueuedBytes=100000,
+               uint8_t reconnectWaitTime=1,
+               bool asyncConnect=false);
+  ~RemoteLogger();
+  void queueData(const std::string& data) override;
+  std::string toString() const override
   {
-    return "RemoteLogger to " + d_remote.toStringWithPort();
+    return d_remote.toStringWithPort();
   }
   void stop()
   {
     d_exiting = true;
   }
+  std::atomic<uint32_t> d_drops{0};
 private:
-  void busyReconnectLoop();
   bool reconnect();
-  void worker();
+  void maintenanceThread();
 
-  std::queue<std::string> d_writeQueue;
-  std::mutex d_writeMutex;
-  std::condition_variable d_queueCond;
   ComboAddress d_remote;
-  uint64_t d_maxQueuedEntries;
+  uint64_t d_maxQueuedBytes;
   int d_socket{-1};
+  std::unique_ptr<CircularWriteBuffer> d_writer;
   uint16_t d_timeout;
   uint8_t d_reconnectWaitTime;
   std::atomic<bool> d_exiting{false};
+
   bool d_asyncConnect{false};
-  bool d_connected{false};
   std::thread d_thread;
+  std::mutex d_mutex;
 };