4 #include <boost/foreach.hpp>
5 #include <boost/multi_index_container.hpp>
6 #include <boost/format.hpp>
11 #include <sys/socket.h>
14 using namespace boost::multi_index
;
18 template<typename Container
, typename SenderReceiver
> class Inflighter
21 Inflighter(Container
& c
, SenderReceiver
& sr
) : d_container(c
), d_sr(sr
), d_init(false)
26 d_unexpectedResponse
= d_timeouts
= 0;
30 d_iter
= d_container
.begin();
35 unsigned int d_maxInFlight
;
36 unsigned int d_timeoutSeconds
;
42 typename
Container::iterator iter
;
43 typename
SenderReceiver::Identifier id
;
47 typedef multi_index_container
<
51 member
<TTDItem
, typename
SenderReceiver::Identifier
, &TTDItem::id
>
55 member
<TTDItem
, struct timeval
, &TTDItem::ttd
>
60 Container
& d_container
;
65 ttdwatch_t d_ttdWatch
;
66 typename
Container::iterator d_iter
;
69 uint64_t d_unexpectedResponse
, d_timeouts
;
72 template<typename Container
, typename SendReceive
> void Inflighter
<Container
, SendReceive
>::run()
77 // cout << "Have "<<d_container.size() << " things to do!"<<endl;
81 while(d_iter
!= d_container
.end() && d_ttdWatch
.size() < d_maxInFlight
) {
84 ttdi
.id
= d_sr
.send(*ttdi
.iter
);
85 gettimeofday(&ttdi
.ttd
, 0);
86 ttdi
.ttd
.tv_sec
+= d_timeoutSeconds
;
88 d_ttdWatch
.insert(ttdi
);
90 if(++burst
== d_burst
)
94 if(!d_ttdWatch
.empty()) {
95 // cerr<<"Have "<< d_ttdWatch.size() <<" queries in flight"<<endl;
96 typename
SendReceive::Answer answer
;
97 typename
SendReceive::Identifier id
;
99 while(d_sr
.receive(id
, answer
)) {
100 typename
ttdwatch_t::iterator ival
= d_ttdWatch
.find(id
);
101 if(ival
!= d_ttdWatch
.end()) {
103 // cerr<<"Received expected item with id '"<<id<<"' and value '"<<item<<"'"<<endl;
104 d_sr
.deliverAnswer(*ival
->iter
, answer
);
105 d_ttdWatch
.erase(ival
);
106 break; // we can send new questions!
109 // cerr<<"UNEXPECTED ANSWER!"<<endl;
110 d_unexpectedResponse
++;
115 if(!processed
) { // no new responses, time for some cleanup
117 gettimeofday(&now
, 0);
119 typedef typename
ttdwatch_t::template index
<TimeTag
>::type waiters_by_ttd_index_t
;
120 waiters_by_ttd_index_t
& waiters_index
= boost::multi_index::get
<TimeTag
>(d_ttdWatch
);
122 for(typename
waiters_by_ttd_index_t::iterator valiter
= waiters_index
.begin(); valiter
!= waiters_index
.end(); ) {
123 if(valiter
->ttd
.tv_sec
< now
.tv_sec
|| (valiter
->ttd
.tv_sec
== now
.tv_sec
&& valiter
->ttd
.tv_usec
< now
.tv_usec
)) {
124 waiters_index
.erase(valiter
++);
125 // cerr<<"Have timeout"<<endl;
132 if(d_ttdWatch
.empty() && d_iter
== d_container
.end())
142 typedef int Identifier
;
144 ComboAddress d_remote
;
151 d_socket
= socket(AF_INET
, SOCK_DGRAM
, 0);
153 setsockopt(d_socket
, SOL_SOCKET
, SO_REUSEADDR
, &val
, sizeof(val
));
155 ComboAddress
local("0.0.0.0", 1024);
156 bind(d_socket
, (struct sockaddr
*)&local
, local
.getSocklen());
160 socklen_t remotelen
=sizeof(d_remote
);
161 cerr
<<"Waiting for 'hi' on "<<local
.toStringWithPort()<<endl
;
162 int len
= recvfrom(d_socket
, buf
, sizeof(buf
), 0, (struct sockaddr
*)&d_remote
, &remotelen
);
163 cerr
<<d_remote
.toStringWithPort()<<" sent 'hi': "<<string(buf
, len
);
164 Utility::setNonBlocking(d_socket
);
165 connect(d_socket
, (struct sockaddr
*) &d_remote
, d_remote
.getSocklen());
170 ::send(d_socket
, "done\r\n", 6, 0);
173 Identifier
send(int& i
)
175 cerr
<<"Sending a '"<<i
<<"'"<<endl
;
176 string msg
= (boost::format("%d %d\n") % d_id
% i
).str();
177 ::send(d_socket
, msg
.c_str(), msg
.length(), 0);
181 bool receive(Identifier
& id
, int& i
)
183 if(waitForData(d_socket
, 0, 500000) > 0) {
186 int len
= recv(d_socket
, buf
, sizeof(buf
), 0);
187 string
msg(buf
, len
);
188 if(sscanf(msg
.c_str(), "%d %d", &id
, &i
) != 2) {
189 throw runtime_error("Invalid input");
196 void deliverAnswer(int& i
, int j
)
198 cerr
<<"We sent "<<i
<<", got back: "<<j
<<endl
;
207 Inflighter
<vector
<int>, SendReceive
> inflighter(numbers
, sr
);
209 for(int n
=0; n
< 100; ++n
)
210 numbers
.push_back(n
*n
);
218 catch(exception
& e
) {
219 cerr
<<"Caught exception: "<<e
.what()<<endl
;