return true;
}
-RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_writer(maxQueuedBytes), d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect)
+RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr})
{
if (!d_asyncConnect) {
reconnect();
{
/* we are now successfully connected, time to take the lock and update the
socket */
- std::lock_guard<std::mutex> lock(d_mutex);
- d_socket = std::move(newSock);
+ auto runtime = d_runtime.lock();
+ runtime->d_socket = std::move(newSock);
}
}
catch (const std::exception& e) {
throw std::runtime_error("Got a request to write an object of size " + std::to_string(data.size()));
}
- std::lock_guard<std::mutex> lock(d_mutex);
+ auto runtime = d_runtime.lock();
- if (!d_writer.hasRoomFor(data)) {
+ if (!runtime->d_writer.hasRoomFor(data)) {
/* not connected, queue is full, just drop */
- if (!d_socket) {
+ if (!runtime->d_socket) {
++d_drops;
return;
}
try {
/* we try to flush some data */
- if (!d_writer.flush(d_socket->getHandle())) {
+ if (!runtime->d_writer.flush(runtime->d_socket->getHandle())) {
/* but failed, let's just drop */
++d_drops;
return;
}
/* see if we freed enough data */
- if (!d_writer.hasRoomFor(data)) {
+ if (!runtime->d_writer.hasRoomFor(data)) {
/* we didn't */
++d_drops;
return;
catch(const std::exception& e) {
// cout << "Got exception writing: "<<e.what()<<endl;
++d_drops;
- d_socket.reset();
+ runtime->d_socket.reset();
return;
}
}
- d_writer.write(data);
+ runtime->d_writer.write(data);
++d_queued;
}
}
bool connected = true;
- if (d_socket == nullptr) {
+ if (d_runtime.lock()->d_socket == nullptr) {
// if it was unset, it will remain so, we are the only ones setting it!
connected = reconnect();
}
if (connected) {
try {
/* we don't want to take the lock while trying to reconnect */
- std::lock_guard<std::mutex> lock(d_mutex);
- if (d_socket) { // check if it is set
+ auto runtime = d_runtime.lock();
+ if (runtime->d_socket) { // check if it is set
/* if flush() returns false, it means that we couldn't flush anything yet
either because there is nothing to flush, or because the outgoing TCP
buffer is full. That's fine by us */
- d_writer.flush(d_socket->getHandle());
+ runtime->d_writer.flush(runtime->d_socket->getHandle());
}
else {
connected = false;
}
}
- catch(const std::exception& e) {
- d_socket.reset();
+ catch (const std::exception& e) {
+ d_runtime.lock()->d_socket.reset();
connected = false;
}
#include <atomic>
#include <queue>
-#include <mutex>
#include <thread>
#include "iputils.hh"
#include "circular_buffer.hh"
+#include "lock.hh"
#include "sstuff.hh"
/* Writes can be submitted and they are atomically accepted. Either the whole write
bool reconnect();
void maintenanceThread();
- CircularWriteBuffer d_writer;
+ struct RuntimeData
+ {
+ CircularWriteBuffer d_writer;
+ std::unique_ptr<Socket> d_socket{nullptr};
+ };
+
ComboAddress d_remote;
std::atomic<uint64_t> d_drops{0};
std::atomic<uint64_t> d_queued{0};
- std::unique_ptr<Socket> d_socket{nullptr};
uint16_t d_timeout;
uint8_t d_reconnectWaitTime;
std::atomic<bool> d_exiting{false};
bool d_asyncConnect{false};
- std::mutex d_mutex;
+ LockGuarded<RuntimeData> d_runtime;
std::thread d_thread;
};