]>
Commit | Line | Data |
---|---|---|
aa7929a3 | 1 | #include <unistd.h> |
519f5484 | 2 | #include "threadname.hh" |
aa7929a3 | 3 | #include "remote_logger.hh" |
da71b63b | 4 | #include <sys/uio.h> |
519f5484 | 5 | #ifdef HAVE_CONFIG_H |
49cfc104 | 6 | #include "config.h" |
519f5484 | 7 | #endif |
49cfc104 | 8 | #ifdef PDNS_CONFIG_ARGS |
9 | #include "logger.hh" | |
10 | #define WE_ARE_RECURSOR | |
11 | #else | |
12 | #include "dolog.hh" | |
13 | #endif | |
aa7929a3 | 14 | |
da71b63b | 15 | void CircularWriteBuffer::write(const std::string& str) |
16 | { | |
17 | if(d_buffer.size() + 2 + str.size() > d_buffer.capacity()) | |
18 | flush(); | |
19 | ||
20 | if(d_buffer.size() + 2 + str.size() > d_buffer.capacity()) | |
21 | throw std::runtime_error("Full!"); | |
22 | ||
23 | uint16_t len = htons(str.size()); | |
24 | char* ptr = (char*)&len; | |
25 | d_buffer.insert(d_buffer.end(), ptr, ptr + 2); | |
26 | d_buffer.insert(d_buffer.end(), str.begin(), str.end()); | |
27 | } | |
28 | ||
29 | void CircularWriteBuffer::flush() | |
30 | { | |
31 | if(d_buffer.empty()) // not optional, we report EOF otherwise | |
32 | return; | |
33 | ||
34 | auto arr1 = d_buffer.array_one(); | |
35 | auto arr2 = d_buffer.array_two(); | |
36 | ||
37 | struct iovec iov[2]; | |
38 | int pos=0; | |
39 | size_t total=0; | |
40 | for(const auto& arr : {arr1, arr2}) { | |
41 | if(arr.second) { | |
42 | iov[pos].iov_base = arr.first; | |
43 | iov[pos].iov_len = arr.second; | |
44 | total += arr.second; | |
45 | ++pos; | |
46 | } | |
47 | } | |
48 | ||
49 | int res = writev(d_fd, iov, pos); | |
50 | if(res < 0) { | |
51 | throw std::runtime_error("Couldn't flush a thing: "+string(strerror(errno))); | |
52 | } | |
53 | if(!res) { | |
54 | throw std::runtime_error("EOF"); | |
55 | } | |
56 | // cout<<"Flushed "<<res<<" bytes out of " << total <<endl; | |
57 | if((size_t)res == d_buffer.size()) | |
58 | d_buffer.clear(); | |
59 | else { | |
60 | while(res--) | |
61 | d_buffer.pop_front(); | |
62 | } | |
63 | } | |
64 | ||
65 | RemoteLogger::RemoteLogger(const ComboAddress& remote, uint16_t timeout, uint64_t maxQueuedBytes, uint8_t reconnectWaitTime, bool asyncConnect): d_remote(remote), d_maxQueuedBytes(maxQueuedBytes), d_timeout(timeout), d_reconnectWaitTime(reconnectWaitTime), d_asyncConnect(asyncConnect) | |
66 | { | |
67 | if (!d_asyncConnect) { | |
68 | if(reconnect()) | |
69 | d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes); | |
70 | } | |
71 | d_thread = std::thread(&RemoteLogger::maintenanceThread, this); | |
72 | } | |
73 | ||
aa7929a3 RG |
74 | bool RemoteLogger::reconnect() |
75 | { | |
76 | if (d_socket >= 0) { | |
77 | close(d_socket); | |
754f300f | 78 | d_socket = -1; |
aa7929a3 RG |
79 | } |
80 | try { | |
81 | d_socket = SSocket(d_remote.sin4.sin_family, SOCK_STREAM, 0); | |
aa7929a3 | 82 | setNonBlocking(d_socket); |
51959320 | 83 | SConnectWithTimeout(d_socket, d_remote, d_timeout); |
aa7929a3 RG |
84 | } |
85 | catch(const std::exception& e) { | |
49cfc104 | 86 | #ifdef WE_ARE_RECURSOR |
e6a9dde5 | 87 | g_log<<Logger::Warning<<"Error connecting to remote logger "<<d_remote.toStringWithPort()<<": "<<e.what()<<std::endl; |
49cfc104 | 88 | #else |
89 | warnlog("Error connecting to remote logger %s: %s", d_remote.toStringWithPort(), e.what()); | |
90 | #endif | |
aa7929a3 RG |
91 | return false; |
92 | } | |
93 | return true; | |
94 | } | |
95 | ||
da71b63b | 96 | void RemoteLogger::queueData(const std::string& data) |
c2d1d4ba | 97 | { |
da71b63b | 98 | if(!d_writer) { |
99 | d_drops++; | |
100 | return; | |
101 | } | |
102 | std::unique_lock<std::mutex> lock(d_mutex); | |
103 | if(d_writer) { | |
104 | try { | |
105 | d_writer->write(data); | |
106 | } | |
107 | catch(std::exception& e) { | |
108 | // cout << "Got exception writing: "<<e.what()<<endl; | |
109 | d_drops++; | |
110 | d_writer.reset(); | |
111 | close(d_socket); | |
112 | d_socket = -1; | |
113 | } | |
c2d1d4ba RG |
114 | } |
115 | } | |
116 | ||
da71b63b | 117 | |
118 | void RemoteLogger::maintenanceThread() | |
119 | try | |
aa7929a3 | 120 | { |
77c9bc9a | 121 | #ifdef WE_ARE_RECURSOR |
c390b2da | 122 | string threadName = "pdns-r/remLog"; |
77c9bc9a PL |
123 | #else |
124 | string threadName = "dnsdist/remLog"; | |
125 | #endif | |
519f5484 | 126 | setThreadName(threadName); |
aa7929a3 | 127 | |
da71b63b | 128 | for(;;) { |
129 | if(d_exiting) | |
130 | break; | |
c2d1d4ba | 131 | |
da71b63b | 132 | if(d_writer) { |
133 | std::unique_lock<std::mutex> lock(d_mutex); | |
134 | if(d_writer) { // check if it is still set | |
135 | // cout<<"Flush"<<endl; | |
136 | try { | |
137 | d_writer->flush(); | |
138 | } | |
139 | catch(std::exception& e) { | |
140 | // cout<<"Flush failed!"<<endl; | |
141 | d_writer.reset(); | |
142 | close(d_socket); | |
143 | d_socket = -1; | |
144 | } | |
145 | } | |
aa7929a3 | 146 | } |
da71b63b | 147 | else if(reconnect()) { // if it was zero, it will remain zero, we are the only ones setting it! |
148 | std::unique_lock<std::mutex> lock(d_mutex); | |
149 | d_writer = make_unique<CircularWriteBuffer>(d_socket, d_maxQueuedBytes); | |
aa7929a3 | 150 | } |
da71b63b | 151 | sleep(d_reconnectWaitTime); |
aa7929a3 | 152 | } |
aa7929a3 | 153 | } |
da71b63b | 154 | catch(std::exception& e) |
aa7929a3 | 155 | { |
da71b63b | 156 | cerr<<"Thead died on: "<<e.what()<<endl; |
aa7929a3 RG |
157 | } |
158 | ||
159 | RemoteLogger::~RemoteLogger() | |
160 | { | |
161 | d_exiting = true; | |
754f300f | 162 | if (d_socket >= 0) { |
aa7929a3 | 163 | close(d_socket); |
754f300f RG |
164 | d_socket = -1; |
165 | } | |
da71b63b | 166 | |
aa7929a3 RG |
167 | d_thread.join(); |
168 | } |