#include <atomic>
#include "statbag.hh"
#include <fstream>
+#include <poll.h>
+#include <memory>
using std::thread;
+using std::unique_ptr;
StatBag S;
std::atomic<unsigned int> g_recvcounter;
volatile bool g_done;
-void* recvThread(Socket* s)
+void* recvThread(const vector<Socket*>* sockets)
{
- char response[1500];
+ vector<pollfd> rfds, fds;
+ for(const auto& s : *sockets) {
+ struct pollfd pfd;
+ pfd.fd = s->getHandle();
+ pfd.events = POLLIN;
+ pfd.revents = 0;
+ rfds.push_back(pfd);
+ }
+
+ int err;
+
+ vector<struct mmsghdr> buf(100);
+ for(auto& m : buf) {
+ fillMSGHdr(&m.msg_hdr, new struct iovec, new char[512], 512, new char[1500], 1500, new ComboAddress("127.0.0.1"));
+ }
+
while(!g_done) {
- try {
- s->read(response, sizeof(response));
- g_recvcounter++;
+ fds=rfds;
+
+ err = poll(&fds[0], fds.size(), -1);
+ if(err < 0) {
+ if(errno==EINTR)
+ continue;
+ unixDie("Unable to poll for new UDP events");
+ }
+
+
+ for(struct pollfd &pfd : fds) {
+ if(pfd.revents & POLLIN) {
+
+ if((err=recvmmsg(pfd.fd, &buf[0], buf.size(), MSG_WAITFORONE, 0)) < 0 ) {
+ if(errno != EAGAIN)
+ cerr<<"recvfrom gave error, ignoring: "<<strerror(errno)<<endl;
+ unixDie("recvmmsg");
+ continue;
+ }
+ g_recvcounter+=err;
+ }
}
- catch(...){}
}
+
return 0;
}
setSocketBuffer(fd, SO_SNDBUF, size);
}
-void sendThread(const vector<Socket*>* sockets, const vector<vector<uint8_t> >* packets, int qps, bool even)
+void sendThread(const vector<Socket*>* sockets, const vector<vector<uint8_t> >* packets, int qps, ComboAddress dest)
{
-
- int burst=20;
+ unsigned int burst=100;
struct timespec nsec;
nsec.tv_sec=0;
- nsec.tv_nsec=(unsigned long)(burst*1000000000.0/qps);
-
-
+ nsec.tv_nsec=1*(unsigned long)(burst*1000000000.0/qps);
int count=0;
+ struct Unit {
+ Unit(){}
+ Unit(const Unit&) = delete;
+ Unit(Unit&&) = default;
+ struct msghdr msgh;
+ struct iovec iov;
+ char cbuf[256];
+ struct mmsghdr mmh;
+ };
+ vector<Unit> units;
+
for(const auto& p : *packets) {
count++;
- if((count%2)==even)
- continue;
- (*sockets)[count % sockets->size()]->write((const char*)&p[0], p.size());
- if(!(count%burst))
+ Unit u;
+
+ fillMSGHdr(&u.msgh, &u.iov, u.cbuf, 0, (char*)&p[0], p.size(), &dest);
+ units.emplace_back(std::move(u));
+
+ if(units.size()==burst) {
+ vector<struct mmsghdr> job;
+ for(auto& u : units) {
+ job.push_back({u.msgh, 0});
+ }
+ sendmmsg((*sockets)[count % sockets->size()]->getHandle(),
+ &job[0], job.size(), 0);
nanosleep(&nsec, 0);
+ units.clear();
+
+ }
}
}
// calidns queryfile destination qps
-
-
-
int main(int argc, char** argv)
try
{
vector<Socket*> sockets;
ComboAddress dest(argv[2]);
- for(int i=0; i < 6; ++i) {
+ for(int i=0; i < 24; ++i) {
Socket *sock = new Socket(AF_INET, SOCK_DGRAM);
- sock->connect(dest);
+ // sock->connect(dest);
setSocketSendBuffer(sock->getHandle(), 2000000);
setSocketReceiveBuffer(sock->getHandle(), 2000000);
sockets.push_back(sock);
- new thread(recvThread, sock);
- }
+ }
+ new thread(recvThread, &sockets);
int qps=atoi(argv[3]);
ofstream plot("plot");
DTime dt;
dt.set();
- thread t1(sendThread, &sockets, &packets, qps, 0);
- thread t2(sendThread, &sockets, &packets, qps, 1);
-
- t1.join();
- t2.join();
+ sendThread(&sockets, &packets, qps, dest);
auto udiff = dt.udiff();
auto realqps=packets.size()/(udiff/1000000.0);
usleep(50000);
double perc=g_recvcounter.load()*100.0/packets.size();
- cout<<"Received "<<g_recvcounter.load()<<" packets ("<<perc<<")"<<endl;
- plot<<qps<<" "<<realqps<<" "<<perc<<endl;
+ cout<<"Received "<<g_recvcounter.load()<<" packets ("<<perc<<"%)"<<endl;
+ plot<<qps<<" "<<realqps<<" "<<perc<<" "<<g_recvcounter.load()/(udiff/1000000.0)<<endl;
}
plot.flush();
// t1.detach();
--- /dev/null
+#include "iputils.hh"
+#include "sstuff.hh"
+#include "statbag.hh"
+
+StatBag S;
+
+int main(int argc, char** argv)
+try
+{
+ if(argc != 3) {
+ cerr<<"Syntax: dumresp local-address number-of-threads"<<endl;
+ exit(EXIT_FAILURE);
+ }
+
+ for(int i=1 ; i < atoi(argv[2]); ++i) {
+ if(!fork())
+ break;
+ }
+ Socket s(AF_INET, SOCK_DGRAM);
+ ComboAddress local(argv[1], 5300);
+
+ int one=1;
+ if(setsockopt(s.getHandle(), SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
+ unixDie("setsockopt for REUSEPORT");
+
+ s.bind(local);
+ char buffer[1500];
+ struct dnsheader* dh = (struct dnsheader*)buffer;
+ int len;
+ ComboAddress rem=local;
+ socklen_t socklen = rem.getSocklen();
+ for(;;) {
+ len=recvfrom(s.getHandle(), buffer, sizeof(buffer), 0, (struct sockaddr*)&rem, &socklen);
+ if(len < 0)
+ unixDie("recvfrom");
+
+ if(dh->qr)
+ continue;
+ dh->qr=1;
+ dh->ad=0;
+ if(sendto(s.getHandle(), buffer, len, 0, (struct sockaddr*)&rem, socklen) < 0)
+ unixDie("sendto");
+
+ }
+}
+catch(std::exception& e)
+{
+ cerr<<"Fatal error: "<<e.what()<<endl;
+ exit(EXIT_FAILURE);
+}