]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/inflighter.cc
Merge pull request #14021 from Habbie/auth-lua-join-whitespace
[thirdparty/pdns.git] / pdns / inflighter.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25 #include <vector>
26 #include <deque>
27 #include <iostream>
28
29 #include <boost/multi_index_container.hpp>
30 #include <boost/format.hpp>
31 #include <sys/time.h>
32 #include <time.h>
33 #include "iputils.hh"
34 #include "statbag.hh"
35 #include <sys/socket.h>
36
37 #include "namespaces.hh"
38 using namespace boost::multi_index;
39
40 struct TimeTag{};
41
42 template<typename Container, typename SenderReceiver> class Inflighter
43 {
44 public:
45 Inflighter(Container& c, SenderReceiver& sr) : d_container(c), d_sr(sr), d_init(false)
46 {
47 d_burst = 2;
48 d_maxInFlight = 5;
49 d_timeoutSeconds = 3;
50 d_unexpectedResponse = d_timeouts = 0;
51 }
52 void init()
53 {
54 d_iter = d_container.begin();
55 d_init=true;
56 }
57
58 bool run(); //!< keep calling this as long as it returns 1, or if it throws an exception
59
60 unsigned int d_maxInFlight;
61 unsigned int d_timeoutSeconds;
62 int d_burst;
63
64 uint64_t getTimeouts()
65 {
66 return d_timeouts;
67 }
68
69 uint64_t getUnexpecteds()
70 {
71 return d_unexpectedResponse;
72 }
73
74 private:
75 struct TTDItem
76 {
77 typename Container::iterator iter;
78 typename SenderReceiver::Identifier id;
79 struct timeval sentTime, ttd;
80 };
81
82 typedef multi_index_container<
83 TTDItem,
84 indexed_by<
85 ordered_unique<
86 member<TTDItem, typename SenderReceiver::Identifier, &TTDItem::id>
87 >,
88 ordered_non_unique<
89 tag<TimeTag>,
90 member<TTDItem, struct timeval, &TTDItem::ttd>
91 >
92 >
93 >ttdwatch_t;
94
95 Container& d_container;
96 SenderReceiver& d_sr;
97
98 ttdwatch_t d_ttdWatch;
99 typename Container::iterator d_iter;
100 bool d_init;
101
102 uint64_t d_unexpectedResponse, d_timeouts;
103 };
104
105 template<typename Container, typename SendReceive> bool Inflighter<Container, SendReceive>::run()
106 {
107 if(!d_init)
108 init();
109
110 for(;;) {
111 int burst = 0;
112
113 // 'send' as many items as allowed, limited by 'max in flight' and our burst parameter (which limits query rate growth)
114 while(d_iter != d_container.end() && d_ttdWatch.size() < d_maxInFlight) {
115 TTDItem ttdi;
116 ttdi.iter = d_iter++;
117 ttdi.id = d_sr.send(*ttdi.iter);
118 gettimeofday(&ttdi.sentTime, 0);
119 ttdi.ttd = ttdi.sentTime;
120 ttdi.ttd.tv_sec += d_timeoutSeconds;
121 if(d_ttdWatch.count(ttdi.id)) {
122 // cerr<<"DUPLICATE INSERT!"<<endl;
123 }
124 d_ttdWatch.insert(ttdi);
125
126 if(++burst == d_burst)
127 break;
128 }
129 int processed=0;
130
131
132 // if there are queries in flight, handle responses
133 if(!d_ttdWatch.empty()) {
134 // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl;
135 typename SendReceive::Answer answer;
136 typename SendReceive::Identifier id;
137
138 // get as many answers as available - 'receive' should block for a short while to wait for an answer
139 while(d_sr.receive(id, answer)) {
140 typename ttdwatch_t::iterator ival = d_ttdWatch.find(id); // match up what we received to what we were waiting for
141
142 if(ival != d_ttdWatch.end()) { // found something!
143 ++processed;
144 struct timeval now;
145 gettimeofday(&now, 0);
146 unsigned int usec = 1000000*(now.tv_sec - ival->sentTime.tv_sec) + (now.tv_usec - ival->sentTime.tv_usec);
147 d_sr.deliverAnswer(*ival->iter, answer, usec); // deliver to sender/receiver
148 d_ttdWatch.erase(ival);
149 break; // we can send new questions!
150 }
151 else {
152 // cerr<<"UNEXPECTED ANSWER: "<<id<<endl;
153 d_unexpectedResponse++;
154 }
155 }
156
157
158 if(!processed /* || d_ttdWatch.size() > 10000 */ ) { // no new responses, time for some cleanup of the ttdWatch
159 struct timeval now;
160 gettimeofday(&now, 0);
161
162 typedef typename ttdwatch_t::template index<TimeTag>::type waiters_by_ttd_index_t;
163 waiters_by_ttd_index_t& waiters_index = boost::multi_index::get<TimeTag>(d_ttdWatch);
164
165 // this provides a list of items sorted by age
166 for(typename waiters_by_ttd_index_t::iterator valiter = waiters_index.begin(); valiter != waiters_index.end(); ) {
167 if(valiter->ttd.tv_sec < now.tv_sec || (valiter->ttd.tv_sec == now.tv_sec && valiter->ttd.tv_usec < now.tv_usec)) {
168 d_sr.deliverTimeout(valiter->id); // so backend can release id
169 waiters_index.erase(valiter++);
170 // cerr<<"Have timeout for id="<< valiter->id <<endl;
171 d_timeouts++;
172 }
173 else
174 break; // if this one was too new, rest will be too
175 }
176 }
177 }
178 if(d_ttdWatch.empty() && d_iter == d_container.end())
179 break;
180 }
181 return false;
182 }
183
184 #if 0
185 StatBag S;
186
187 struct SendReceive
188 {
189 typedef int Identifier;
190 typedef int Answer;
191 ComboAddress d_remote;
192 int d_socket;
193 int d_id;
194
195 SendReceive()
196 {
197 d_id = 0;
198 d_socket = socket(AF_INET, SOCK_DGRAM, 0);
199 int val=1;
200 setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
201
202 ComboAddress local("0.0.0.0", 1024);
203 bind(d_socket, (struct sockaddr*)&local, local.getSocklen());
204
205 char buf[512];
206
207 socklen_t remotelen=sizeof(d_remote);
208 cerr<<"Waiting for 'hi' on "<<local.toStringWithPort()<<endl;
209 int len = recvfrom(d_socket, buf, sizeof(buf), 0, (struct sockaddr*)&d_remote, &remotelen);
210 cerr<<d_remote.toStringWithPort()<<" sent 'hi': "<<string(buf, len);
211 Utility::setNonBlocking(d_socket);
212 connect(d_socket, (struct sockaddr*) &d_remote, d_remote.getSocklen());
213 }
214
215 ~SendReceive()
216 {
217 ::send(d_socket, "done\r\n", 6, 0);
218 }
219
220 Identifier send(int& i)
221 {
222 cerr<<"Sending a '"<<i<<"'"<<endl;
223 string msg = (boost::format("%d %d\n") % d_id % i).str();
224 ::send(d_socket, msg.c_str(), msg.length(), 0);
225 return d_id++;
226 }
227
228 bool receive(Identifier& id, int& i)
229 {
230 if(waitForData(d_socket, 0, 500000) > 0) {
231 char buf[512];
232
233 int len = recv(d_socket, buf, sizeof(buf), 0);
234 string msg(buf, len);
235 if(sscanf(msg.c_str(), "%d %d", &id, &i) != 2) {
236 throw runtime_error("Invalid input");
237 }
238 return 1;
239 }
240 return 0;
241 }
242
243 void deliverAnswer(int& i, int j)
244 {
245 cerr<<"We sent "<<i<<", got back: "<<j<<endl;
246 }
247 };
248
249
250 int main()
251 {
252 vector<int> numbers;
253 SendReceive sr;
254 Inflighter<vector<int>, SendReceive> inflighter(numbers, sr);
255
256 for(int n=0; n < 100; ++n)
257 numbers.push_back(n*n);
258
259
260 for(;;) {
261 try {
262 inflighter.run();
263 break;
264 }
265 catch(exception& e) {
266 cerr<<"Caught exception: "<<e.what()<<endl;
267 }
268 }
269
270 }
271 #endif