8 #include <boost/multi_index_container.hpp>
9 #include <boost/format.hpp>
14 #include <sys/socket.h>
16 #include "namespaces.hh"
17 using namespace boost::multi_index
;
// NOTE(review): this listing is a lossy extraction. Every statement carries its
// original line number as a prefix, tokens are split across lines, and lines
// such as braces, access specifiers and some declarations are absent. The
// comments below describe only what the surviving tokens show.
//
// Inflighter: class template parameterised on a Container and a SenderReceiver.
// It holds references to both and a watch list (d_ttdWatch) of items that
// have been sent and are awaiting an answer or a timeout.
21 template<typename Container
, typename SenderReceiver
> class Inflighter
// Constructor: stores references to the container and the sender/receiver and
// starts with d_init == false.
24 Inflighter(Container
& c
, SenderReceiver
& sr
) : d_container(c
), d_sr(sr
), d_init(false)
// Both statistics counters start at zero.
29 d_unexpectedResponse
= d_timeouts
= 0;
// Point the work iterator at the first element of the container.
// NOTE(review): the header of the function containing this statement is not
// visible here; presumably a separate init step guarded by d_init. Confirm.
33 d_iter
= d_container
.begin();
37 bool run(); //!< keep calling this as long as it returns 1, or if it throws an exception
// Upper bound on d_ttdWatch.size() while run() is sending new items.
39 unsigned int d_maxInFlight
;
// Seconds added to an item's send time to form its deadline (ttd) in run().
40 unsigned int d_timeoutSeconds
;
// Accessor; its body is not visible in this extract (presumably returns
// d_timeouts, verify against the full file).
43 uint64_t getTimeouts()
// Returns the number of received answers that matched no watched identifier.
48 uint64_t getUnexpecteds()
50 return d_unexpectedResponse
;
// Fields of the per-item record that the key extractors below call TTDItem
// (the struct header itself is not visible):
//   iter     - iterator to the container element that was sent
56 typename
Container::iterator iter
;
//   id       - identifier of the sender/receiver's Identifier type
57 typename
SenderReceiver::Identifier id
;
//   sentTime - time of sending; ttd - deadline after which the item times out
58 struct timeval sentTime
, ttd
;
// Multi-index container of TTDItem records; d_ttdWatch below has the type
// ttdwatch_t. Two key extractors are visible: one on TTDItem::id and one on
// TTDItem::ttd. The index kinds and tags are not visible in this extract.
61 typedef multi_index_container
<
// Key: the identifier, used by run() to match a received answer to its item.
65 member
<TTDItem
, typename
SenderReceiver::Identifier
, &TTDItem::id
>
// Key: the deadline. NOTE(review): presumably the index run() retrieves via
// TimeTag to walk items oldest-first; the tag line is not visible. Confirm.
69 member
<TTDItem
, struct timeval
, &TTDItem::ttd
>
// Reference to the caller's container of work items (not owned).
74 Container
& d_container
;
// Items that have been sent and are still awaiting an answer or a timeout.
77 ttdwatch_t d_ttdWatch
;
// Next container element to send.
78 typename
Container::iterator d_iter
;
// Counters: answers with an unknown identifier, and items that timed out.
81 uint64_t d_unexpectedResponse
, d_timeouts
;
// Inflighter::run(): out-of-class definition of the member function declared
// in the class template. From the surviving fragments it (1) sends container
// items while fewer than d_maxInFlight are outstanding, (2) receives answers
// and matches them to outstanding items by identifier, and (3) when nothing
// was processed, expires outstanding items whose deadline has passed.
// NOTE(review): the declarations of ttdi, burst, processed and now, any outer
// loop, and the return statements are not visible in this extract.
84 template<typename Container
, typename SendReceive
> bool Inflighter
<Container
, SendReceive
>::run()
92 // 'send' as many items as allowed, limited by 'max in flight' and our burst parameter (which limits query rate growth)
93 while(d_iter
!= d_container
.end() && d_ttdWatch
.size() < d_maxInFlight
) {
// Send the item referenced by ttdi.iter and keep the identifier that the
// sender/receiver returns for it.
96 ttdi
.id
= d_sr
.send(*ttdi
.iter
);
// Record the send time; the deadline is the send time plus d_timeoutSeconds.
97 gettimeofday(&ttdi
.sentTime
, 0);
98 ttdi
.ttd
= ttdi
.sentTime
;
99 ttdi
.ttd
.tv_sec
+= d_timeoutSeconds
;
// An entry with the same identifier is already being watched; the only
// visible reaction is the commented-out diagnostic below.
100 if(d_ttdWatch
.count(ttdi
.id
)) {
101 // cerr<<"DUPLICATE INSERT!"<<endl;
// Start watching the item that was just sent.
103 d_ttdWatch
.insert(ttdi
);
// Burst limit check. NOTE(review): the statement this test controls is not
// visible (presumably leaves the send loop); d_burst's declaration is not
// visible either. Confirm against the full file.
105 if(++burst
== d_burst
)
111 // if there are queries in flight, handle responses
112 if(!d_ttdWatch
.empty()) {
113 // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl;
// Storage for one received answer and the identifier it arrived with.
114 typename
SendReceive::Answer answer
;
115 typename
SendReceive::Identifier id
;
117 // get as many answers as available - 'receive' should block for a short while to wait for an answer
118 while(d_sr
.receive(id
, answer
)) {
119 typename
ttdwatch_t::iterator ival
= d_ttdWatch
.find(id
); // match up what we received to what we were waiting for
121 if(ival
!= d_ttdWatch
.end()) { // found something!
124 gettimeofday(&now
, 0);
// Elapsed microseconds between sending the item and receiving its answer.
// NOTE(review): the result is narrowed to unsigned int, so a large or
// negative difference would not be represented faithfully. Confirm that the
// configured timeouts make this acceptable.
125 unsigned int usec
= 1000000*(now
.tv_sec
- ival
->sentTime
.tv_sec
) + (now
.tv_usec
- ival
->sentTime
.tv_usec
);
126 d_sr
.deliverAnswer(*ival
->iter
, answer
, usec
); // deliver to sender/receiver
// The item has been answered, so stop watching it.
127 d_ttdWatch
.erase(ival
);
128 break; // we can send new questions!
131 // cerr<<"UNEXPECTED ANSWER: "<<id<<endl;
// Count an answer whose identifier matched no watched item.
132 d_unexpectedResponse
++;
137 if(!processed
/* || d_ttdWatch.size() > 10000 */ ) { // no new responses, time for some cleanup of the ttdWatch
139 gettimeofday(&now
, 0);
// View of the watch list through the index tagged TimeTag.
141 typedef typename
ttdwatch_t::template index
<TimeTag
>::type waiters_by_ttd_index_t
;
142 waiters_by_ttd_index_t
& waiters_index
= boost::multi_index::get
<TimeTag
>(d_ttdWatch
);
144 // this provides a list of items sorted by age
// The for-header has no increment; the iterator is advanced in the erase call.
145 for(typename
waiters_by_ttd_index_t::iterator valiter
= waiters_index
.begin(); valiter
!= waiters_index
.end(); ) {
// True when the entry's deadline is strictly earlier than now, comparing
// seconds first and microseconds on a tie.
146 if(valiter
->ttd
.tv_sec
< now
.tv_sec
|| (valiter
->ttd
.tv_sec
== now
.tv_sec
&& valiter
->ttd
.tv_usec
< now
.tv_usec
)) {
147 d_sr
.deliverTimeout(valiter
->id
); // so backend can release id
// Post-increment moves valiter to the next entry before the old one is erased.
148 waiters_index
.erase(valiter
++);
// NOTE(review): if re-enabled as written, this diagnostic would dereference
// valiter after it has already been advanced past the erased entry.
149 // cerr<<"Have timeout for id="<< valiter->id <<endl;
153 break; // if this one was too new, rest will be too
// Nothing is outstanding and every container item has been sent.
// NOTE(review): the statement this test controls is not visible here.
157 if(d_ttdWatch
.empty() && d_iter
== d_container
.end())
// Fragments of an example sender/receiver that talks to a UDP peer.
// NOTE(review): the enclosing type's header is not visible; it is presumably
// the SendReceive type used to instantiate Inflighter further down. The
// constructor and destructor headers and the declarations of d_socket, d_id,
// val and buf are not visible either.
//
// Identifiers handed out by send() and filled in by receive() are plain ints.
168 typedef int Identifier
;
// Address of the peer, filled in by recvfrom() below.
170 ComboAddress d_remote
;
// Create an IPv4 UDP socket.
177 d_socket
= socket(AF_INET
, SOCK_DGRAM
, 0);
// Set SO_REUSEADDR on the socket using the value held in val.
179 setsockopt(d_socket
, SOL_SOCKET
, SO_REUSEADDR
, &val
, sizeof(val
));
// Bind to the address built from "0.0.0.0" and port 1024.
181 ComboAddress
local("0.0.0.0", 1024);
182 bind(d_socket
, (struct sockaddr
*)&local
, local
.getSocklen());
// Receive the first datagram; recvfrom() records the sender in d_remote.
186 socklen_t remotelen
=sizeof(d_remote
);
187 cerr
<<"Waiting for 'hi' on "<<local
.toStringWithPort()<<endl
;
188 int len
= recvfrom(d_socket
, buf
, sizeof(buf
), 0, (struct sockaddr
*)&d_remote
, &remotelen
);
// NOTE(review): len is used directly as a string length on the very next
// original line; a -1 return from recvfrom() would not be caught before this.
189 cerr
<<d_remote
.toStringWithPort()<<" sent 'hi': "<<string(buf
, len
);
// Switch the socket to non-blocking mode and connect it to the peer, so the
// later ::send()/recv() calls need no explicit address.
190 Utility::setNonBlocking(d_socket
);
191 connect(d_socket
, (struct sockaddr
*) &d_remote
, d_remote
.getSocklen());
// Send a 6-byte "done\r\n" message to the peer.
// NOTE(review): the enclosing function header is not visible (presumably the
// destructor). Confirm.
196 ::send(d_socket
, "done\r\n", 6, 0);
// send(): logs the value, formats it as "<d_id> <i>\n" and sends it on the
// socket. The statement producing the returned Identifier is not visible.
199 Identifier
send(int& i
)
201 cerr
<<"Sending a '"<<i
<<"'"<<endl
;
202 string msg
= (boost::format("%d %d\n") % d_id
% i
).str();
203 ::send(d_socket
, msg
.c_str(), msg
.length(), 0);
// receive(): when waitForData() reports data on the socket, reads one message
// and parses two integers from it into id and i.
207 bool receive(Identifier
& id
, int& i
)
// NOTE(review): the arguments (0, 500000) are presumably a seconds/microseconds
// timeout of half a second; confirm against waitForData's declaration.
209 if(waitForData(d_socket
, 0, 500000) > 0) {
212 int len
= recv(d_socket
, buf
, sizeof(buf
), 0);
// NOTE(review): len is used as a string length on the very next original line
// without a check for a -1 return from recv().
213 string
msg(buf
, len
);
// Throw unless exactly two integers were parsed.
214 if(sscanf(msg
.c_str(), "%d %d", &id
, &i
) != 2) {
215 throw runtime_error("Invalid input");
// deliverAnswer(): logs the value that was sent and the value that came back.
// NOTE(review): this takes two parameters, while Inflighter::run() calls
// deliverAnswer with three arguments (item, answer, usec). Neither an Answer
// typedef nor a deliverTimeout member is visible in this extract. Confirm
// whether this example is still expected to compile.
222 void deliverAnswer(int& i
, int j
)
224 cerr
<<"We sent "<<i
<<", got back: "<<j
<<endl
;
// Example driver fragments. NOTE(review): the enclosing function header, the
// declarations of numbers and sr, the try block and the call(s) on inflighter
// are not visible in this extract.
//
// Build an Inflighter over a vector<int> of work items and the SendReceive
// object sr.
233 Inflighter
<vector
<int>, SendReceive
> inflighter(numbers
, sr
);
// Fill the work list with the squares of 0 through 99. This happens after the
// Inflighter has been constructed from the same vector.
235 for(int n
=0; n
< 100; ++n
)
236 numbers
.push_back(n
*n
);
// Log the message of any exception caught here.
244 catch(exception
& e
) {
245 cerr
<<"Caught exception: "<<e
.what()<<endl
;