2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
29 #include <boost/multi_index_container.hpp>
30 #include <boost/format.hpp>
35 #include <sys/socket.h>
37 #include "namespaces.hh"
38 using namespace boost::multi_index
;
// Inflighter walks a Container and hands each element to a SenderReceiver.
// Outstanding elements are tracked in d_ttdWatch so that run() can match
// received answers to the element that caused them and can expire entries
// whose deadline has passed.
42 template<typename Container
, typename SenderReceiver
> class Inflighter
// Binds d_container to the caller's container, initialises d_sr from the
// given sender/receiver and starts with d_init == false.
45 Inflighter(Container
& c
, SenderReceiver
& sr
) : d_container(c
), d_sr(sr
), d_init(false)
// Reset both statistics counters to zero.
50 d_unexpectedResponse
= d_timeouts
= 0;
// Point the send cursor at the first element of the container.
54 d_iter
= d_container
.begin();
58 bool run(); //!< keep calling this as long as it returns 1, or if it throws an exception
// Tunables read by run(): d_maxInFlight is the cap compared against
// d_ttdWatch.size() before sending, d_timeoutSeconds is added to the send
// time to obtain an entry's deadline.
60 unsigned int d_maxInFlight
;
61 unsigned int d_timeoutSeconds
;
// NOTE(review): presumably returns d_timeouts - confirm.
64 uint64_t getTimeouts()
// Returns d_unexpectedResponse, the counter run() increments on its
// unexpected-answer path.
69 uint64_t getUnexpecteds()
71 return d_unexpectedResponse
;
// Bookkeeping kept for one outstanding element:
//   iter     - position of the element in the container
//   id       - identifier returned by the sender/receiver's send() for it
//   sentTime - gettimeofday() stamp taken right after sending
//   ttd      - deadline: sentTime with d_timeoutSeconds added to tv_sec
77 typename
Container::iterator iter
;
78 typename
SenderReceiver::Identifier id
;
79 struct timeval sentTime
, ttd
;
// Multi-index table of outstanding entries with two member-based keys:
// TTDItem::id (run() uses count()/find() on it) and TTDItem::ttd (run()
// walks it, presumably through the TimeTag index, as a list sorted by age).
82 typedef multi_index_container
<
86 member
<TTDItem
, typename
SenderReceiver::Identifier
, &TTDItem::id
>
90 member
<TTDItem
, struct timeval
, &TTDItem::ttd
>
// The container being processed (held by reference, not copied).
95 Container
& d_container
;
// Table of entries that were sent and are still awaiting an answer.
98 ttdwatch_t d_ttdWatch
;
// Cursor to the next element of d_container that has not been sent yet.
99 typename
Container::iterator d_iter
;
// Statistics counters; d_unexpectedResponse is returned by getUnexpecteds().
102 uint64_t d_unexpectedResponse
, d_timeouts
;
// Drives the whole exchange: sends elements of d_container through d_sr,
// matches incoming answers against the outstanding entries in d_ttdWatch,
// and reports entries whose deadline has passed via deliverTimeout().
// Exceptions thrown by the sender/receiver are not caught here.
105 template<typename Container
, typename SendReceive
> bool Inflighter
<Container
, SendReceive
>::run()
113 // 'send' as many items as allowed, limited by 'max in flight' and our burst parameter (which limits query rate growth)
114 while(d_iter
!= d_container
.end() && d_ttdWatch
.size() < d_maxInFlight
) {
// Remember which element is being sent and advance the cursor.
116 ttdi
.iter
= d_iter
++;
// The sender hands back the identifier that the answer will carry.
117 ttdi
.id
= d_sr
.send(*ttdi
.iter
);
// Stamp the send time and derive the deadline from it
// (send time plus d_timeoutSeconds whole seconds).
118 gettimeofday(&ttdi
.sentTime
, 0);
119 ttdi
.ttd
= ttdi
.sentTime
;
120 ttdi
.ttd
.tv_sec
+= d_timeoutSeconds
;
// NOTE(review): a duplicate identifier is detected here but only a
// commented-out log line is visible; the insert below is still attempted.
121 if(d_ttdWatch
.count(ttdi
.id
)) {
122 // cerr<<"DUPLICATE INSERT!"<<endl;
124 d_ttdWatch
.insert(ttdi
);
// Burst limit: counts the sends of this round against d_burst
// (presumably stops sending for this round once reached - confirm).
126 if(++burst
== d_burst
)
132 // if there are queries in flight, handle responses
133 if(!d_ttdWatch
.empty()) {
134 // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl;
135 typename
SendReceive::Answer answer
;
136 typename
SendReceive::Identifier id
;
138 // get as many answers as available - 'receive' should block for a short while to wait for an answer
139 while(d_sr
.receive(id
, answer
)) {
140 typename
ttdwatch_t::iterator ival
= d_ttdWatch
.find(id
); // match up what we received to what we were waiting for
142 if(ival
!= d_ttdWatch
.end()) { // found something!
145 gettimeofday(&now
, 0);
// Elapsed time between send and receipt, in microseconds.
// NOTE(review): the result is narrowed to unsigned int.
146 unsigned int usec
= 1000000*(now
.tv_sec
- ival
->sentTime
.tv_sec
) + (now
.tv_usec
- ival
->sentTime
.tv_usec
);
147 d_sr
.deliverAnswer(*ival
->iter
, answer
, usec
); // deliver to sender/receiver
// The entry is answered, so it no longer counts as in flight.
148 d_ttdWatch
.erase(ival
);
149 break; // we can send new questions!
152 // cerr<<"UNEXPECTED ANSWER: "<<id<<endl;
153 d_unexpectedResponse
++;
158 if(!processed
/* || d_ttdWatch.size() > 10000 */ ) { // no new responses, time for some cleanup of the ttdWatch
160 gettimeofday(&now
, 0);
// View of the outstanding entries through the TimeTag index.
162 typedef typename
ttdwatch_t::template index
<TimeTag
>::type waiters_by_ttd_index_t
;
163 waiters_by_ttd_index_t
& waiters_index
= boost::multi_index::get
<TimeTag
>(d_ttdWatch
);
165 // this provides a list of items sorted by age
166 for(typename
waiters_by_ttd_index_t::iterator valiter
= waiters_index
.begin(); valiter
!= waiters_index
.end(); ) {
// Expired when the deadline is strictly earlier than 'now':
// compare seconds first, microseconds only on equal seconds.
167 if(valiter
->ttd
.tv_sec
< now
.tv_sec
|| (valiter
->ttd
.tv_sec
== now
.tv_sec
&& valiter
->ttd
.tv_usec
< now
.tv_usec
)) {
168 d_sr
.deliverTimeout(valiter
->id
); // so backend can release id
// Post-increment: the iterator moves on before its old position is erased.
169 waiters_index
.erase(valiter
++);
170 // cerr<<"Have timeout for id="<< valiter->id <<endl;
174 break; // if this one was too new, rest will be too
// Completion test: nothing is in flight and every element has been sent.
178 if(d_ttdWatch
.empty() && d_iter
== d_container
.end())
// Type of the identifier that send() returns and receive() fills in.
189 typedef int Identifier
;
// Address of the peer: filled in by recvfrom() and then passed to connect().
191 ComboAddress d_remote
;
// Transport set-up: an IPv4 datagram socket with SO_REUSEADDR, bound to
// 0.0.0.0 port 1024.
// NOTE(review): no result check is visible for socket(), setsockopt(),
// bind() or connect() in these statements - confirm that is intended.
198 d_socket
= socket(AF_INET
, SOCK_DGRAM
, 0);
200 setsockopt(d_socket
, SOL_SOCKET
, SO_REUSEADDR
, &val
, sizeof(val
));
202 ComboAddress
local("0.0.0.0", 1024);
203 bind(d_socket
, (struct sockaddr
*)&local
, local
.getSocklen());
// Wait for a first datagram; its source address is stored in d_remote.
207 socklen_t remotelen
=sizeof(d_remote
);
208 cerr
<<"Waiting for 'hi' on "<<local
.toStringWithPort()<<endl
;
209 int len
= recvfrom(d_socket
, buf
, sizeof(buf
), 0, (struct sockaddr
*)&d_remote
, &remotelen
);
// NOTE(review): len is used as the string length without a check for a
// negative recvfrom() result.
210 cerr
<<d_remote
.toStringWithPort()<<" sent 'hi': "<<string(buf
, len
);
// Presumably switches the socket to non-blocking mode (helper defined
// elsewhere), then connects it to the recorded peer so that plain
// send()/recv() calls can be used afterwards.
211 Utility::setNonBlocking(d_socket
);
212 connect(d_socket
, (struct sockaddr
*) &d_remote
, d_remote
.getSocklen());
// Sends the 6-byte literal "done\r\n" over d_socket; the result of the
// call is not inspected.
217 ::send(d_socket
, "done\r\n", 6, 0);
// Sends one value over d_socket as the text line "<d_id> <i>\n", after
// logging the value to cerr. The result of ::send() is not inspected.
// NOTE(review): presumably the identifier placed in the message (d_id) is
// what gets returned to the caller - confirm.
220 Identifier
send(int& i
)
222 cerr
<<"Sending a '"<<i
<<"'"<<endl
;
223 string msg
= (boost::format("%d %d\n") % d_id
% i
).str();
224 ::send(d_socket
, msg
.c_str(), msg
.length(), 0);
// Tries to read one answer from d_socket.
// When waitForData() reports data, reads up to sizeof(buf) bytes and parses
// them as two decimal integers into id and i; throws
// runtime_error("Invalid input") if sscanf does not convert both.
// NOTE(review): waitForData is defined elsewhere; the arguments look like a
// wait of 0 s plus 500000 us - confirm.
228 bool receive(Identifier
& id
, int& i
)
230 if(waitForData(d_socket
, 0, 500000) > 0) {
233 int len
= recv(d_socket
, buf
, sizeof(buf
), 0);
// NOTE(review): len goes into string(buf, len) unchecked, although recv()
// returns -1 on failure - consider testing len before building msg.
234 string
msg(buf
, len
);
235 if(sscanf(msg
.c_str(), "%d %d", &id
, &i
) != 2) {
236 throw runtime_error("Invalid input");
// Reports a matched answer on stderr.
//   i    - the value that was originally sent
//   j    - the answer received for it
//   usec - elapsed microseconds between send and receipt, as measured by
//          the caller. Inflighter::run() invokes
//          d_sr.deliverAnswer(item, answer, usec) with three arguments, so
//          the parameter has to exist; it defaults to 0 to keep two-argument
//          calls working and is not part of the log line.
void deliverAnswer(int& i, int j, unsigned int usec = 0)
{
  (void)usec; // accepted for caller compatibility only
  std::cerr<<"We sent "<<i<<", got back: "<<j<<std::endl;
}
// Driver: an Inflighter over a vector<int>, using SendReceive as transport.
// NOTE(review): the Inflighter is constructed before 'numbers' is filled;
// it holds the container by reference, so confirm that no iterator into it
// is taken before the push_back calls below.
254 Inflighter
<vector
<int>, SendReceive
> inflighter(numbers
, sr
);
// Work items: the squares of 0..99.
256 for(int n
=0; n
< 100; ++n
)
257 numbers
.push_back(n
*n
);
// Any std::exception raised by the guarded code is logged to cerr.
265 catch(exception
& e
) {
266 cerr
<<"Caught exception: "<<e
.what()<<endl
;