]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/remote_logger.cc
rec: ensure correct service user on debian
[thirdparty/pdns.git] / pdns / remote_logger.cc
1 #include <unistd.h>
2 #include "threadname.hh"
3 #include "remote_logger.hh"
4 #include <sys/uio.h>
5 #ifdef HAVE_CONFIG_H
6 #include "config.h"
7 #endif
8 #ifdef PDNS_CONFIG_ARGS
9 #include "logger.hh"
10 #define WE_ARE_RECURSOR
11 #else
12 #include "dolog.hh"
13 #endif
14
15 void CircularWriteBuffer::write(const std::string& str)
16 {
17 if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
18 flush();
19
20 if(d_buffer.size() + 2 + str.size() > d_buffer.capacity())
21 throw std::runtime_error("Full!");
22
23 uint16_t len = htons(str.size());
24 char* ptr = (char*)&len;
25 d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
26 d_buffer.insert(d_buffer.end(), str.begin(), str.end());
27 }
28
29 void CircularWriteBuffer::flush()
30 {
31 if(d_buffer.empty()) // not optional, we report EOF otherwise
32 return;
33
34 auto arr1 = d_buffer.array_one();
35 auto arr2 = d_buffer.array_two();
36
37 struct iovec iov[2];
38 int pos=0;
39 size_t total=0;
40 for(const auto& arr : {arr1, arr2}) {
41 if(arr.second) {
42 iov[pos].iov_base = arr.first;
43 iov[pos].iov_len = arr.second;
44 total += arr.second;
45 ++pos;
46 }
47 }
48
49 int res = writev(d_fd, iov, pos);
50 if(res < 0) {
51 throw std::runtime_error("Couldn't flush a thing: "+string(strerror(errno)));
52 }
53 if(!res) {
54 throw std::runtime_error("EOF");
55 }
56 // cout<<"Flushed "<<res<<" bytes out of " << total <<endl;
57 if((size_t)res == d_buffer.size())
58 d_buffer.clear();
59 else {
60 while(res--)
61 d_buffer.pop_front();
62 }
63 }
64
65 RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedBytes(maxQueuedBytes), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect)
66 {
67 if (!d_asyncConnect) {
68 if(reconnect())
69 d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
70 }
71 d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
72 }
73
74 bool RemoteLogger::reconnect()
75 {
76 if (d_socket >= 0) {
77 close(d_socket);
78 d_socket = -1;
79 }
80 try {
81 d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0);
82 setNonBlocking(d_socket);
83 SConnectWithTimeout(d_socket, d_remote, d_timeout);
84 }
85 catch(const std::exception& e) {
86 #ifdef WE_ARE_RECURSOR
87 g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl;
88 #else
89 warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
90 #endif
91 return false;
92 }
93 return true;
94 }
95
96 void RemoteLogger::queueData(const std::string& data)
97 {
98 if(!d_writer) {
99 d_drops++;
100 return;
101 }
102 std::unique_lock<std::mutex> lock(d_mutex);
103 if(d_writer) {
104 try {
105 d_writer->write(data);
106 }
107 catch(std::exception& e) {
108 // cout << "Got exception writing: "<<e.what()<<endl;
109 d_drops++;
110 d_writer.reset();
111 close(d_socket);
112 d_socket = -1;
113 }
114 }
115 }
116
117
118 void RemoteLogger::maintenanceThread()
119 try
120 {
121 #ifdef WE_ARE_RECURSOR
122 string threadName = "pdns-r/remLog";
123 #else
124 string threadName = "dnsdist/remLog";
125 #endif
126 setThreadName(threadName);
127
128 for(;;) {
129 if(d_exiting)
130 break;
131
132 if(d_writer) {
133 std::unique_lock<std::mutex> lock(d_mutex);
134 if(d_writer) { // check if it is still set
135 // cout<<"Flush"<<endl;
136 try {
137 d_writer->flush();
138 }
139 catch(std::exception& e) {
140 // cout<<"Flush failed!"<<endl;
141 d_writer.reset();
142 close(d_socket);
143 d_socket = -1;
144 }
145 }
146 }
147 else if(reconnect()) { // if it was zero, it will remain zero, we are the only ones setting it!
148 std::unique_lock<std::mutex> lock(d_mutex);
149 d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes);
150 }
151 sleep(d_reconnectWaitTime);
152 }
153 }
154 catch(std::exception& e)
155 {
156 cerr<<"Thead died on: "<<e.what()<<endl;
157 }
158
159 RemoteLogger::~RemoteLogger()
160 {
161 d_exiting = true;
162 if (d_socket >= 0) {
163 close(d_socket);
164 d_socket = -1;
165 }
166
167 d_thread.join();
168 }