]> git.ipfire.org Git - thirdparty/pdns.git/blob - modules/remotebackend/zmqconnector.cc
c810d042efbf411a4bc9e251239ef33b0f690ec5
[thirdparty/pdns.git] / modules / remotebackend / zmqconnector.cc
1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4 #include "remotebackend.hh"
5 #ifdef REMOTEBACKEND_ZEROMQ
6
7 ZeroMQConnector::ZeroMQConnector(std::map<std::string,std::string> options) {
8 int opt=0;
9
10 // lookup timeout, target and stuff
11 if (options.count("endpoint") == 0) {
12 L<<Logger::Error<<"Cannot find 'endpoint' option in connection string"<<endl;
13 throw PDNSException("Cannot find 'endpoint' option in connection string");
14 }
15 this->d_endpoint = options.find("endpoint")->second;
16 this->d_options = options;
17 this->d_timeout=2000;
18
19 if (options.find("timeout") != options.end()) {
20 this->d_timeout = std::stoi(options.find("timeout")->second);
21 }
22
23 d_ctx = zmq_init(2);
24 d_sock = zmq_socket(this->d_ctx, ZMQ_REQ);
25 zmq_setsockopt(d_sock, ZMQ_LINGER, &opt, sizeof(opt));
26
27 if(zmq_connect(this->d_sock, this->d_endpoint.c_str()) < 0)
28 {
29 L<<Logger::Error<<"zmq_connect() failed"<< zmq_strerror(errno)<<std::endl;;
30 throw PDNSException("Cannot find 'endpoint' option in connection string");
31 }
32
33 Json::array parameters;
34 Json msg = Json(Json::object{
35 { "method", "initialize" },
36 { "parameters", Json(options) },
37 });
38
39 this->send(msg);
40 msg = nullptr;
41 if (this->recv(msg)==false) {
42 L<<Logger::Error<<"Failed to initialize zeromq"<<std::endl;
43 throw PDNSException("Failed to initialize zeromq");
44 }
45 };
46
47 ZeroMQConnector::~ZeroMQConnector() {
48 zmq_close(this->d_sock);
49 zmq_term(this->d_ctx);
50 };
51
52 int ZeroMQConnector::send_message(const Json& input) {
53 auto line = input.dump();
54 zmq_msg_t message;
55
56 zmq_msg_init_size(&message, line.size()+1);
57 line.copy(reinterpret_cast<char*>(zmq_msg_data(&message)), line.size());
58 ((char *)zmq_msg_data(&message))[line.size()] = '\0';
59
60 try {
61 zmq_pollitem_t item;
62 item.socket = d_sock;
63 item.events = ZMQ_POLLOUT;
64 // poll until it's sent or timeout is spent. try to leave
65 // leave few cycles for read. just in case.
66 for(d_timespent = 0; d_timespent < d_timeout-5; d_timespent++) {
67 if (zmq_poll(&item, 1, 1)>0) {
68 if(zmq_msg_send(&message, this->d_sock, 0) == -1) {
69 // message was not sent
70 L<<Logger::Error<<"Cannot send to " << this->d_endpoint << ": " << zmq_strerror(errno)<<std::endl;
71 } else
72 return line.size();
73 }
74 }
75 } catch (std::exception &ex) {
76 L<<Logger::Error<<"Cannot send to " << this->d_endpoint << ": " << ex.what()<<std::endl;
77 throw PDNSException(ex.what());
78 }
79
80 return 0;
81 }
82
83 int ZeroMQConnector::recv_message(Json& output) {
84 int rv = 0;
85 // try to receive message
86 zmq_pollitem_t item;
87 zmq_msg_t message;
88
89 item.socket = d_sock;
90 item.events = ZMQ_POLLIN;
91
92 try {
93 // do zmq::poll few times
94 // d_timespent should always be initialized by send_message, recv should never
95 // be called without send first.
96 for(; d_timespent < d_timeout; d_timespent++) {
97 if (zmq_poll(&item, 1, 1)>0) {
98 // we have an event
99 if ((item.revents & ZMQ_POLLIN) == ZMQ_POLLIN) {
100 string data;
101 size_t msg_size;
102 zmq_msg_init(&message);
103 // read something
104 if(zmq_msg_recv(&message, this->d_sock, ZMQ_NOBLOCK)>0) {
105 string err;
106 msg_size = zmq_msg_size(&message);
107 data.assign(reinterpret_cast<const char*>(zmq_msg_data(&message)), msg_size);
108 zmq_msg_close(&message);
109 output = Json::parse(data, err);
110 if (output != nullptr)
111 rv = msg_size;
112 else
113 L<<Logger::Error<<"Cannot parse JSON reply from " << this->d_endpoint << ": " << err << endl;
114 break;
115 } else if (errno == EAGAIN) { continue; // try again }
116 } else {
117 break;
118 }
119 }
120 }
121 }
122 } catch (std::exception &ex) {
123 L<<Logger::Error<<"Cannot receive from " << this->d_endpoint << ": " << ex.what()<<std::endl;
124 throw PDNSException(ex.what());
125 }
126
127 return rv;
128 }
129
130 #endif