#include <boost/variant.hpp>
-#include "bpf-filter.hh"
#include "capabilities.hh"
#include "circular_buffer.hh"
#include "dnscrypt.hh"
if (res == fstrm_res_success) {
// Frame successfully queued.
+ ++d_framesSent;
} else if (res == fstrm_res_again) {
free(frame);
#ifdef RECURSOR
- g_log<<Logger::Warning<<"FrameStreamLogger: queue full, dropping."<<std::endl;
+ g_log<<Logger::Debug<<"FrameStreamLogger: queue full, dropping."<<std::endl;
#else
- warnlog("FrameStreamLogger: queue full, dropping.");
+ vinfolog("FrameStreamLogger: queue full, dropping.");
#endif
+ ++d_queueFullDrops;
} else {
// Permanent failure.
free(frame);
#else
warnlog("FrameStreamLogger: submitting to queue failed.");
#endif
+ ++d_permanentFailures;
}
}
{
public:
FrameStreamLogger(int family, const std::string& address, bool connect, const std::unordered_map<string,unsigned>& options = std::unordered_map<string,unsigned>());
- virtual ~FrameStreamLogger();
- virtual void queueData(const std::string& data) override;
- virtual std::string toString() const override
+ ~FrameStreamLogger();
+ void queueData(const std::string& data) override;
+ std::string toString() const override
{
- return "FrameStreamLogger to " + d_address;
+ return "FrameStreamLogger to " + d_address + " (" + std::to_string(d_framesSent) + " frames sent, " + std::to_string(d_queueFullDrops) + " dropped, " + std::to_string(d_permanentFailures) + " permanent failures)";
}
private:
struct fstrm_writer *d_writer{nullptr};
struct fstrm_iothr_options *d_iothropt{nullptr};
struct fstrm_iothr *d_iothr{nullptr};
+ std::atomic<uint64_t> d_framesSent{0};
+ std::atomic<uint64_t> d_queueFullDrops{0};
+ std::atomic<uint64_t> d_permanentFailures{0};
void cleanup();
};
throw std::runtime_error("EOF");
}
// cout<<"Flushed "<<res<<" bytes out of " << total <<endl;
- if((size_t)res == d_buffer.size())
+ if (static_cast<size_t>(res) == d_buffer.size()) {
d_buffer.clear();
+ }
else {
- while(res--)
+ while(res--) {
d_buffer.pop_front();
+ }
}
}
void RemoteLogger::queueData(const std::string& data)
{
if(!d_writer) {
- d_drops++;
+ ++d_drops;
return;
}
std::unique_lock<std::mutex> lock(d_mutex);
if(d_writer) {
try {
d_writer->write(data);
+ ++d_queued;
}
- catch(std::exception& e) {
+ catch(const std::exception& e) {
// cout << "Got exception writing: "<<e.what()<<endl;
- d_drops++;
+ ++d_drops;
d_writer.reset();
close(d_socket);
d_socket = -1;
sleep(d_reconnectWaitTime);
}
}
-catch(std::exception& e)
+catch(const std::exception& e)
{
cerr<<"Thead died on: "<<e.what()<<endl;
}
void queueData(const std::string& data) override;
std::string toString() const override
{
- return d_remote.toStringWithPort();
+ return d_remote.toStringWithPort() + " (" + std::to_string(d_queued) + " queued, " + std::to_string(d_drops) + " dropped)";
}
void stop()
{
d_exiting = true;
}
- std::atomic<uint32_t> d_drops{0};
private:
bool reconnect();
void maintenanceThread();
+ std::unique_ptr<CircularWriteBuffer> d_writer;
ComboAddress d_remote;
+ std::atomic<uint64_t> d_drops{0};
+ std::atomic<uint64_t> d_queued{0};
uint64_t d_maxQueuedBytes;
int d_socket{-1};
- std::unique_ptr<CircularWriteBuffer> d_writer;
uint16_t d_timeout;
uint8_t d_reconnectWaitTime;
std::atomic<bool> d_exiting{false};