]>
Commit | Line | Data |
---|---|---|
aa7929a3 | 1 | #include <unistd.h> |
519f5484 | 2 | #include "threadname.hh" |
aa7929a3 | 3 | #include "remote_logger.hh" |
da71b63b | 4 | #include <sys/uio.h> |
519f5484 | 5 | #ifdef HAVE_CONFIG_H |
49cfc104 | 6 | #include "config.h" |
519f5484 | 7 | #endif |
49cfc104 | 8 | #ifdef PDNS_CONFIG_ARGS |
9 | #include "logger.hh" | |
10 | #define WE_ARE_RECURSOR | |
11 | #else | |
12 | #include "dolog.hh" | |
13 | #endif | |
43f91cad | 14 | #include "logging.hh" |
aa7929a3 | 15 | |
ebf99525 | 16 | bool CircularWriteBuffer::hasRoomFor(const std::string& str) const |
da71b63b | 17 | { |
ebf99525 RG |
18 | if (d_buffer.size() + 2 + str.size() > d_buffer.capacity()) { |
19 | return false; | |
20 | } | |
da71b63b | 21 | |
ebf99525 RG |
22 | return true; |
23 | } | |
24 | ||
25 | bool CircularWriteBuffer::write(const std::string& str) | |
26 | { | |
87f46425 | 27 | if (str.size() > std::numeric_limits<uint16_t>::max() || !hasRoomFor(str)) { |
ebf99525 RG |
28 | return false; |
29 | } | |
da71b63b | 30 | |
31 | uint16_t len = htons(str.size()); | |
ebf99525 | 32 | const char* ptr = reinterpret_cast<const char*>(&len); |
da71b63b | 33 | d_buffer.insert(d_buffer.end(), ptr, ptr + 2); |
34 | d_buffer.insert(d_buffer.end(), str.begin(), str.end()); | |
ebf99525 RG |
35 | |
36 | return true; | |
da71b63b | 37 | } |
38 | ||
ebf99525 | 39 | bool CircularWriteBuffer::flush(int fd) |
da71b63b | 40 | { |
ebf99525 RG |
41 | if (d_buffer.empty()) { |
42 | // not optional, we report EOF otherwise | |
43 | return false; | |
44 | } | |
da71b63b | 45 | |
46 | auto arr1 = d_buffer.array_one(); | |
47 | auto arr2 = d_buffer.array_two(); | |
48 | ||
49 | struct iovec iov[2]; | |
ebf99525 | 50 | int pos = 0; |
da71b63b | 51 | for(const auto& arr : {arr1, arr2}) { |
52 | if(arr.second) { | |
53 | iov[pos].iov_base = arr.first; | |
54 | iov[pos].iov_len = arr.second; | |
da71b63b | 55 | ++pos; |
56 | } | |
57 | } | |
58 | ||
ebf99525 RG |
59 | ssize_t res = 0; |
60 | do { | |
61 | res = writev(fd, iov, pos); | |
62 | ||
63 | if (res < 0) { | |
64 | if (errno == EINTR) { | |
65 | continue; | |
66 | } | |
67 | ||
68 | if (errno == EAGAIN || errno == EWOULDBLOCK) { | |
69 | return false; | |
70 | } | |
71 | ||
72 | /* we can't be sure we haven't sent a partial message, | |
73 | and we don't want to send the remaining part after reconnecting */ | |
74 | d_buffer.clear(); | |
75 | throw std::runtime_error("Couldn't flush a thing: " + stringerror()); | |
76 | } | |
77 | else if (!res) { | |
78 | /* we can't be sure we haven't sent a partial message, | |
79 | and we don't want to send the remaining part after reconnecting */ | |
80 | d_buffer.clear(); | |
81 | throw std::runtime_error("EOF"); | |
82 | } | |
da71b63b | 83 | } |
ebf99525 RG |
84 | while (res < 0); |
85 | ||
414a03e1 | 86 | if (static_cast<size_t>(res) == d_buffer.size()) { |
da71b63b | 87 | d_buffer.clear(); |
414a03e1 | 88 | } |
da71b63b | 89 | else { |
ebf99525 | 90 | while (res--) { |
da71b63b | 91 | d_buffer.pop_front(); |
414a03e1 | 92 | } |
da71b63b | 93 | } |
ebf99525 RG |
94 | |
95 | return true; | |
da71b63b | 96 | } |
97 | ||
74b9e43d OM |
98 | const std::string& RemoteLoggerInterface::toErrorString(Result r) |
99 | { | |
100 | static const std::array<std::string,5> str = { | |
101 | "Queued", | |
102 | "Queue full, dropping", | |
103 | "Not sending too large protobuf message", | |
104 | "Submiting to queue failed", | |
105 | "?" | |
106 | }; | |
107 | auto i = static_cast<unsigned int>(r); | |
108 | return str[std::min(i, 4U)]; | |
109 | } | |
110 | ||
6b4670e7 | 111 | RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect), d_runtime({CircularWriteBuffer(maxQueuedBytes), nullptr}) |
da71b63b | 112 | { |
113 | if (!d_asyncConnect) { | |
ebf99525 | 114 | reconnect(); |
da71b63b | 115 | } |
ebf99525 | 116 | |
da71b63b | 117 | d_thread = std::thread(&RemoteLogger::maintenanceThread, this); |
118 | } | |
119 | ||
aa7929a3 RG |
120 | bool RemoteLogger::reconnect() |
121 | { | |
aa7929a3 | 122 | try { |
ebf99525 RG |
123 | auto newSock = make_unique<Socket>(d_remote.sin4.sin_family, SOCK_STREAM, 0); |
124 | newSock->setNonBlocking(); | |
125 | newSock->connect(d_remote, d_timeout); | |
126 | ||
127 | { | |
128 | /* we are now successfully connected, time to take the lock and update the | |
129 | socket */ | |
6b4670e7 RG |
130 | auto runtime = d_runtime.lock(); |
131 | runtime->d_socket = std::move(newSock); | |
ebf99525 | 132 | } |
aa7929a3 | 133 | } |
ebf99525 | 134 | catch (const std::exception& e) { |
49cfc104 | 135 | #ifdef WE_ARE_RECURSOR |
62b191dc | 136 | SLOG(g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl, |
84d24234 | 137 | g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Exception while connecting to remote logger", "address", Logging::Loggable(d_remote))); |
49cfc104 | 138 | #else |
139 | warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what()); | |
140 | #endif | |
ebf99525 | 141 | |
aa7929a3 RG |
142 | return false; |
143 | } | |
144 | return true; | |
145 | } | |
146 | ||
4d7db3d7 | 147 | RemoteLoggerInterface::Result RemoteLogger::queueData(const std::string& data) |
c2d1d4ba | 148 | { |
2b7a8236 OM |
149 | auto runtime = d_runtime.lock(); |
150 | ||
87f46425 | 151 | if (data.size() > std::numeric_limits<uint16_t>::max()) { |
2b7a8236 | 152 | ++runtime->d_stats.d_tooLarge; |
4d7db3d7 | 153 | return Result::TooLarge; |
87f46425 RG |
154 | } |
155 | ||
6b4670e7 | 156 | if (!runtime->d_writer.hasRoomFor(data)) { |
ebf99525 | 157 | /* not connected, queue is full, just drop */ |
6b4670e7 | 158 | if (!runtime->d_socket) { |
2b7a8236 | 159 | ++runtime->d_stats.d_pipeFull; |
4d7db3d7 | 160 | return Result::PipeFull; |
ebf99525 | 161 | } |
da71b63b | 162 | try { |
ebf99525 | 163 | /* we try to flush some data */ |
6b4670e7 | 164 | if (!runtime->d_writer.flush(runtime->d_socket->getHandle())) { |
ebf99525 | 165 | /* but failed, let's just drop */ |
2b7a8236 | 166 | ++runtime->d_stats.d_pipeFull; |
4d7db3d7 | 167 | return Result::PipeFull; |
ebf99525 RG |
168 | } |
169 | ||
170 | /* see if we freed enough data */ | |
6b4670e7 | 171 | if (!runtime->d_writer.hasRoomFor(data)) { |
ebf99525 | 172 | /* we didn't */ |
2b7a8236 | 173 | ++runtime->d_stats.d_pipeFull; |
4d7db3d7 | 174 | return Result::PipeFull; |
ebf99525 | 175 | } |
da71b63b | 176 | } |
414a03e1 | 177 | catch(const std::exception& e) { |
da71b63b | 178 | // cout << "Got exception writing: "<<e.what()<<endl; |
6b4670e7 | 179 | runtime->d_socket.reset(); |
2b7a8236 OM |
180 | ++runtime->d_stats.d_otherError; |
181 | return Result::OtherError; | |
da71b63b | 182 | } |
c2d1d4ba | 183 | } |
c2d1d4ba | 184 | |
6b4670e7 | 185 | runtime->d_writer.write(data); |
2b7a8236 | 186 | ++runtime->d_stats.d_queued; |
4d7db3d7 | 187 | return Result::Queued; |
ebf99525 | 188 | } |
da71b63b | 189 | |
02d6face | 190 | void RemoteLogger::maintenanceThread() |
aa7929a3 | 191 | { |
02d6face | 192 | try { |
77c9bc9a | 193 | #ifdef WE_ARE_RECURSOR |
9548a664 | 194 | string threadName = "rec/remlog"; |
77c9bc9a | 195 | #else |
02d6face | 196 | string threadName = "dnsdist/remLog"; |
77c9bc9a | 197 | #endif |
02d6face | 198 | setThreadName(threadName); |
aa7929a3 | 199 | |
02d6face RG |
200 | for (;;) { |
201 | if (d_exiting) { | |
202 | break; | |
203 | } | |
c2d1d4ba | 204 | |
02d6face | 205 | bool connected = true; |
6b4670e7 | 206 | if (d_runtime.lock()->d_socket == nullptr) { |
02d6face RG |
207 | // if it was unset, it will remain so, we are the only ones setting it! |
208 | connected = reconnect(); | |
209 | } | |
ebf99525 | 210 | |
02d6face RG |
211 | /* we will just go to sleep if the reconnection just failed */ |
212 | if (connected) { | |
213 | try { | |
214 | /* we don't want to take the lock while trying to reconnect */ | |
6b4670e7 RG |
215 | auto runtime = d_runtime.lock(); |
216 | if (runtime->d_socket) { // check if it is set | |
02d6face RG |
217 | /* if flush() returns false, it means that we couldn't flush anything yet |
218 | either because there is nothing to flush, or because the outgoing TCP | |
219 | buffer is full. That's fine by us */ | |
6b4670e7 | 220 | runtime->d_writer.flush(runtime->d_socket->getHandle()); |
02d6face RG |
221 | } |
222 | else { | |
223 | connected = false; | |
224 | } | |
da71b63b | 225 | } |
6b4670e7 RG |
226 | catch (const std::exception& e) { |
227 | d_runtime.lock()->d_socket.reset(); | |
ebf99525 | 228 | connected = false; |
da71b63b | 229 | } |
ebf99525 | 230 | |
02d6face RG |
231 | if (!connected) { |
232 | /* let's try to reconnect right away, we are about to sleep anyway */ | |
233 | reconnect(); | |
234 | } | |
ebf99525 | 235 | } |
ebf99525 | 236 | |
02d6face RG |
237 | sleep(d_reconnectWaitTime); |
238 | } | |
239 | } | |
240 | catch (const std::exception& e) | |
241 | { | |
70099a45 OM |
242 | SLOG(cerr << "Remote Logger's maintenance thread died on: " << e.what() << endl, |
243 | g_slog->withName("protobuf")->error(Logr::Error, e.what(), "Remote Logger's maintenance thread died")); | |
02d6face RG |
244 | } |
245 | catch (...) { | |
70099a45 OM |
246 | SLOG(cerr << "Remote Logger's maintenance thread died on unknown exception" << endl, |
247 | g_slog->withName("protobuf")->info(Logr::Error, "Remote Logger's maintenance thread died")); | |
aa7929a3 | 248 | } |
aa7929a3 RG |
249 | } |
250 | ||
251 | RemoteLogger::~RemoteLogger() | |
252 | { | |
253 | d_exiting = true; | |
da71b63b | 254 | |
aa7929a3 RG |
255 | d_thread.join(); |
256 | } | |
74b9e43d | 257 |