]> git.ipfire.org Git - thirdparty/pdns.git/blame - modules/remotebackend/zmqconnector.cc
Merge pull request #11431 from jroessler-ox/docs-kskzskroll-update
[thirdparty/pdns.git] / modules / remotebackend / zmqconnector.cc
CommitLineData
12471842
PL
1/*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
870a0fe4
AT
22#ifdef HAVE_CONFIG_H
23#include "config.h"
24#endif
a7db8aa6
AT
25#include "remotebackend.hh"
26#ifdef REMOTEBACKEND_ZEROMQ
27
ff05c7e1
O
28ZeroMQConnector::ZeroMQConnector(std::map<std::string, std::string> options) :
29 d_ctx(std::unique_ptr<void, int (*)(void*)>(zmq_init(2), zmq_close)), d_sock(std::unique_ptr<void, int (*)(void*)>(zmq_socket(d_ctx.get(), ZMQ_REQ), zmq_close))
30{
31 int opt = 0;
a7db8aa6
AT
32
33 // lookup timeout, target and stuff
34 if (options.count("endpoint") == 0) {
ff05c7e1 35 g_log << Logger::Error << "Cannot find 'endpoint' option in connection string" << endl;
9305f5fd 36 throw PDNSException("Cannot find 'endpoint' option in connection string");
a7db8aa6
AT
37 }
38 this->d_endpoint = options.find("endpoint")->second;
39 this->d_options = options;
ff05c7e1 40 this->d_timeout = 2000;
a7db8aa6
AT
41
42 if (options.find("timeout") != options.end()) {
ff05c7e1 43 this->d_timeout = std::stoi(options.find("timeout")->second);
a7db8aa6
AT
44 }
45
6c07f223 46 zmq_setsockopt(d_sock.get(), ZMQ_LINGER, &opt, sizeof(opt));
49e4360a 47
ff05c7e1
O
48 if (zmq_connect(this->d_sock.get(), this->d_endpoint.c_str()) < 0) {
49 g_log << Logger::Error << "zmq_connect() failed" << zmq_strerror(errno) << std::endl;
50 ;
49e4360a
AT
51 throw PDNSException("Cannot find 'endpoint' option in connection string");
52 }
a7db8aa6 53
fa2dd0e6
AT
54 Json::array parameters;
55 Json msg = Json(Json::object{
ff05c7e1
O
56 {"method", "initialize"},
57 {"parameters", Json(options)},
fa2dd0e6 58 });
a7db8aa6 59
fa2dd0e6
AT
60 this->send(msg);
61 msg = nullptr;
645d66cf 62 if (!this->recv(msg)) {
ff05c7e1 63 g_log << Logger::Error << "Failed to initialize zeromq" << std::endl;
49e4360a 64 throw PDNSException("Failed to initialize zeromq");
ff05c7e1 65 }
a7db8aa6
AT
66};
67
645d66cf 68ZeroMQConnector::~ZeroMQConnector() = default;
a7db8aa6 69
ff05c7e1
O
70int ZeroMQConnector::send_message(const Json& input)
71{
72 auto line = input.dump();
73 zmq_msg_t message;
74
75 zmq_msg_init_size(&message, line.size() + 1);
76 line.copy(reinterpret_cast<char*>(zmq_msg_data(&message)), line.size());
77 ((char*)zmq_msg_data(&message))[line.size()] = '\0';
78
79 try {
80 zmq_pollitem_t item;
81 item.socket = d_sock.get();
82 item.events = ZMQ_POLLOUT;
83 // poll until it's sent or timeout is spent. try to leave
84 // leave few cycles for read. just in case.
85 for (d_timespent = 0; d_timespent < d_timeout - 5; d_timespent++) {
86 if (zmq_poll(&item, 1, 1) > 0) {
87 if (zmq_msg_send(&message, this->d_sock.get(), 0) == -1) {
88 // message was not sent
89 g_log << Logger::Error << "Cannot send to " << this->d_endpoint << ": " << zmq_strerror(errno) << std::endl;
90 }
645d66cf 91 else {
ff05c7e1 92 return line.size();
645d66cf 93 }
ff05c7e1
O
94 }
95 }
96 }
97 catch (std::exception& ex) {
98 g_log << Logger::Error << "Cannot send to " << this->d_endpoint << ": " << ex.what() << std::endl;
99 throw PDNSException(ex.what());
100 }
101
102 return 0;
a7db8aa6
AT
103}
104
ff05c7e1
O
105int ZeroMQConnector::recv_message(Json& output)
106{
107 int rv = 0;
108 // try to receive message
109 zmq_pollitem_t item;
110 zmq_msg_t message;
111
112 item.socket = d_sock.get();
113 item.events = ZMQ_POLLIN;
114
115 try {
116 // do zmq::poll few times
117 // d_timespent should always be initialized by send_message, recv should never
118 // be called without send first.
119 for (; d_timespent < d_timeout; d_timespent++) {
120 if (zmq_poll(&item, 1, 1) > 0) {
121 // we have an event
122 if ((item.revents & ZMQ_POLLIN) == ZMQ_POLLIN) {
123 string data;
645d66cf 124 size_t msg_size = 0;
ff05c7e1
O
125 zmq_msg_init(&message);
126 // read something
127 if (zmq_msg_recv(&message, this->d_sock.get(), ZMQ_NOBLOCK) > 0) {
128 string err;
129 msg_size = zmq_msg_size(&message);
130 data.assign(reinterpret_cast<const char*>(zmq_msg_data(&message)), msg_size);
131 zmq_msg_close(&message);
132 output = Json::parse(data, err);
645d66cf 133 if (output != nullptr) {
ff05c7e1 134 rv = msg_size;
645d66cf
FM
135 }
136 else {
ff05c7e1 137 g_log << Logger::Error << "Cannot parse JSON reply from " << this->d_endpoint << ": " << err << endl;
645d66cf 138 }
ff05c7e1
O
139 break;
140 }
645d66cf 141 if (errno == EAGAIN) {
ff05c7e1
O
142 continue; // try again }
143 }
645d66cf 144 break;
a7db8aa6 145 }
ff05c7e1
O
146 }
147 }
148 }
149 catch (std::exception& ex) {
150 g_log << Logger::Error << "Cannot receive from " << this->d_endpoint << ": " << ex.what() << std::endl;
151 throw PDNSException(ex.what());
152 }
a7db8aa6 153
ff05c7e1 154 return rv;
a7db8aa6
AT
155}
156
157#endif