]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/remote_logger.cc
Merge pull request #14032 from rgacogne/ddist-192-changelog-secpoll
[thirdparty/pdns.git] / pdns / remote_logger.cc
CommitLineData
aa7929a3 1#include <unistd.h>
519f5484 2#include "threadname.hh"
aa7929a3 3#include "remote_logger.hh"
da71b63b 4#include <sys/uio.h>
519f5484 5#ifdef HAVE_CONFIG_H
49cfc104 6#include "config.h"
519f5484 7#endif
49cfc104 8#ifdef PDNS_CONFIG_ARGS
9#include "logger.hh"
10#define WE_ARE_RECURSOR
11#else
12#include "dolog.hh"
13#endif
43f91cad 14#include "logging.hh"
aa7929a3 15
ebf99525 16bool CircularWriteBuffer::hasRoomFor(const std::string& str) const
da71b63b 17{
ebf99525
RG
18 if (d_buffer.size() + 2 + str.size() > d_buffer.capacity()) {
19 return false;
20 }
da71b63b 21
ebf99525
RG
22 return true;
23}
24
25bool CircularWriteBuffer::write(const std::string& str)
26{
87f46425 27 if (str.size() > std::numeric_limits<uint16_t>::max() || !hasRoomFor(str)) {
ebf99525
RG
28 return false;
29 }
da71b63b 30
31 uint16_t len = htons(str.size());
ebf99525 32 const char* ptr = reinterpret_cast<const char*>(&len);
da71b63b 33 d_buffer.insert(d_buffer.end(), ptr, ptr + 2);
34 d_buffer.insert(d_buffer.end(), str.begin(), str.end());
ebf99525
RG
35
36 return true;
da71b63b 37}
38
ebf99525 39bool CircularWriteBuffer::flush(int fd)
da71b63b 40{
ebf99525
RG
41 if (d_buffer.empty()) {
42 // not optional, we report EOF otherwise
43 return false;
44 }
da71b63b 45
46 auto arr1 = d_buffer.array_one();
47 auto arr2 = d_buffer.array_two();
48
49 struct iovec iov[2];
ebf99525 50 int pos = 0;
da71b63b 51 for(const auto& arr : {arr1, arr2}) {
52 if(arr.second) {
53 iov[pos].iov_base = arr.first;
54 iov[pos].iov_len = arr.second;
da71b63b 55 ++pos;
56 }
57 }
58
ebf99525
RG
59 ssize_t res = 0;
60 do {
61 res = writev(fd, iov, pos);
62
63 if (res < 0) {
64 if (errno == EINTR) {
65 continue;
66 }
67
68 if (errno == EAGAIN || errno == EWOULDBLOCK) {
69 return false;
70 }
71
72 /* we can't be sure we haven't sent a partial message,
73 and we don't want to send the remaining part after reconnecting */
74 d_buffer.clear();
75 throw std::runtime_error("Couldn't flush a thing: " + stringerror());
76 }
77 else if (!res) {
78 /* we can't be sure we haven't sent a partial message,
79 and we don't want to send the remaining part after reconnecting */
80 d_buffer.clear();
81 throw std::runtime_error("EOF");
82 }
da71b63b 83 }
ebf99525
RG
84 while (res < 0);
85
414a03e1 86 if (static_cast<size_t>(res) == d_buffer.size()) {
da71b63b 87 d_buffer.clear();
414a03e1 88 }
da71b63b 89 else {
ebf99525 90 while (res--) {
da71b63b 91 d_buffer.pop_front();
414a03e1 92 }
da71b63b 93 }
ebf99525
RG
94
95 return true;
da71b63b 96}
97
74b9e43d
OM
98const std::string& RemoteLoggerInterface::toErrorString(Result r)
99{
100 static const std::array<std::string,5> str = {
101 "Queued",
102 "Queue full, dropping",
103 "Not sending too large protobuf message",
104 "Submiting to queue failed",
105 "?"
106 };
107 auto i = static_cast<unsigned int>(r);
108 return str[std::min(i, 4U)];
109}
110
6b4670e7 111RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr})
da71b63b 112{
113 if (!d_asyncConnect) {
ebf99525 114 reconnect();
da71b63b 115 }
ebf99525 116
da71b63b 117 d_thread = std::thread(&RemoteLogger::maintenanceThread, this);
118}
119
aa7929a3
RG
120bool RemoteLogger::reconnect()
121{
aa7929a3 122 try {
ebf99525
RG
123 auto newSock = make_unique<Socket>(d_remote.sin4.sin_family, SOCK_STREAM, 0);
124 newSock->setNonBlocking();
125 newSock->connect(d_remote, d_timeout);
126
127 {
128 /* we are now successfully connected, time to take the lock and update the
129 socket */
6b4670e7
RG
130 auto runtime = d_runtime.lock();
131 runtime->d_socket = std::move(newSock);
ebf99525 132 }
aa7929a3 133 }
ebf99525 134 catch (const std::exception& e) {
49cfc104 135#ifdef WE_ARE_RECURSOR
62b191dc 136 SLOG(g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl,
84d24234 137 g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Exception while connecting to remote logger", "address", Logging::Loggable(d_remote)));
49cfc104 138#else
139 warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what());
140#endif
ebf99525 141
aa7929a3
RG
142 return false;
143 }
144 return true;
145}
146
4d7db3d7 147RemoteLoggerInterface::Result RemoteLogger::queueData(const std::string& data)
c2d1d4ba 148{
2b7a8236
OM
149 auto runtime = d_runtime.lock();
150
87f46425 151 if (data.size() > std::numeric_limits<uint16_t>::max()) {
2b7a8236 152 ++runtime->d_stats.d_tooLarge;
4d7db3d7 153 return Result::TooLarge;
87f46425
RG
154 }
155
6b4670e7 156 if (!runtime->d_writer.hasRoomFor(data)) {
ebf99525 157 /* not connected, queue is full, just drop */
6b4670e7 158 if (!runtime->d_socket) {
2b7a8236 159 ++runtime->d_stats.d_pipeFull;
4d7db3d7 160 return Result::PipeFull;
ebf99525 161 }
da71b63b 162 try {
ebf99525 163 /* we try to flush some data */
6b4670e7 164 if (!runtime->d_writer.flush(runtime->d_socket->getHandle())) {
ebf99525 165 /* but failed, let's just drop */
2b7a8236 166 ++runtime->d_stats.d_pipeFull;
4d7db3d7 167 return Result::PipeFull;
ebf99525
RG
168 }
169
170 /* see if we freed enough data */
6b4670e7 171 if (!runtime->d_writer.hasRoomFor(data)) {
ebf99525 172 /* we didn't */
2b7a8236 173 ++runtime->d_stats.d_pipeFull;
4d7db3d7 174 return Result::PipeFull;
ebf99525 175 }
da71b63b 176 }
414a03e1 177 catch(const std::exception& e) {
da71b63b 178 // cout << "Got exception writing: "<<e.what()<<endl;
6b4670e7 179 runtime->d_socket.reset();
2b7a8236
OM
180 ++runtime->d_stats.d_otherError;
181 return Result::OtherError;
da71b63b 182 }
c2d1d4ba 183 }
c2d1d4ba 184
6b4670e7 185 runtime->d_writer.write(data);
2b7a8236 186 ++runtime->d_stats.d_queued;
4d7db3d7 187 return Result::Queued;
ebf99525 188}
da71b63b 189
02d6face 190void RemoteLogger::maintenanceThread()
aa7929a3 191{
02d6face 192 try {
77c9bc9a 193#ifdef WE_ARE_RECURSOR
9548a664 194 string threadName = "rec/remlog";
77c9bc9a 195#else
02d6face 196 string threadName = "dnsdist/remLog";
77c9bc9a 197#endif
02d6face 198 setThreadName(threadName);
aa7929a3 199
02d6face
RG
200 for (;;) {
201 if (d_exiting) {
202 break;
203 }
c2d1d4ba 204
02d6face 205 bool connected = true;
6b4670e7 206 if (d_runtime.lock()->d_socket == nullptr) {
02d6face
RG
207 // if it was unset, it will remain so, we are the only ones setting it!
208 connected = reconnect();
209 }
ebf99525 210
02d6face
RG
211 /* we will just go to sleep if the reconnection just failed */
212 if (connected) {
213 try {
214 /* we don't want to take the lock while trying to reconnect */
6b4670e7
RG
215 auto runtime = d_runtime.lock();
216 if (runtime->d_socket) { // check if it is set
02d6face
RG
217 /* if flush() returns false, it means that we couldn't flush anything yet
218 either because there is nothing to flush, or because the outgoing TCP
219 buffer is full. That's fine by us */
6b4670e7 220 runtime->d_writer.flush(runtime->d_socket->getHandle());
02d6face
RG
221 }
222 else {
223 connected = false;
224 }
da71b63b 225 }
6b4670e7
RG
226 catch (const std::exception& e) {
227 d_runtime.lock()->d_socket.reset();
ebf99525 228 connected = false;
da71b63b 229 }
ebf99525 230
02d6face
RG
231 if (!connected) {
232 /* let's try to reconnect right away, we are about to sleep anyway */
233 reconnect();
234 }
ebf99525 235 }
ebf99525 236
02d6face
RG
237 sleep(d_reconnectWaitTime);
238 }
239 }
240 catch (const std::exception& e)
241 {
70099a45
OM
242 SLOG(cerr << "Remote Logger's maintenance thread died on: " << e.what() << endl,
243 g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Remote Logger's maintenance thread died"));
02d6face
RG
244 }
245 catch (...) {
70099a45
OM
246 SLOG(cerr << "Remote Logger's maintenance thread died on unknown exception" << endl,
247 g_slog->withName("protobuf")->info(Logr::Error, "Remote Logger's maintenance thread died"));
aa7929a3 248 }
aa7929a3
RG
249}
250
251RemoteLogger::~RemoteLogger()
252{
253 d_exiting = true;
da71b63b 254
aa7929a3
RG
255 d_thread.join();
256}
74b9e43d 257