]>
git.ipfire.org Git - thirdparty/pdns.git/blob - modules/remotebackend/zmqconnector.cc
c810d042efbf411a4bc9e251239ef33b0f690ec5
4 #include "remotebackend.hh"
5 #ifdef REMOTEBACKEND_ZEROMQ
7 ZeroMQConnector::ZeroMQConnector(std::map
<std::string
,std::string
> options
) {
10 // lookup timeout, target and stuff
11 if (options
.count("endpoint") == 0) {
12 L
<<Logger::Error
<<"Cannot find 'endpoint' option in connection string"<<endl
;
13 throw PDNSException("Cannot find 'endpoint' option in connection string");
15 this->d_endpoint
= options
.find("endpoint")->second
;
16 this->d_options
= options
;
19 if (options
.find("timeout") != options
.end()) {
20 this->d_timeout
= std::stoi(options
.find("timeout")->second
);
24 d_sock
= zmq_socket(this->d_ctx
, ZMQ_REQ
);
25 zmq_setsockopt(d_sock
, ZMQ_LINGER
, &opt
, sizeof(opt
));
27 if(zmq_connect(this->d_sock
, this->d_endpoint
.c_str()) < 0)
29 L
<<Logger::Error
<<"zmq_connect() failed"<< zmq_strerror(errno
)<<std::endl
;;
30 throw PDNSException("Cannot find 'endpoint' option in connection string");
33 Json::array parameters
;
34 Json msg
= Json(Json::object
{
35 { "method", "initialize" },
36 { "parameters", Json(options
) },
41 if (this->recv(msg
)==false) {
42 L
<<Logger::Error
<<"Failed to initialize zeromq"<<std::endl
;
43 throw PDNSException("Failed to initialize zeromq");
47 ZeroMQConnector::~ZeroMQConnector() {
48 zmq_close(this->d_sock
);
49 zmq_term(this->d_ctx
);
52 int ZeroMQConnector::send_message(const Json
& input
) {
53 auto line
= input
.dump();
56 zmq_msg_init_size(&message
, line
.size()+1);
57 line
.copy(reinterpret_cast<char*>(zmq_msg_data(&message
)), line
.size());
58 ((char *)zmq_msg_data(&message
))[line
.size()] = '\0';
63 item
.events
= ZMQ_POLLOUT
;
64 // poll until it's sent or timeout is spent. try to leave
65 // leave few cycles for read. just in case.
66 for(d_timespent
= 0; d_timespent
< d_timeout
-5; d_timespent
++) {
67 if (zmq_poll(&item
, 1, 1)>0) {
68 if(zmq_msg_send(&message
, this->d_sock
, 0) == -1) {
69 // message was not sent
70 L
<<Logger::Error
<<"Cannot send to " << this->d_endpoint
<< ": " << zmq_strerror(errno
)<<std::endl
;
75 } catch (std::exception
&ex
) {
76 L
<<Logger::Error
<<"Cannot send to " << this->d_endpoint
<< ": " << ex
.what()<<std::endl
;
77 throw PDNSException(ex
.what());
83 int ZeroMQConnector::recv_message(Json
& output
) {
85 // try to receive message
90 item
.events
= ZMQ_POLLIN
;
93 // do zmq::poll few times
94 // d_timespent should always be initialized by send_message, recv should never
95 // be called without send first.
96 for(; d_timespent
< d_timeout
; d_timespent
++) {
97 if (zmq_poll(&item
, 1, 1)>0) {
99 if ((item
.revents
& ZMQ_POLLIN
) == ZMQ_POLLIN
) {
102 zmq_msg_init(&message
);
104 if(zmq_msg_recv(&message
, this->d_sock
, ZMQ_NOBLOCK
)>0) {
106 msg_size
= zmq_msg_size(&message
);
107 data
.assign(reinterpret_cast<const char*>(zmq_msg_data(&message
)), msg_size
);
108 zmq_msg_close(&message
);
109 output
= Json::parse(data
, err
);
110 if (output
!= nullptr)
113 L
<<Logger::Error
<<"Cannot parse JSON reply from " << this->d_endpoint
<< ": " << err
<< endl
;
115 } else if (errno
== EAGAIN
) { continue; // try again }
122 } catch (std::exception
&ex
) {
123 L
<<Logger::Error
<<"Cannot receive from " << this->d_endpoint
<< ": " << ex
.what()<<std::endl
;
124 throw PDNSException(ex
.what());