setSocketBuffer(fd, SO_SNDBUF, size);
}
+void sendThread(const vector<Socket*>* sockets, const vector<vector<uint8_t> >* packets, int qps, bool even)
+{
+
+ int burst=20;
+ struct timespec nsec;
+ nsec.tv_sec=0;
+ nsec.tv_nsec=(unsigned long)(burst*1000000000.0/qps);
+
+
+ int count=0;
+
+ for(const auto& p : *packets) {
+ count++;
+ if((count%2)==even)
+ continue;
+
+ (*sockets)[count % sockets->size()]->write((const char*)&p[0], p.size());
+ if(!(count%burst))
+ nanosleep(&nsec, 0);
+ }
+}
+
// calidns queryfile destination qps
+
+
+
int main(int argc, char** argv)
try
{
+ struct sched_param param;
+ param.sched_priority=99;
+
+ if(sched_setscheduler(0, SCHED_FIFO, ¶m) < 0)
+ unixDie("setting scheduler");
+
ifstream ifs(argv[1]);
string line;
reportAllTypes();
cout<<"Generated "<<packets.size()<<" queries"<<endl;
random_shuffle(packets.begin(), packets.end());
-
- Socket sock(AF_INET, SOCK_DGRAM);
- ComboAddress dest(argv[2]);
- sock.connect(dest);
- setSocketSendBuffer(sock.getHandle(), 1000000);
- setSocketReceiveBuffer(sock.getHandle(), 1000000);
-
-
- thread t1(recvThread, &sock);
+ vector<Socket*> sockets;
+ ComboAddress dest(argv[2]);
+ for(int i=0; i < 6; ++i) {
+ Socket *sock = new Socket(AF_INET, SOCK_DGRAM);
+
+ sock->connect(dest);
+ setSocketSendBuffer(sock->getHandle(), 2000000);
+ setSocketReceiveBuffer(sock->getHandle(), 2000000);
+ sockets.push_back(sock);
+ new thread(recvThread, sock);
+ }
int qps=atoi(argv[3]);
- cout<<"Calibration run, aiming at "<<qps<< "qps"<<endl;
- int burst=40;
- struct timespec nsec;
- nsec.tv_sec=0;
- nsec.tv_nsec=(unsigned long)(burst*1000000000.0/qps);
- DTime dt;
- dt.set();
- int count=0;
- for(const auto& p : packets) {
- sock.write((const char*)&p[0], p.size());
- if(!((count++)%burst))
- nanosleep(&nsec, 0);
+ ofstream plot("plot");
+ for(qps=10000;;qps+=5000) {
+ cout<<"Aiming at "<<qps<< "qps"<<endl;
+ g_recvcounter.store(0);
+ DTime dt;
+ dt.set();
+
+ thread t1(sendThread, &sockets, &packets, qps, 0);
+ thread t2(sendThread, &sockets, &packets, qps, 1);
+
+ t1.join();
+ t2.join();
+
+ auto udiff = dt.udiff();
+ auto realqps=packets.size()/(udiff/1000000.0);
+ cout<<"Achieved "<<realqps<<"qps"<< " over "<< udiff/1000000.0<<" seconds"<<endl;
+
+ usleep(50000);
+ double perc=g_recvcounter.load()*100.0/packets.size();
+ cout<<"Received "<<g_recvcounter.load()<<" packets ("<<perc<<")"<<endl;
+ plot<<qps<<" "<<realqps<<" "<<perc<<endl;
}
- auto udiff = dt.udiff();
- auto realqps=packets.size()/(udiff/1000000.0);
- cout<<"Achieved "<<realqps<<"qps"<< " over "<< udiff/1000000.0<<" seconds"<<endl;
-
- sleep(1);
-
- cout<<"Received "<<g_recvcounter.load()<<" packets"<<endl;
- t1.detach();
+ plot.flush();
+ // t1.detach();
}
catch(std::exception& e)
{