]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/inflighter.cc
99a1e541aafe91a3ca8576e320c8da29c9a204a2
[thirdparty/pdns.git] / pdns / inflighter.cc
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <vector>
#include <deque>
#include <iostream>

#include <boost/multi_index_container.hpp>
#include <boost/format.hpp>
#include <sys/socket.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>
#include "iputils.hh"
#include "statbag.hh"

#include "namespaces.hh"
17 using namespace boost::multi_index;
18
//! Empty tag type: names the index of Inflighter's multi_index container that is
//! keyed on TTDItem::ttd, so it can be selected with get<TimeTag>().
struct TimeTag
{
};
20
21 template<typename Container, typename SenderReceiver> class Inflighter
22 {
23 public:
24 Inflighter(Container& c, SenderReceiver& sr) : d_container(c), d_sr(sr), d_init(false)
25 {
26 d_burst = 2;
27 d_maxInFlight = 5;
28 d_timeoutSeconds = 3;
29 d_unexpectedResponse = d_timeouts = 0;
30 }
31 void init()
32 {
33 d_iter = d_container.begin();
34 d_init=true;
35 }
36
37 bool run(); //!< keep calling this as long as it returns 1, or if it throws an exception
38
39 unsigned int d_maxInFlight;
40 unsigned int d_timeoutSeconds;
41 int d_burst;
42
43 uint64_t getTimeouts()
44 {
45 return d_timeouts;
46 }
47
48 uint64_t getUnexpecteds()
49 {
50 return d_unexpectedResponse;
51 }
52
53 private:
54 struct TTDItem
55 {
56 typename Container::iterator iter;
57 typename SenderReceiver::Identifier id;
58 struct timeval sentTime, ttd;
59 };
60
61 typedef multi_index_container<
62 TTDItem,
63 indexed_by<
64 ordered_unique<
65 member<TTDItem, typename SenderReceiver::Identifier, &TTDItem::id>
66 >,
67 ordered_non_unique<
68 tag<TimeTag>,
69 member<TTDItem, struct timeval, &TTDItem::ttd>
70 >
71 >
72 >ttdwatch_t;
73
74 Container& d_container;
75 SenderReceiver& d_sr;
76
77 ttdwatch_t d_ttdWatch;
78 typename Container::iterator d_iter;
79 bool d_init;
80
81 uint64_t d_unexpectedResponse, d_timeouts;
82 };
83
84 template<typename Container, typename SendReceive> bool Inflighter<Container, SendReceive>::run()
85 {
86 if(!d_init)
87 init();
88
89 for(;;) {
90 int burst = 0;
91
92 // 'send' as many items as allowed, limited by 'max in flight' and our burst parameter (which limits query rate growth)
93 while(d_iter != d_container.end() && d_ttdWatch.size() < d_maxInFlight) {
94 TTDItem ttdi;
95 ttdi.iter = d_iter++;
96 ttdi.id = d_sr.send(*ttdi.iter);
97 gettimeofday(&ttdi.sentTime, 0);
98 ttdi.ttd = ttdi.sentTime;
99 ttdi.ttd.tv_sec += d_timeoutSeconds;
100 if(d_ttdWatch.count(ttdi.id)) {
101 // cerr<<"DUPLICATE INSERT!"<<endl;
102 }
103 d_ttdWatch.insert(ttdi);
104
105 if(++burst == d_burst)
106 break;
107 }
108 int processed=0;
109
110
111 // if there are queries in flight, handle responses
112 if(!d_ttdWatch.empty()) {
113 // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl;
114 typename SendReceive::Answer answer;
115 typename SendReceive::Identifier id;
116
117 // get as many answers as available - 'receive' should block for a short while to wait for an answer
118 while(d_sr.receive(id, answer)) {
119 typename ttdwatch_t::iterator ival = d_ttdWatch.find(id); // match up what we received to what we were waiting for
120
121 if(ival != d_ttdWatch.end()) { // found something!
122 ++processed;
123 struct timeval now;
124 gettimeofday(&now, 0);
125 unsigned int usec = 1000000*(now.tv_sec - ival->sentTime.tv_sec) + (now.tv_usec - ival->sentTime.tv_usec);
126 d_sr.deliverAnswer(*ival->iter, answer, usec); // deliver to sender/receiver
127 d_ttdWatch.erase(ival);
128 break; // we can send new questions!
129 }
130 else {
131 // cerr<<"UNEXPECTED ANSWER: "<<id<<endl;
132 d_unexpectedResponse++;
133 }
134 }
135
136
137 if(!processed /* || d_ttdWatch.size() > 10000 */ ) { // no new responses, time for some cleanup of the ttdWatch
138 struct timeval now;
139 gettimeofday(&now, 0);
140
141 typedef typename ttdwatch_t::template index<TimeTag>::type waiters_by_ttd_index_t;
142 waiters_by_ttd_index_t& waiters_index = boost::multi_index::get<TimeTag>(d_ttdWatch);
143
144 // this provides a list of items sorted by age
145 for(typename waiters_by_ttd_index_t::iterator valiter = waiters_index.begin(); valiter != waiters_index.end(); ) {
146 if(valiter->ttd.tv_sec < now.tv_sec || (valiter->ttd.tv_sec == now.tv_sec && valiter->ttd.tv_usec < now.tv_usec)) {
147 d_sr.deliverTimeout(valiter->id); // so backend can release id
148 waiters_index.erase(valiter++);
149 // cerr<<"Have timeout for id="<< valiter->id <<endl;
150 d_timeouts++;
151 }
152 else
153 break; // if this one was too new, rest will be too
154 }
155 }
156 }
157 if(d_ttdWatch.empty() && d_iter == d_container.end())
158 break;
159 }
160 return false;
161 }
162
163 #if 0
164 StatBag S;
165
166 struct SendReceive
167 {
168 typedef int Identifier;
169 typedef int Answer;
170 ComboAddress d_remote;
171 int d_socket;
172 int d_id;
173
174 SendReceive()
175 {
176 d_id = 0;
177 d_socket = socket(AF_INET, SOCK_DGRAM, 0);
178 int val=1;
179 setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
180
181 ComboAddress local("0.0.0.0", 1024);
182 bind(d_socket, (struct sockaddr*)&local, local.getSocklen());
183
184 char buf[512];
185
186 socklen_t remotelen=sizeof(d_remote);
187 cerr<<"Waiting for 'hi' on "<<local.toStringWithPort()<<endl;
188 int len = recvfrom(d_socket, buf, sizeof(buf), 0, (struct sockaddr*)&d_remote, &remotelen);
189 cerr<<d_remote.toStringWithPort()<<" sent 'hi': "<<string(buf, len);
190 Utility::setNonBlocking(d_socket);
191 connect(d_socket, (struct sockaddr*) &d_remote, d_remote.getSocklen());
192 }
193
194 ~SendReceive()
195 {
196 ::send(d_socket, "done\r\n", 6, 0);
197 }
198
199 Identifier send(int& i)
200 {
201 cerr<<"Sending a '"<<i<<"'"<<endl;
202 string msg = (boost::format("%d %d\n") % d_id % i).str();
203 ::send(d_socket, msg.c_str(), msg.length(), 0);
204 return d_id++;
205 }
206
207 bool receive(Identifier& id, int& i)
208 {
209 if(waitForData(d_socket, 0, 500000) > 0) {
210 char buf[512];
211
212 int len = recv(d_socket, buf, sizeof(buf), 0);
213 string msg(buf, len);
214 if(sscanf(msg.c_str(), "%d %d", &id, &i) != 2) {
215 throw runtime_error("Invalid input");
216 }
217 return 1;
218 }
219 return 0;
220 }
221
222 void deliverAnswer(int& i, int j)
223 {
224 cerr<<"We sent "<<i<<", got back: "<<j<<endl;
225 }
226 };
227
228
229 int main()
230 {
231 vector<int> numbers;
232 SendReceive sr;
233 Inflighter<vector<int>, SendReceive> inflighter(numbers, sr);
234
235 for(int n=0; n < 100; ++n)
236 numbers.push_back(n*n);
237
238
239 for(;;) {
240 try {
241 inflighter.run();
242 break;
243 }
244 catch(exception& e) {
245 cerr<<"Caught exception: "<<e.what()<<endl;
246 }
247 }
248
249 }
250 #endif