]>
Commit | Line | Data |
---|---|---|
12471842 PL |
1 | /* |
2 | * This file is part of PowerDNS or dnsdist. | |
3 | * Copyright -- PowerDNS.COM B.V. and its contributors | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify | |
6 | * it under the terms of version 2 of the GNU General Public License as | |
7 | * published by the Free Software Foundation. | |
8 | * | |
9 | * In addition, for the avoidance of any doubt, permission is granted to | |
10 | * link this program with OpenSSL and to (re)distribute the binaries | |
11 | * produced as the result of such linking. | |
12 | * | |
13 | * This program is distributed in the hope that it will be useful, | |
14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 | * GNU General Public License for more details. | |
17 | * | |
18 | * You should have received a copy of the GNU General Public License | |
19 | * along with this program; if not, write to the Free Software | |
20 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
21 | */ | |
870a0fe4 AT |
22 | #ifdef HAVE_CONFIG_H |
23 | #include "config.h" | |
24 | #endif | |
5cbfa5f6 | 25 | #include <vector> |
5cbfa5f6 | 26 | #include <iostream> |
fa8fd4d2 | 27 | |
5cbfa5f6 | 28 | #include <boost/multi_index_container.hpp> |
905dae56 RG |
29 | #include <boost/multi_index/ordered_index.hpp> |
30 | #include <boost/multi_index/key_extractors.hpp> | |
31 | ||
5cbfa5f6 BH |
32 | #include <boost/format.hpp> |
33 | #include <sys/time.h> | |
34 | #include <time.h> | |
35 | #include "iputils.hh" | |
36 | #include "statbag.hh" | |
37 | #include <sys/socket.h> | |
38 | ||
10f4eea8 | 39 | #include "namespaces.hh" |
5cbfa5f6 BH |
40 | using namespace boost::multi_index; |
41 | ||
/** Empty tag type; it carries no data and only serves as a compile-time name for an index. */
struct TimeTag
{
};
43 | ||
44 | template<typename Container, typename SenderReceiver> class Inflighter | |
45 | { | |
46 | public: | |
47 | Inflighter(Container& c, SenderReceiver& sr) : d_container(c), d_sr(sr), d_init(false) | |
48 | { | |
49 | d_burst = 2; | |
50 | d_maxInFlight = 5; | |
51 | d_timeoutSeconds = 3; | |
52 | d_unexpectedResponse = d_timeouts = 0; | |
53 | } | |
54 | void init() | |
55 | { | |
56 | d_iter = d_container.begin(); | |
57 | d_init=true; | |
58 | } | |
bf2aaa52 BH |
59 | |
60 | bool run(); //!< keep calling this as long as it returns 1, or if it throws an exception | |
5cbfa5f6 BH |
61 | |
62 | unsigned int d_maxInFlight; | |
63 | unsigned int d_timeoutSeconds; | |
64 | int d_burst; | |
65 | ||
910d27bb BH |
66 | uint64_t getTimeouts() |
67 | { | |
68 | return d_timeouts; | |
69 | } | |
70 | ||
d712a557 BH |
71 | uint64_t getUnexpecteds() |
72 | { | |
73 | return d_unexpectedResponse; | |
74 | } | |
75 | ||
5cbfa5f6 BH |
76 | private: |
77 | struct TTDItem | |
78 | { | |
79 | typename Container::iterator iter; | |
80 | typename SenderReceiver::Identifier id; | |
3ed022a7 | 81 | struct timeval sentTime, ttd; |
5cbfa5f6 BH |
82 | }; |
83 | ||
84 | typedef multi_index_container< | |
85 | TTDItem, | |
86 | indexed_by< | |
87 | ordered_unique< | |
4957a608 | 88 | member<TTDItem, typename SenderReceiver::Identifier, &TTDItem::id> |
5cbfa5f6 BH |
89 | >, |
90 | ordered_non_unique< | |
4957a608 BH |
91 | tag<TimeTag>, |
92 | member<TTDItem, struct timeval, &TTDItem::ttd> | |
5cbfa5f6 BH |
93 | > |
94 | > | |
95 | >ttdwatch_t; | |
96 | ||
97 | Container& d_container; | |
98 | SenderReceiver& d_sr; | |
99 | ||
5cbfa5f6 BH |
100 | ttdwatch_t d_ttdWatch; |
101 | typename Container::iterator d_iter; | |
102 | bool d_init; | |
103 | ||
104 | uint64_t d_unexpectedResponse, d_timeouts; | |
105 | }; | |
106 | ||
bf2aaa52 | 107 | template<typename Container, typename SendReceive> bool Inflighter<Container, SendReceive>::run() |
5cbfa5f6 BH |
108 | { |
109 | if(!d_init) | |
110 | init(); | |
111 | ||
5cbfa5f6 BH |
112 | for(;;) { |
113 | int burst = 0; | |
bf2aaa52 BH |
114 | |
115 | // 'send' as many items as allowed, limited by 'max in flight' and our burst parameter (which limits query rate growth) | |
5cbfa5f6 BH |
116 | while(d_iter != d_container.end() && d_ttdWatch.size() < d_maxInFlight) { |
117 | TTDItem ttdi; | |
9b082a14 | 118 | ttdi.iter = d_iter++; |
5cbfa5f6 | 119 | ttdi.id = d_sr.send(*ttdi.iter); |
3ed022a7 BH |
120 | gettimeofday(&ttdi.sentTime, 0); |
121 | ttdi.ttd = ttdi.sentTime; | |
5cbfa5f6 | 122 | ttdi.ttd.tv_sec += d_timeoutSeconds; |
bf2aaa52 BH |
123 | if(d_ttdWatch.count(ttdi.id)) { |
124 | // cerr<<"DUPLICATE INSERT!"<<endl; | |
125 | } | |
5cbfa5f6 BH |
126 | d_ttdWatch.insert(ttdi); |
127 | ||
128 | if(++burst == d_burst) | |
129 | break; | |
130 | } | |
131 | int processed=0; | |
bf2aaa52 BH |
132 | |
133 | ||
134 | // if there are queries in flight, handle responses | |
5cbfa5f6 | 135 | if(!d_ttdWatch.empty()) { |
4957a608 | 136 | // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl; |
5cbfa5f6 BH |
137 | typename SendReceive::Answer answer; |
138 | typename SendReceive::Identifier id; | |
139 | ||
bf2aaa52 | 140 | // get as many answers as available - 'receive' should block for a short while to wait for an answer |
5cbfa5f6 | 141 | while(d_sr.receive(id, answer)) { |
bf2aaa52 BH |
142 | typename ttdwatch_t::iterator ival = d_ttdWatch.find(id); // match up what we received to what we were waiting for |
143 | ||
144 | if(ival != d_ttdWatch.end()) { // found something! | |
4957a608 | 145 | ++processed; |
232f0877 CH |
146 | struct timeval now; |
147 | gettimeofday(&now, 0); | |
148 | unsigned int usec = 1000000*(now.tv_sec - ival->sentTime.tv_sec) + (now.tv_usec - ival->sentTime.tv_usec); | |
3ed022a7 | 149 | d_sr.deliverAnswer(*ival->iter, answer, usec); // deliver to sender/receiver |
4957a608 BH |
150 | d_ttdWatch.erase(ival); |
151 | break; // we can send new questions! | |
152 | } | |
153 | else { | |
d712a557 | 154 | // cerr<<"UNEXPECTED ANSWER: "<<id<<endl; |
4957a608 BH |
155 | d_unexpectedResponse++; |
156 | } | |
5cbfa5f6 BH |
157 | } |
158 | ||
159 | ||
bf2aaa52 | 160 | if(!processed /* || d_ttdWatch.size() > 10000 */ ) { // no new responses, time for some cleanup of the ttdWatch |
5cbfa5f6 BH |
161 | struct timeval now; |
162 | gettimeofday(&now, 0); | |
4957a608 BH |
163 | |
164 | typedef typename ttdwatch_t::template index<TimeTag>::type waiters_by_ttd_index_t; | |
165 | waiters_by_ttd_index_t& waiters_index = boost::multi_index::get<TimeTag>(d_ttdWatch); | |
5cbfa5f6 | 166 | |
bf2aaa52 | 167 | // this provides a list of items sorted by age |
5cbfa5f6 BH |
168 | for(typename waiters_by_ttd_index_t::iterator valiter = waiters_index.begin(); valiter != waiters_index.end(); ) { |
169 | if(valiter->ttd.tv_sec < now.tv_sec || (valiter->ttd.tv_sec == now.tv_sec && valiter->ttd.tv_usec < now.tv_usec)) { | |
a4382ef3 | 170 | d_sr.deliverTimeout(valiter->id); // so backend can release id |
5cbfa5f6 | 171 | waiters_index.erase(valiter++); |
d712a557 BH |
172 | // cerr<<"Have timeout for id="<< valiter->id <<endl; |
173 | d_timeouts++; | |
5cbfa5f6 | 174 | } |
4957a608 | 175 | else |
bf2aaa52 | 176 | break; // if this one was too new, rest will be too |
5cbfa5f6 BH |
177 | } |
178 | } | |
179 | } | |
180 | if(d_ttdWatch.empty() && d_iter == d_container.end()) | |
181 | break; | |
182 | } | |
bf2aaa52 | 183 | return false; |
5cbfa5f6 BH |
184 | } |
185 | ||
186 | #if 0 | |
187 | StatBag S; | |
188 | ||
189 | struct SendReceive | |
190 | { | |
191 | typedef int Identifier; | |
192 | typedef int Answer; | |
193 | ComboAddress d_remote; | |
194 | int d_socket; | |
195 | int d_id; | |
196 | ||
197 | SendReceive() | |
198 | { | |
199 | d_id = 0; | |
200 | d_socket = socket(AF_INET, SOCK_DGRAM, 0); | |
201 | int val=1; | |
202 | setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); | |
203 | ||
204 | ComboAddress local("0.0.0.0", 1024); | |
205 | bind(d_socket, (struct sockaddr*)&local, local.getSocklen()); | |
206 | ||
207 | char buf[512]; | |
208 | ||
209 | socklen_t remotelen=sizeof(d_remote); | |
210 | cerr<<"Waiting for 'hi' on "<<local.toStringWithPort()<<endl; | |
211 | int len = recvfrom(d_socket, buf, sizeof(buf), 0, (struct sockaddr*)&d_remote, &remotelen); | |
212 | cerr<<d_remote.toStringWithPort()<<" sent 'hi': "<<string(buf, len); | |
213 | Utility::setNonBlocking(d_socket); | |
214 | connect(d_socket, (struct sockaddr*) &d_remote, d_remote.getSocklen()); | |
215 | } | |
216 | ||
217 | ~SendReceive() | |
218 | { | |
219 | ::send(d_socket, "done\r\n", 6, 0); | |
220 | } | |
221 | ||
222 | Identifier send(int& i) | |
223 | { | |
224 | cerr<<"Sending a '"<<i<<"'"<<endl; | |
225 | string msg = (boost::format("%d %d\n") % d_id % i).str(); | |
226 | ::send(d_socket, msg.c_str(), msg.length(), 0); | |
227 | return d_id++; | |
228 | } | |
229 | ||
230 | bool receive(Identifier& id, int& i) | |
231 | { | |
232 | if(waitForData(d_socket, 0, 500000) > 0) { | |
233 | char buf[512]; | |
234 | ||
235 | int len = recv(d_socket, buf, sizeof(buf), 0); | |
236 | string msg(buf, len); | |
237 | if(sscanf(msg.c_str(), "%d %d", &id, &i) != 2) { | |
4957a608 | 238 | throw runtime_error("Invalid input"); |
5cbfa5f6 BH |
239 | } |
240 | return 1; | |
241 | } | |
242 | return 0; | |
243 | } | |
244 | ||
245 | void deliverAnswer(int& i, int j) | |
246 | { | |
247 | cerr<<"We sent "<<i<<", got back: "<<j<<endl; | |
248 | } | |
249 | }; | |
250 | ||
251 | ||
252 | int main() | |
253 | { | |
254 | vector<int> numbers; | |
255 | SendReceive sr; | |
256 | Inflighter<vector<int>, SendReceive> inflighter(numbers, sr); | |
257 | ||
258 | for(int n=0; n < 100; ++n) | |
259 | numbers.push_back(n*n); | |
260 | ||
261 | ||
262 | for(;;) { | |
263 | try { | |
264 | inflighter.run(); | |
265 | break; | |
266 | } | |
267 | catch(exception& e) { | |
268 | cerr<<"Caught exception: "<<e.what()<<endl; | |
269 | } | |
270 | } | |
271 | ||
272 | } | |
273 | #endif |