#include <thread>
#include "iputils.hh"
+#include <boost/circular_buffer.hpp>
+
+/* Writes can be submitted and they are atomically accepted. Either the whole write
+ ends up in the buffer or nothing ends up in the buffer.
+ In case nothing ends up in the buffer, an exception is thrown.
+ Similarly, EOF leads to this treatment
+
+ The filedescriptor can be in non-blocking mode.
+
+ This class is not threadsafe.
+*/
+
+class CircularWriteBuffer
+{
+public:
+ explicit CircularWriteBuffer(int fd, size_t size) : d_fd(fd), d_buffer(size)
+ {
+ }
+
+ void write(const std::string& str);
+ void flush();
+private:
+ int d_fd;
+ boost::circular_buffer<char> d_buffer;
+};
class RemoteLoggerInterface
{
public:
virtual ~RemoteLoggerInterface() {};
virtual void queueData(const std::string& data) = 0;
- virtual std::string toString() = 0;
+ virtual std::string toString() const = 0;
};
+/* Thread safe. Will connect asynchronously on request.
+ Runs a reconnection thread that also periodicall flushes.
+ Note that the buffer only runs as long as there is a connection.
+ If there is no connection we don't buffer a thing
+*/
class RemoteLogger : public RemoteLoggerInterface
{
public:
- RemoteLogger(const ComboAddress& remote, uint16_t timeout=2, uint64_t maxQueuedEntries=100, uint8_t reconnectWaitTime=1, bool asyncConnect=false);
- virtual ~RemoteLogger();
- virtual void queueData(const std::string& data) override;
- virtual std::string toString() override
+ RemoteLogger(const ComboAddress& remote, uint16_t timeout=2,
+ uint64_t maxQueuedBytes=100000,
+ uint8_t reconnectWaitTime=1,
+ bool asyncConnect=false);
+ ~RemoteLogger();
+ void queueData(const std::string& data) override;
+ std::string toString() const override
{
- return "RemoteLogger to " + d_remote.toStringWithPort();
+ return d_remote.toStringWithPort();
}
+ void stop()
+ {
+ d_exiting = true;
+ }
+ std::atomic<uint32_t> d_drops{0};
private:
- void busyReconnectLoop();
bool reconnect();
- void worker();
+ void maintenanceThread();
- std::queue<std::string> d_writeQueue;
- std::mutex d_writeMutex;
- std::condition_variable d_queueCond;
ComboAddress d_remote;
- uint64_t d_maxQueuedEntries;
+ uint64_t d_maxQueuedBytes;
int d_socket{-1};
+ std::unique_ptr<CircularWriteBuffer> d_writer;
uint16_t d_timeout;
uint8_t d_reconnectWaitTime;
std::atomic<bool> d_exiting{false};
+
bool d_asyncConnect{false};
- bool d_connected{false};
std::thread d_thread;
+ std::mutex d_mutex;
};