]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/inflighter.cc
Meson: Separate test files from common files
[thirdparty/pdns.git] / pdns / inflighter.cc
CommitLineData
12471842
PL
1/*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
870a0fe4
AT
22#ifdef HAVE_CONFIG_H
23#include "config.h"
24#endif
5cbfa5f6 25#include <vector>
5cbfa5f6 26#include <iostream>
fa8fd4d2 27
5cbfa5f6 28#include <boost/multi_index_container.hpp>
905dae56
RG
29#include <boost/multi_index/ordered_index.hpp>
30#include <boost/multi_index/key_extractors.hpp>
31
5cbfa5f6
BH
32#include <boost/format.hpp>
33#include <sys/time.h>
34#include <time.h>
35#include "iputils.hh"
36#include "statbag.hh"
37#include <sys/socket.h>
38
10f4eea8 39#include "namespaces.hh"
5cbfa5f6
BH
40using namespace boost::multi_index;
41
42struct TimeTag{};
43
44template<typename Container, typename SenderReceiver> class Inflighter
45{
46public:
47 Inflighter(Container& c, SenderReceiver& sr) : d_container(c), d_sr(sr), d_init(false)
48 {
49 d_burst = 2;
50 d_maxInFlight = 5;
51 d_timeoutSeconds = 3;
52 d_unexpectedResponse = d_timeouts = 0;
53 }
54 void init()
55 {
56 d_iter = d_container.begin();
57 d_init=true;
58 }
bf2aaa52
BH
59
60 bool run(); //!< keep calling this as long as it returns 1, or if it throws an exception
5cbfa5f6
BH
61
62 unsigned int d_maxInFlight;
63 unsigned int d_timeoutSeconds;
64 int d_burst;
65
910d27bb
BH
66 uint64_t getTimeouts()
67 {
68 return d_timeouts;
69 }
70
d712a557
BH
71 uint64_t getUnexpecteds()
72 {
73 return d_unexpectedResponse;
74 }
75
5cbfa5f6
BH
76private:
77 struct TTDItem
78 {
79 typename Container::iterator iter;
80 typename SenderReceiver::Identifier id;
3ed022a7 81 struct timeval sentTime, ttd;
5cbfa5f6
BH
82 };
83
84 typedef multi_index_container<
85 TTDItem,
86 indexed_by<
87 ordered_unique<
4957a608 88 member<TTDItem, typename SenderReceiver::Identifier, &TTDItem::id>
5cbfa5f6
BH
89 >,
90 ordered_non_unique<
4957a608
BH
91 tag<TimeTag>,
92 member<TTDItem, struct timeval, &TTDItem::ttd>
5cbfa5f6
BH
93 >
94 >
95 >ttdwatch_t;
96
97 Container& d_container;
98 SenderReceiver& d_sr;
99
5cbfa5f6
BH
100 ttdwatch_t d_ttdWatch;
101 typename Container::iterator d_iter;
102 bool d_init;
103
104 uint64_t d_unexpectedResponse, d_timeouts;
105};
106
bf2aaa52 107template<typename Container, typename SendReceive> bool Inflighter<Container, SendReceive>::run()
5cbfa5f6
BH
108{
109 if(!d_init)
110 init();
111
5cbfa5f6
BH
112 for(;;) {
113 int burst = 0;
bf2aaa52
BH
114
115 // 'send' as many items as allowed, limited by 'max in flight' and our burst parameter (which limits query rate growth)
5cbfa5f6
BH
116 while(d_iter != d_container.end() && d_ttdWatch.size() < d_maxInFlight) {
117 TTDItem ttdi;
9b082a14 118 ttdi.iter = d_iter++;
5cbfa5f6 119 ttdi.id = d_sr.send(*ttdi.iter);
3ed022a7
BH
120 gettimeofday(&ttdi.sentTime, 0);
121 ttdi.ttd = ttdi.sentTime;
5cbfa5f6 122 ttdi.ttd.tv_sec += d_timeoutSeconds;
bf2aaa52
BH
123 if(d_ttdWatch.count(ttdi.id)) {
124// cerr<<"DUPLICATE INSERT!"<<endl;
125 }
5cbfa5f6
BH
126 d_ttdWatch.insert(ttdi);
127
128 if(++burst == d_burst)
129 break;
130 }
131 int processed=0;
bf2aaa52
BH
132
133
134 // if there are queries in flight, handle responses
5cbfa5f6 135 if(!d_ttdWatch.empty()) {
4957a608 136 // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl;
5cbfa5f6
BH
137 typename SendReceive::Answer answer;
138 typename SendReceive::Identifier id;
139
bf2aaa52 140 // get as many answers as available - 'receive' should block for a short while to wait for an answer
5cbfa5f6 141 while(d_sr.receive(id, answer)) {
bf2aaa52
BH
142 typename ttdwatch_t::iterator ival = d_ttdWatch.find(id); // match up what we received to what we were waiting for
143
144 if(ival != d_ttdWatch.end()) { // found something!
4957a608 145 ++processed;
232f0877
CH
146 struct timeval now;
147 gettimeofday(&now, 0);
148 unsigned int usec = 1000000*(now.tv_sec - ival->sentTime.tv_sec) + (now.tv_usec - ival->sentTime.tv_usec);
3ed022a7 149 d_sr.deliverAnswer(*ival->iter, answer, usec); // deliver to sender/receiver
4957a608
BH
150 d_ttdWatch.erase(ival);
151 break; // we can send new questions!
152 }
153 else {
d712a557 154 // cerr<<"UNEXPECTED ANSWER: "<<id<<endl;
4957a608
BH
155 d_unexpectedResponse++;
156 }
5cbfa5f6
BH
157 }
158
159
bf2aaa52 160 if(!processed /* || d_ttdWatch.size() > 10000 */ ) { // no new responses, time for some cleanup of the ttdWatch
5cbfa5f6
BH
161 struct timeval now;
162 gettimeofday(&now, 0);
4957a608
BH
163
164 typedef typename ttdwatch_t::template index<TimeTag>::type waiters_by_ttd_index_t;
165 waiters_by_ttd_index_t& waiters_index = boost::multi_index::get<TimeTag>(d_ttdWatch);
5cbfa5f6 166
bf2aaa52 167 // this provides a list of items sorted by age
5cbfa5f6
BH
168 for(typename waiters_by_ttd_index_t::iterator valiter = waiters_index.begin(); valiter != waiters_index.end(); ) {
169 if(valiter->ttd.tv_sec < now.tv_sec || (valiter->ttd.tv_sec == now.tv_sec && valiter->ttd.tv_usec < now.tv_usec)) {
a4382ef3 170 d_sr.deliverTimeout(valiter->id); // so backend can release id
5cbfa5f6 171 waiters_index.erase(valiter++);
d712a557
BH
172 // cerr<<"Have timeout for id="<< valiter->id <<endl;
173 d_timeouts++;
5cbfa5f6 174 }
4957a608 175 else
bf2aaa52 176 break; // if this one was too new, rest will be too
5cbfa5f6
BH
177 }
178 }
179 }
180 if(d_ttdWatch.empty() && d_iter == d_container.end())
181 break;
182 }
bf2aaa52 183 return false;
5cbfa5f6
BH
184}
185
186#if 0
187StatBag S;
188
189struct SendReceive
190{
191 typedef int Identifier;
192 typedef int Answer;
193 ComboAddress d_remote;
194 int d_socket;
195 int d_id;
196
197 SendReceive()
198 {
199 d_id = 0;
200 d_socket = socket(AF_INET, SOCK_DGRAM, 0);
201 int val=1;
202 setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
203
204 ComboAddress local("0.0.0.0", 1024);
205 bind(d_socket, (struct sockaddr*)&local, local.getSocklen());
206
207 char buf[512];
208
209 socklen_t remotelen=sizeof(d_remote);
210 cerr<<"Waiting for 'hi' on "<<local.toStringWithPort()<<endl;
211 int len = recvfrom(d_socket, buf, sizeof(buf), 0, (struct sockaddr*)&d_remote, &remotelen);
212 cerr<<d_remote.toStringWithPort()<<" sent 'hi': "<<string(buf, len);
213 Utility::setNonBlocking(d_socket);
214 connect(d_socket, (struct sockaddr*) &d_remote, d_remote.getSocklen());
215 }
216
217 ~SendReceive()
218 {
219 ::send(d_socket, "done\r\n", 6, 0);
220 }
221
222 Identifier send(int& i)
223 {
224 cerr<<"Sending a '"<<i<<"'"<<endl;
225 string msg = (boost::format("%d %d\n") % d_id % i).str();
226 ::send(d_socket, msg.c_str(), msg.length(), 0);
227 return d_id++;
228 }
229
230 bool receive(Identifier& id, int& i)
231 {
232 if(waitForData(d_socket, 0, 500000) > 0) {
233 char buf[512];
234
235 int len = recv(d_socket, buf, sizeof(buf), 0);
236 string msg(buf, len);
237 if(sscanf(msg.c_str(), "%d %d", &id, &i) != 2) {
4957a608 238 throw runtime_error("Invalid input");
5cbfa5f6
BH
239 }
240 return 1;
241 }
242 return 0;
243 }
244
245 void deliverAnswer(int& i, int j)
246 {
247 cerr<<"We sent "<<i<<", got back: "<<j<<endl;
248 }
249};
250
251
252int main()
253{
254 vector<int> numbers;
255 SendReceive sr;
256 Inflighter<vector<int>, SendReceive> inflighter(numbers, sr);
257
258 for(int n=0; n < 100; ++n)
259 numbers.push_back(n*n);
260
261
262 for(;;) {
263 try {
264 inflighter.run();
265 break;
266 }
267 catch(exception& e) {
268 cerr<<"Caught exception: "<<e.what()<<endl;
269 }
270 }
271
272}
273#endif