]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/inflighter.cc
Add some notes explaining why some validations are not relevant in the dnstap case.
[thirdparty/pdns.git] / pdns / inflighter.cc
CommitLineData
12471842
PL
1/*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
870a0fe4
AT
22#ifdef HAVE_CONFIG_H
23#include "config.h"
24#endif
5cbfa5f6
BH
25#include <vector>
26#include <deque>
27#include <iostream>
fa8fd4d2 28
5cbfa5f6
BH
29#include <boost/multi_index_container.hpp>
30#include <boost/format.hpp>
31#include <sys/time.h>
32#include <time.h>
33#include "iputils.hh"
34#include "statbag.hh"
35#include <sys/socket.h>
36
10f4eea8 37#include "namespaces.hh"
5cbfa5f6
BH
38using namespace boost::multi_index;
39
40struct TimeTag{};
41
42template<typename Container, typename SenderReceiver> class Inflighter
43{
44public:
45 Inflighter(Container& c, SenderReceiver& sr) : d_container(c), d_sr(sr), d_init(false)
46 {
47 d_burst = 2;
48 d_maxInFlight = 5;
49 d_timeoutSeconds = 3;
50 d_unexpectedResponse = d_timeouts = 0;
51 }
52 void init()
53 {
54 d_iter = d_container.begin();
55 d_init=true;
56 }
bf2aaa52
BH
57
58 bool run(); //!< keep calling this as long as it returns 1, or if it throws an exception
5cbfa5f6
BH
59
60 unsigned int d_maxInFlight;
61 unsigned int d_timeoutSeconds;
62 int d_burst;
63
910d27bb
BH
64 uint64_t getTimeouts()
65 {
66 return d_timeouts;
67 }
68
d712a557
BH
69 uint64_t getUnexpecteds()
70 {
71 return d_unexpectedResponse;
72 }
73
5cbfa5f6
BH
74private:
75 struct TTDItem
76 {
77 typename Container::iterator iter;
78 typename SenderReceiver::Identifier id;
3ed022a7 79 struct timeval sentTime, ttd;
5cbfa5f6
BH
80 };
81
82 typedef multi_index_container<
83 TTDItem,
84 indexed_by<
85 ordered_unique<
4957a608 86 member<TTDItem, typename SenderReceiver::Identifier, &TTDItem::id>
5cbfa5f6
BH
87 >,
88 ordered_non_unique<
4957a608
BH
89 tag<TimeTag>,
90 member<TTDItem, struct timeval, &TTDItem::ttd>
5cbfa5f6
BH
91 >
92 >
93 >ttdwatch_t;
94
95 Container& d_container;
96 SenderReceiver& d_sr;
97
5cbfa5f6
BH
98 ttdwatch_t d_ttdWatch;
99 typename Container::iterator d_iter;
100 bool d_init;
101
102 uint64_t d_unexpectedResponse, d_timeouts;
103};
104
bf2aaa52 105template<typename Container, typename SendReceive> bool Inflighter<Container, SendReceive>::run()
5cbfa5f6
BH
106{
107 if(!d_init)
108 init();
109
5cbfa5f6
BH
110 for(;;) {
111 int burst = 0;
bf2aaa52
BH
112
113 // 'send' as many items as allowed, limited by 'max in flight' and our burst parameter (which limits query rate growth)
5cbfa5f6
BH
114 while(d_iter != d_container.end() && d_ttdWatch.size() < d_maxInFlight) {
115 TTDItem ttdi;
9b082a14 116 ttdi.iter = d_iter++;
5cbfa5f6 117 ttdi.id = d_sr.send(*ttdi.iter);
3ed022a7
BH
118 gettimeofday(&ttdi.sentTime, 0);
119 ttdi.ttd = ttdi.sentTime;
5cbfa5f6 120 ttdi.ttd.tv_sec += d_timeoutSeconds;
bf2aaa52
BH
121 if(d_ttdWatch.count(ttdi.id)) {
122// cerr<<"DUPLICATE INSERT!"<<endl;
123 }
5cbfa5f6
BH
124 d_ttdWatch.insert(ttdi);
125
126 if(++burst == d_burst)
127 break;
128 }
129 int processed=0;
bf2aaa52
BH
130
131
132 // if there are queries in flight, handle responses
5cbfa5f6 133 if(!d_ttdWatch.empty()) {
4957a608 134 // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl;
5cbfa5f6
BH
135 typename SendReceive::Answer answer;
136 typename SendReceive::Identifier id;
137
bf2aaa52 138 // get as many answers as available - 'receive' should block for a short while to wait for an answer
5cbfa5f6 139 while(d_sr.receive(id, answer)) {
bf2aaa52
BH
140 typename ttdwatch_t::iterator ival = d_ttdWatch.find(id); // match up what we received to what we were waiting for
141
142 if(ival != d_ttdWatch.end()) { // found something!
4957a608 143 ++processed;
232f0877
CH
144 struct timeval now;
145 gettimeofday(&now, 0);
146 unsigned int usec = 1000000*(now.tv_sec - ival->sentTime.tv_sec) + (now.tv_usec - ival->sentTime.tv_usec);
3ed022a7 147 d_sr.deliverAnswer(*ival->iter, answer, usec); // deliver to sender/receiver
4957a608
BH
148 d_ttdWatch.erase(ival);
149 break; // we can send new questions!
150 }
151 else {
d712a557 152 // cerr<<"UNEXPECTED ANSWER: "<<id<<endl;
4957a608
BH
153 d_unexpectedResponse++;
154 }
5cbfa5f6
BH
155 }
156
157
bf2aaa52 158 if(!processed /* || d_ttdWatch.size() > 10000 */ ) { // no new responses, time for some cleanup of the ttdWatch
5cbfa5f6
BH
159 struct timeval now;
160 gettimeofday(&now, 0);
4957a608
BH
161
162 typedef typename ttdwatch_t::template index<TimeTag>::type waiters_by_ttd_index_t;
163 waiters_by_ttd_index_t& waiters_index = boost::multi_index::get<TimeTag>(d_ttdWatch);
5cbfa5f6 164
bf2aaa52 165 // this provides a list of items sorted by age
5cbfa5f6
BH
166 for(typename waiters_by_ttd_index_t::iterator valiter = waiters_index.begin(); valiter != waiters_index.end(); ) {
167 if(valiter->ttd.tv_sec < now.tv_sec || (valiter->ttd.tv_sec == now.tv_sec && valiter->ttd.tv_usec < now.tv_usec)) {
a4382ef3 168 d_sr.deliverTimeout(valiter->id); // so backend can release id
5cbfa5f6 169 waiters_index.erase(valiter++);
d712a557
BH
170 // cerr<<"Have timeout for id="<< valiter->id <<endl;
171 d_timeouts++;
5cbfa5f6 172 }
4957a608 173 else
bf2aaa52 174 break; // if this one was too new, rest will be too
5cbfa5f6
BH
175 }
176 }
177 }
178 if(d_ttdWatch.empty() && d_iter == d_container.end())
179 break;
180 }
bf2aaa52 181 return false;
5cbfa5f6
BH
182}
183
184#if 0
185StatBag S;
186
187struct SendReceive
188{
189 typedef int Identifier;
190 typedef int Answer;
191 ComboAddress d_remote;
192 int d_socket;
193 int d_id;
194
195 SendReceive()
196 {
197 d_id = 0;
198 d_socket = socket(AF_INET, SOCK_DGRAM, 0);
199 int val=1;
200 setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
201
202 ComboAddress local("0.0.0.0", 1024);
203 bind(d_socket, (struct sockaddr*)&local, local.getSocklen());
204
205 char buf[512];
206
207 socklen_t remotelen=sizeof(d_remote);
208 cerr<<"Waiting for 'hi' on "<<local.toStringWithPort()<<endl;
209 int len = recvfrom(d_socket, buf, sizeof(buf), 0, (struct sockaddr*)&d_remote, &remotelen);
210 cerr<<d_remote.toStringWithPort()<<" sent 'hi': "<<string(buf, len);
211 Utility::setNonBlocking(d_socket);
212 connect(d_socket, (struct sockaddr*) &d_remote, d_remote.getSocklen());
213 }
214
215 ~SendReceive()
216 {
217 ::send(d_socket, "done\r\n", 6, 0);
218 }
219
220 Identifier send(int& i)
221 {
222 cerr<<"Sending a '"<<i<<"'"<<endl;
223 string msg = (boost::format("%d %d\n") % d_id % i).str();
224 ::send(d_socket, msg.c_str(), msg.length(), 0);
225 return d_id++;
226 }
227
228 bool receive(Identifier& id, int& i)
229 {
230 if(waitForData(d_socket, 0, 500000) > 0) {
231 char buf[512];
232
233 int len = recv(d_socket, buf, sizeof(buf), 0);
234 string msg(buf, len);
235 if(sscanf(msg.c_str(), "%d %d", &id, &i) != 2) {
4957a608 236 throw runtime_error("Invalid input");
5cbfa5f6
BH
237 }
238 return 1;
239 }
240 return 0;
241 }
242
243 void deliverAnswer(int& i, int j)
244 {
245 cerr<<"We sent "<<i<<", got back: "<<j<<endl;
246 }
247};
248
249
250int main()
251{
252 vector<int> numbers;
253 SendReceive sr;
254 Inflighter<vector<int>, SendReceive> inflighter(numbers, sr);
255
256 for(int n=0; n < 100; ++n)
257 numbers.push_back(n*n);
258
259
260 for(;;) {
261 try {
262 inflighter.run();
263 break;
264 }
265 catch(exception& e) {
266 cerr<<"Caught exception: "<<e.what()<<endl;
267 }
268 }
269
270}
271#endif