]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/inflighter.cc
and the final bit of whitespace/tab cleanup
[thirdparty/pdns.git] / pdns / inflighter.cc
1 #include <vector>
2 #include <deque>
3 #include <iostream>
4 #include <boost/foreach.hpp>
5 #include <boost/multi_index_container.hpp>
6 #include <boost/format.hpp>
7 #include <sys/time.h>
8 #include <time.h>
9 #include "iputils.hh"
10 #include "statbag.hh"
11 #include <sys/socket.h>
12
13 using namespace std;
14 using namespace boost::multi_index;
15
// Empty tag type; it carries no data and exists only to give the
// deadline-ordered index of the in-flight table a name.
struct TimeTag
{
};
17
18 template<typename Container, typename SenderReceiver> class Inflighter
19 {
20 public:
21 Inflighter(Container& c, SenderReceiver& sr) : d_container(c), d_sr(sr), d_init(false)
22 {
23 d_burst = 2;
24 d_maxInFlight = 5;
25 d_timeoutSeconds = 3;
26 d_unexpectedResponse = d_timeouts = 0;
27 }
28 void init()
29 {
30 d_iter = d_container.begin();
31 d_init=true;
32 }
33 void run();
34
35 unsigned int d_maxInFlight;
36 unsigned int d_timeoutSeconds;
37 int d_burst;
38
39 private:
40 struct TTDItem
41 {
42 typename Container::iterator iter;
43 typename SenderReceiver::Identifier id;
44 struct timeval ttd;
45 };
46
47 typedef multi_index_container<
48 TTDItem,
49 indexed_by<
50 ordered_unique<
51 member<TTDItem, typename SenderReceiver::Identifier, &TTDItem::id>
52 >,
53 ordered_non_unique<
54 tag<TimeTag>,
55 member<TTDItem, struct timeval, &TTDItem::ttd>
56 >
57 >
58 >ttdwatch_t;
59
60 Container& d_container;
61 SenderReceiver& d_sr;
62
63
64
65 ttdwatch_t d_ttdWatch;
66 typename Container::iterator d_iter;
67 bool d_init;
68
69 uint64_t d_unexpectedResponse, d_timeouts;
70 };
71
72 template<typename Container, typename SendReceive> void Inflighter<Container, SendReceive>::run()
73 {
74 if(!d_init)
75 init();
76
77 // cout << "Have "<<d_container.size() << " things to do!"<<endl;
78
79 for(;;) {
80 int burst = 0;
81 while(d_iter != d_container.end() && d_ttdWatch.size() < d_maxInFlight) {
82 TTDItem ttdi;
83 ttdi.iter = ++d_iter;
84 ttdi.id = d_sr.send(*ttdi.iter);
85 gettimeofday(&ttdi.ttd, 0);
86 ttdi.ttd.tv_sec += d_timeoutSeconds;
87
88 d_ttdWatch.insert(ttdi);
89
90 if(++burst == d_burst)
91 break;
92 }
93 int processed=0;
94 if(!d_ttdWatch.empty()) {
95 // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl;
96 typename SendReceive::Answer answer;
97 typename SendReceive::Identifier id;
98
99 while(d_sr.receive(id, answer)) {
100 typename ttdwatch_t::iterator ival = d_ttdWatch.find(id);
101 if(ival != d_ttdWatch.end()) {
102 ++processed;
103 // cerr<<"Received expected item with id '"<<id<<"' and value '"<<item<<"'"<<endl;
104 d_sr.deliverAnswer(*ival->iter, answer);
105 d_ttdWatch.erase(ival);
106 break; // we can send new questions!
107 }
108 else {
109 // cerr<<"UNEXPECTED ANSWER!"<<endl;
110 d_unexpectedResponse++;
111 }
112 }
113
114
115 if(!processed) { // no new responses, time for some cleanup
116 struct timeval now;
117 gettimeofday(&now, 0);
118
119 typedef typename ttdwatch_t::template index<TimeTag>::type waiters_by_ttd_index_t;
120 waiters_by_ttd_index_t& waiters_index = boost::multi_index::get<TimeTag>(d_ttdWatch);
121
122 for(typename waiters_by_ttd_index_t::iterator valiter = waiters_index.begin(); valiter != waiters_index.end(); ) {
123 if(valiter->ttd.tv_sec < now.tv_sec || (valiter->ttd.tv_sec == now.tv_sec && valiter->ttd.tv_usec < now.tv_usec)) {
124 waiters_index.erase(valiter++);
125 // cerr<<"Have timeout"<<endl;
126 }
127 else
128 break;
129 }
130 }
131 }
132 if(d_ttdWatch.empty() && d_iter == d_container.end())
133 break;
134 }
135 }
136
137 #if 0
138 StatBag S;
139
140 struct SendReceive
141 {
142 typedef int Identifier;
143 typedef int Answer;
144 ComboAddress d_remote;
145 int d_socket;
146 int d_id;
147
148 SendReceive()
149 {
150 d_id = 0;
151 d_socket = socket(AF_INET, SOCK_DGRAM, 0);
152 int val=1;
153 setsockopt(d_socket, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
154
155 ComboAddress local("0.0.0.0", 1024);
156 bind(d_socket, (struct sockaddr*)&local, local.getSocklen());
157
158 char buf[512];
159
160 socklen_t remotelen=sizeof(d_remote);
161 cerr<<"Waiting for 'hi' on "<<local.toStringWithPort()<<endl;
162 int len = recvfrom(d_socket, buf, sizeof(buf), 0, (struct sockaddr*)&d_remote, &remotelen);
163 cerr<<d_remote.toStringWithPort()<<" sent 'hi': "<<string(buf, len);
164 Utility::setNonBlocking(d_socket);
165 connect(d_socket, (struct sockaddr*) &d_remote, d_remote.getSocklen());
166 }
167
168 ~SendReceive()
169 {
170 ::send(d_socket, "done\r\n", 6, 0);
171 }
172
173 Identifier send(int& i)
174 {
175 cerr<<"Sending a '"<<i<<"'"<<endl;
176 string msg = (boost::format("%d %d\n") % d_id % i).str();
177 ::send(d_socket, msg.c_str(), msg.length(), 0);
178 return d_id++;
179 }
180
181 bool receive(Identifier& id, int& i)
182 {
183 if(waitForData(d_socket, 0, 500000) > 0) {
184 char buf[512];
185
186 int len = recv(d_socket, buf, sizeof(buf), 0);
187 string msg(buf, len);
188 if(sscanf(msg.c_str(), "%d %d", &id, &i) != 2) {
189 throw runtime_error("Invalid input");
190 }
191 return 1;
192 }
193 return 0;
194 }
195
196 void deliverAnswer(int& i, int j)
197 {
198 cerr<<"We sent "<<i<<", got back: "<<j<<endl;
199 }
200 };
201
202
203 int main()
204 {
205 vector<int> numbers;
206 SendReceive sr;
207 Inflighter<vector<int>, SendReceive> inflighter(numbers, sr);
208
209 for(int n=0; n < 100; ++n)
210 numbers.push_back(n*n);
211
212
213 for(;;) {
214 try {
215 inflighter.run();
216 break;
217 }
218 catch(exception& e) {
219 cerr<<"Caught exception: "<<e.what()<<endl;
220 }
221 }
222
223 }
224 #endif