]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/remote_logger.cc
rec: ensure correct service user on debian
[thirdparty/pdns.git] / pdns / remote_logger.cc
CommitLineData
aa7929a3 1#include <unistd.h>
519f5484 2#include "threadname.hh"
aa7929a3 3#include "remote_logger.hh"
da71b63b 4#include <sys/uio.h>
519f5484 5#ifdef HAVE_CONFIG_H
49cfc104 6#include "config.h"
519f5484 7#endif
49cfc104 8#ifdef PDNS_CONFIG_ARGS
9#include "logger.hh"
10#define WE_ARE_RECURSOR
11#else
12#include "dolog.hh"
13#endif
aa7929a3 14
da71b63b 15void CircularWriteBuffer::write(const std::string& str)
16{
17 if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
18 flush();
19
20 if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
21 throw std::runtime_error("Full!");
22
23 uint16_t len = htons(str.size());
24 char* ptr = (char*)&len;
25 d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
26 d_buffer.insert(d_buffer.end(), str.begin(), str.end());
27}
28
29void CircularWriteBuffer::flush()
30{
31 if(d_buffer.empty()) // not optional, we report EOF otherwise
32 return;
33
34 auto arr1 = d_buffer.array_one();
35 auto arr2 = d_buffer.array_two();
36
37 struct iovec iov[2];
38 int pos=0;
39 size_t total=0;
40 for(const auto& arr : {arr1, arr2}) {
41 if(arr.second) {
42 iov[pos].iov_base = arr.first;
43 iov[pos].iov_len = arr.second;
44 total += arr.second;
45 ++pos;
46 }
47 }
48
49 int res = writev(d_fd, iov, pos);
50 if(res < 0) {
51 throw std::runtime_error("Couldn't flush a thing: "+string(strerror(errno)));
52 }
53 if(!res) {
54 throw std::runtime_error("EOF");
55 }
56 // cout<<"Flushed "<<res<<" bytes out of " << total <<endl;
57 if((size_t)res == d_buffer.size())
58 d_buffer.clear();
59 else {
60 while(res--)
61 d_buffer.pop_front();
62 }
63}
64
65RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedBytes(maxQueuedBytes), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect)
66{
67 if (!d_asyncConnect) {
68 if(reconnect())
69 d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
70 }
71 d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
72}
73
aa7929a3
RG
74bool RemoteLogger::reconnect()
75{
76 if (d_socket >= 0) {
77 close(d_socket);
754f300f 78 d_socket = -1;
aa7929a3
RG
79 }
80 try {
81 d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0);
aa7929a3 82 setNonBlocking(d_socket);
51959320 83 SConnectWithTimeout(d_socket, d_remote, d_timeout);
aa7929a3
RG
84 }
85 catch(const std::exception& e) {
49cfc104 86#ifdef WE_ARE_RECURSOR
e6a9dde5 87 g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl;
49cfc104 88#else
89 warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
90#endif
aa7929a3
RG
91 return false;
92 }
93 return true;
94}
95
da71b63b 96void RemoteLogger::queueData(const std::string& data)
c2d1d4ba 97{
da71b63b 98 if(!d_writer) {
99 d_drops++;
100 return;
101 }
102 std::unique_lock<std::mutex> lock(d_mutex);
103 if(d_writer) {
104 try {
105 d_writer->write(data);
106 }
107 catch(std::exception& e) {
108 // cout << "Got exception writing: "<<e.what()<<endl;
109 d_drops++;
110 d_writer.reset();
111 close(d_socket);
112 d_socket = -1;
113 }
c2d1d4ba
RG
114 }
115}
116
da71b63b 117
118void RemoteLogger::maintenanceThread()
119try
aa7929a3 120{
77c9bc9a 121#ifdef WE_ARE_RECURSOR
c390b2da 122 string threadName = "pdns-r/remLog";
77c9bc9a
PL
123#else
124 string threadName = "dnsdist/remLog";
125#endif
519f5484 126 setThreadName(threadName);
aa7929a3 127
da71b63b 128 for(;;) {
129 if(d_exiting)
130 break;
c2d1d4ba 131
da71b63b 132 if(d_writer) {
133 std::unique_lock<std::mutex> lock(d_mutex);
134 if(d_writer) { // check if it is still set
135 // cout<<"Flush"<<endl;
136 try {
137 d_writer->flush();
138 }
139 catch(std::exception& e) {
140 // cout<<"Flush failed!"<<endl;
141 d_writer.reset();
142 close(d_socket);
143 d_socket = -1;
144 }
145 }
aa7929a3 146 }
da71b63b 147 else if(reconnect()) { // if it was zero, it will remain zero, we are the only ones setting it!
148 std::unique_lock<std::mutex> lock(d_mutex);
149 d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
aa7929a3 150 }
da71b63b 151 sleep(d_reconnectWaitTime);
aa7929a3 152 }
aa7929a3 153}
da71b63b 154catch(std::exception& e)
aa7929a3 155{
da71b63b 156 cerr<<"Thead died on: "<<e.what()<<endl;
aa7929a3
RG
157}
158
159RemoteLogger::~RemoteLogger()
160{
161 d_exiting = true;
754f300f 162 if (d_socket >= 0) {
aa7929a3 163 close(d_socket);
754f300f
RG
164 d_socket = -1;
165 }
da71b63b 166
aa7929a3
RG
167 d_thread.join();
168}