]>
Commit | Line | Data |
---|---|---|
12471842 PL |
1 | /* |
2 | * This file is part of PowerDNS or dnsdist. | |
3 | * Copyright -- PowerDNS.COM B.V. and its contributors | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify | |
6 | * it under the terms of version 2 of the GNU General Public License as | |
7 | * published by the Free Software Foundation. | |
8 | * | |
9 | * In addition, for the avoidance of any doubt, permission is granted to | |
10 | * link this program with OpenSSL and to (re)distribute the binaries | |
11 | * produced as the result of such linking. | |
12 | * | |
13 | * This program is distributed in the hope that it will be useful, | |
14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 | * GNU General Public License for more details. | |
17 | * | |
18 | * You should have received a copy of the GNU General Public License | |
19 | * along with this program; if not, write to the Free Software | |
20 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
21 | */ | |
870a0fe4 AT |
22 | #ifdef HAVE_CONFIG_H |
23 | #include "config.h" | |
24 | #endif | |
5cbfa5f6 BH |
25 | #include <vector> |
26 | #include <deque> | |
27 | #include <iostream> | |
fa8fd4d2 | 28 | |
5cbfa5f6 BH |
29 | #include <boost/multi_index_container.hpp> |
30 | #include <boost/format.hpp> | |
31 | #include <sys/time.h> | |
32 | #include <time.h> | |
33 | #include "iputils.hh" | |
34 | #include "statbag.hh" | |
35 | #include <sys/socket.h> | |
36 | ||
10f4eea8 | 37 | #include "namespaces.hh" |
5cbfa5f6 BH |
38 | using namespace boost::multi_index; |
39 | ||
//! Empty tag type naming the time-to-die ordered index of Inflighter's watch list.
struct TimeTag
{
};
41 | ||
42 | template<typename Container, typename SenderReceiver> class Inflighter | |
43 | { | |
44 | public: | |
45 | Inflighter(Container& c, SenderReceiver& sr) : d_container(c), d_sr(sr), d_init(false) | |
46 | { | |
47 | d_burst = 2; | |
48 | d_maxInFlight = 5; | |
49 | d_timeoutSeconds = 3; | |
50 | d_unexpectedResponse = d_timeouts = 0; | |
51 | } | |
52 | void init() | |
53 | { | |
54 | d_iter = d_container.begin(); | |
55 | d_init=true; | |
56 | } | |
bf2aaa52 BH |
57 | |
58 | bool run(); //!< keep calling this as long as it returns 1, or if it throws an exception | |
5cbfa5f6 BH |
59 | |
60 | unsigned int d_maxInFlight; | |
61 | unsigned int d_timeoutSeconds; | |
62 | int d_burst; | |
63 | ||
910d27bb BH |
64 | uint64_t getTimeouts() |
65 | { | |
66 | return d_timeouts; | |
67 | } | |
68 | ||
d712a557 BH |
69 | uint64_t getUnexpecteds() |
70 | { | |
71 | return d_unexpectedResponse; | |
72 | } | |
73 | ||
5cbfa5f6 BH |
74 | private: |
75 | struct TTDItem | |
76 | { | |
77 | typename Container::iterator iter; | |
78 | typename SenderReceiver::Identifier id; | |
3ed022a7 | 79 | struct timeval sentTime, ttd; |
5cbfa5f6 BH |
80 | }; |
81 | ||
82 | typedef multi_index_container< | |
83 | TTDItem, | |
84 | indexed_by< | |
85 | ordered_unique< | |
4957a608 | 86 | member<TTDItem, typename SenderReceiver::Identifier, &TTDItem::id> |
5cbfa5f6 BH |
87 | >, |
88 | ordered_non_unique< | |
4957a608 BH |
89 | tag<TimeTag>, |
90 | member<TTDItem, struct timeval, &TTDItem::ttd> | |
5cbfa5f6 BH |
91 | > |
92 | > | |
93 | >ttdwatch_t; | |
94 | ||
95 | Container& d_container; | |
96 | SenderReceiver& d_sr; | |
97 | ||
5cbfa5f6 BH |
98 | ttdwatch_t d_ttdWatch; |
99 | typename Container::iterator d_iter; | |
100 | bool d_init; | |
101 | ||
102 | uint64_t d_unexpectedResponse, d_timeouts; | |
103 | }; | |
104 | ||
bf2aaa52 | 105 | template<typename Container, typename SendReceive> bool Inflighter<Container, SendReceive>::run() |
5cbfa5f6 BH |
106 | { |
107 | if(!d_init) | |
108 | init(); | |
109 | ||
5cbfa5f6 BH |
110 | for(;;) { |
111 | int burst = 0; | |
bf2aaa52 BH |
112 | |
113 | // 'send' as many items as allowed, limited by 'max in flight' and our burst parameter (which limits query rate growth) | |
5cbfa5f6 BH |
114 | while(d_iter != d_container.end() && d_ttdWatch.size() < d_maxInFlight) { |
115 | TTDItem ttdi; | |
9b082a14 | 116 | ttdi.iter = d_iter++; |
5cbfa5f6 | 117 | ttdi.id = d_sr.send(*ttdi.iter); |
3ed022a7 BH |
118 | gettimeofday(&ttdi.sentTime, 0); |
119 | ttdi.ttd = ttdi.sentTime; | |
5cbfa5f6 | 120 | ttdi.ttd.tv_sec += d_timeoutSeconds; |
bf2aaa52 BH |
121 | if(d_ttdWatch.count(ttdi.id)) { |
122 | // cerr<<"DUPLICATE INSERT!"<<endl; | |
123 | } | |
5cbfa5f6 BH |
124 | d_ttdWatch.insert(ttdi); |
125 | ||
126 | if(++burst == d_burst) | |
127 | break; | |
128 | } | |
129 | int processed=0; | |
bf2aaa52 BH |
130 | |
131 | ||
132 | // if there are queries in flight, handle responses | |
5cbfa5f6 | 133 | if(!d_ttdWatch.empty()) { |
4957a608 | 134 | // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl; |
5cbfa5f6 BH |
135 | typename SendReceive::Answer answer; |
136 | typename SendReceive::Identifier id; | |
137 | ||
bf2aaa52 | 138 | // get as many answers as available - 'receive' should block for a short while to wait for an answer |
5cbfa5f6 | 139 | while(d_sr.receive(id, answer)) { |
bf2aaa52 BH |
140 | typename ttdwatch_t::iterator ival = d_ttdWatch.find(id); // match up what we received to what we were waiting for |
141 | ||
142 | if(ival != d_ttdWatch.end()) { // found something! | |
4957a608 | 143 | ++processed; |
232f0877 CH |
144 | struct timeval now; |
145 | gettimeofday(&now, 0); | |
146 | unsigned int usec = 1000000*(now.tv_sec - ival->sentTime.tv_sec) + (now.tv_usec - ival->sentTime.tv_usec); | |
3ed022a7 | 147 | d_sr.deliverAnswer(*ival->iter, answer, usec); // deliver to sender/receiver |
4957a608 BH |
148 | d_ttdWatch.erase(ival); |
149 | break; // we can send new questions! | |
150 | } | |
151 | else { | |
d712a557 | 152 | // cerr<<"UNEXPECTED ANSWER: "<<id<<endl; |
4957a608 BH |
153 | d_unexpectedResponse++; |
154 | } | |
5cbfa5f6 BH |
155 | } |
156 | ||
157 | ||
bf2aaa52 | 158 | if(!processed /* || d_ttdWatch.size() > 10000 */ ) { // no new responses, time for some cleanup of the ttdWatch |
5cbfa5f6 BH |
159 | struct timeval now; |
160 | gettimeofday(&now, 0); | |
4957a608 BH |
161 | |
162 | typedef typename ttdwatch_t::template index<TimeTag>::type waiters_by_ttd_index_t; | |
163 | waiters_by_ttd_index_t& waiters_index = boost::multi_index::get<TimeTag>(d_ttdWatch); | |
5cbfa5f6 | 164 | |
bf2aaa52 | 165 | // this provides a list of items sorted by age |
5cbfa5f6 BH |
166 | for(typename waiters_by_ttd_index_t::iterator valiter = waiters_index.begin(); valiter != waiters_index.end(); ) { |
167 | if(valiter->ttd.tv_sec < now.tv_sec || (valiter->ttd.tv_sec == now.tv_sec && valiter->ttd.tv_usec < now.tv_usec)) { | |
a4382ef3 | 168 | d_sr.deliverTimeout(valiter->id); // so backend can release id |
5cbfa5f6 | 169 | waiters_index.erase(valiter++); |
d712a557 BH |
170 | // cerr<<"Have timeout for id="<< valiter->id <<endl; |
171 | d_timeouts++; | |
5cbfa5f6 | 172 | } |
4957a608 | 173 | else |
bf2aaa52 | 174 | break; // if this one was too new, rest will be too |
5cbfa5f6 BH |
175 | } |
176 | } | |
177 | } | |
178 | if(d_ttdWatch.empty() && d_iter == d_container.end()) | |
179 | break; | |
180 | } | |
bf2aaa52 | 181 | return false; |
5cbfa5f6 BH |
182 | } |
183 | ||
184 | #if 0 | |
185 | StatBag S; | |
186 | ||
187 | struct SendReceive | |
188 | { | |
189 | typedef int Identifier; | |
190 | typedef int Answer; | |
191 | ComboAddress d_remote; | |
192 | int d_socket; | |
193 | int d_id; | |
194 | ||
195 | SendReceive() | |
196 | { | |
197 | d_id = 0; | |
198 | d_socket = socket(AF_INET, SOCK_DGRAM, 0); | |
199 | int val=1; | |
200 | setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); | |
201 | ||
202 | ComboAddress local("0.0.0.0", 1024); | |
203 | bind(d_socket, (struct sockaddr*)&local, local.getSocklen()); | |
204 | ||
205 | char buf[512]; | |
206 | ||
207 | socklen_t remotelen=sizeof(d_remote); | |
208 | cerr<<"Waiting for 'hi' on "<<local.toStringWithPort()<<endl; | |
209 | int len = recvfrom(d_socket, buf, sizeof(buf), 0, (struct sockaddr*)&d_remote, &remotelen); | |
210 | cerr<<d_remote.toStringWithPort()<<" sent 'hi': "<<string(buf, len); | |
211 | Utility::setNonBlocking(d_socket); | |
212 | connect(d_socket, (struct sockaddr*) &d_remote, d_remote.getSocklen()); | |
213 | } | |
214 | ||
215 | ~SendReceive() | |
216 | { | |
217 | ::send(d_socket, "done\r\n", 6, 0); | |
218 | } | |
219 | ||
220 | Identifier send(int& i) | |
221 | { | |
222 | cerr<<"Sending a '"<<i<<"'"<<endl; | |
223 | string msg = (boost::format("%d %d\n") % d_id % i).str(); | |
224 | ::send(d_socket, msg.c_str(), msg.length(), 0); | |
225 | return d_id++; | |
226 | } | |
227 | ||
228 | bool receive(Identifier& id, int& i) | |
229 | { | |
230 | if(waitForData(d_socket, 0, 500000) > 0) { | |
231 | char buf[512]; | |
232 | ||
233 | int len = recv(d_socket, buf, sizeof(buf), 0); | |
234 | string msg(buf, len); | |
235 | if(sscanf(msg.c_str(), "%d %d", &id, &i) != 2) { | |
4957a608 | 236 | throw runtime_error("Invalid input"); |
5cbfa5f6 BH |
237 | } |
238 | return 1; | |
239 | } | |
240 | return 0; | |
241 | } | |
242 | ||
243 | void deliverAnswer(int& i, int j) | |
244 | { | |
245 | cerr<<"We sent "<<i<<", got back: "<<j<<endl; | |
246 | } | |
247 | }; | |
248 | ||
249 | ||
250 | int main() | |
251 | { | |
252 | vector<int> numbers; | |
253 | SendReceive sr; | |
254 | Inflighter<vector<int>, SendReceive> inflighter(numbers, sr); | |
255 | ||
256 | for(int n=0; n < 100; ++n) | |
257 | numbers.push_back(n*n); | |
258 | ||
259 | ||
260 | for(;;) { | |
261 | try { | |
262 | inflighter.run(); | |
263 | break; | |
264 | } | |
265 | catch(exception& e) { | |
266 | cerr<<"Caught exception: "<<e.what()<<endl; | |
267 | } | |
268 | } | |
269 | ||
270 | } | |
271 | #endif |