]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/fstrm_logger.cc
Merge pull request #6323 from zeha/dnsdist-remotelogger-checkconfig
[thirdparty/pdns.git] / pdns / fstrm_logger.cc
1 #include <unistd.h>
2 #include <sys/un.h>
3
4 #include "config.h"
5 #include "fstrm_logger.hh"
6 #include "dolog.hh"
7
8 #define DNSTAP_CONTENT_TYPE "protobuf:dnstap.Dnstap"
9
10 #ifdef HAVE_FSTRM
11
12 FrameStreamLogger::FrameStreamLogger(const int family, const std::string& address, bool connect): d_family(family), d_address(address)
13 {
14 fstrm_res res;
15
16 try {
17 d_fwopt = fstrm_writer_options_init();
18 if (!d_fwopt) {
19 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_init failed.");
20 }
21
22 res = fstrm_writer_options_add_content_type(d_fwopt, DNSTAP_CONTENT_TYPE, sizeof(DNSTAP_CONTENT_TYPE) - 1);
23 if (res != fstrm_res_success) {
24 throw std::runtime_error("FrameStreamLogger: fstrm_writer_options_add_content_type failed: " + std::to_string(res));
25 }
26
27 if (d_family == AF_UNIX) {
28 struct sockaddr_un local;
29 if (makeUNsockaddr(d_address, &local)) {
30 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "', it is not a valid UNIX socket path.");
31 }
32
33 d_uwopt = fstrm_unix_writer_options_init();
34 if (!d_uwopt) {
35 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_options_init failed.");
36 }
37
38 // void return, no error checking.
39 fstrm_unix_writer_options_set_socket_path(d_uwopt, d_address.c_str());
40
41 d_writer = fstrm_unix_writer_init(d_uwopt, d_fwopt);
42 if (!d_writer) {
43 throw std::runtime_error("FrameStreamLogger: fstrm_unix_writer_init() failed.");
44 }
45 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
46 } else if (family == AF_INET) {
47 d_twopt = fstrm_tcp_writer_options_init();
48 if (!d_twopt) {
49 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_options_init failed.");
50 }
51
52 try {
53 ComboAddress ca(d_address);
54
55 // void return, no error checking.
56 fstrm_tcp_writer_options_set_socket_address(d_twopt, ca.toString().c_str());
57 fstrm_tcp_writer_options_set_socket_port(d_twopt, std::to_string(ca.getPort()).c_str());
58 } catch (PDNSException &e) {
59 throw std::runtime_error("FrameStreamLogger: Unable to use '" + d_address + "': " + e.reason);
60 }
61
62 d_writer = fstrm_tcp_writer_init(d_twopt, d_fwopt);
63 if (!d_writer) {
64 throw std::runtime_error("FrameStreamLogger: fstrm_tcp_writer_init() failed.");
65 }
66 #endif
67 } else {
68 throw std::runtime_error("FrameStreamLogger: family " + std::to_string(family) + " not supported");
69 }
70
71 d_iothropt = fstrm_iothr_options_init();
72 if (!d_iothropt) {
73 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_init() failed.");
74 }
75
76 res = fstrm_iothr_options_set_queue_model(d_iothropt, FSTRM_IOTHR_QUEUE_MODEL_MPSC);
77 if (res != fstrm_res_success) {
78 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_options_set_queue_model failed: " + std::to_string(res));
79 }
80
81 if (connect) {
82 d_iothr = fstrm_iothr_init(d_iothropt, &d_writer);
83 if (!d_iothr) {
84 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_init() failed.");
85 }
86
87 d_ioqueue = fstrm_iothr_get_input_queue(d_iothr);
88 if (!d_ioqueue) {
89 throw std::runtime_error("FrameStreamLogger: fstrm_iothr_get_input_queue() failed.");
90 }
91 }
92 } catch (std::runtime_error &e) {
93 this->cleanup();
94 throw;
95 }
96 }
97
98 void FrameStreamLogger::cleanup()
99 {
100 if (d_iothr != nullptr) {
101 fstrm_iothr_destroy(&d_iothr);
102 d_iothr = nullptr;
103 }
104 if (d_iothropt != nullptr) {
105 fstrm_iothr_options_destroy(&d_iothropt);
106 d_iothropt = nullptr;
107 }
108 if (d_writer != nullptr) {
109 fstrm_writer_destroy(&d_writer);
110 d_writer = nullptr;
111 }
112 if (d_uwopt != nullptr) {
113 fstrm_unix_writer_options_destroy(&d_uwopt);
114 d_uwopt = nullptr;
115 }
116 #ifdef HAVE_FSTRM_TCP_WRITER_INIT
117 if (d_twopt != nullptr) {
118 fstrm_tcp_writer_options_destroy(&d_twopt);
119 d_twopt = nullptr;
120 }
121 #endif
122 if (d_fwopt != nullptr) {
123 fstrm_writer_options_destroy(&d_fwopt);
124 d_fwopt = nullptr;
125 }
126 }
127
128 FrameStreamLogger::~FrameStreamLogger()
129 {
130 this->cleanup();
131 }
132
133 void FrameStreamLogger::queueData(const std::string& data)
134 {
135 if (!d_ioqueue || !d_iothr) {
136 return;
137 }
138 uint8_t *frame = (uint8_t*)malloc(data.length());
139 if (!frame) {
140 warnlog("FrameStreamLogger: cannot allocate memory for stream.");
141 return;
142 }
143 memcpy(frame, data.c_str(), data.length());
144
145 fstrm_res res;
146 res = fstrm_iothr_submit(d_iothr, d_ioqueue, frame, data.length(), fstrm_free_wrapper, nullptr);
147
148 if (res == fstrm_res_success) {
149 // Frame successfully queued.
150 } else if (res == fstrm_res_again) {
151 free(frame);
152 warnlog("FrameStreamLogger: queue full, dropping.");
153 } else {
154 // Permanent failure.
155 free(frame);
156 warnlog("FrameStreamLogger: submitting to queue failed.");
157 }
158 }
159
160 #endif /* HAVE_FSTRM */