From: bert hubert Date: Mon, 2 Feb 2015 13:40:33 +0000 (+0100) Subject: add dummy responder for load testing, move calidns to recvmmsg and sendmmsg (but... X-Git-Tag: dnsdist-1.0.0-alpha1~308 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c1e527e30426e615263f6fe3a63613483cd984bc;p=thirdparty%2Fpdns.git add dummy responder for load testing, move calidns to recvmmsg and sendmmsg (but probably wrong) --- diff --git a/pdns/Makefile.am b/pdns/Makefile.am index d19a3efd5e..3ac219dd2a 100644 --- a/pdns/Makefile.am +++ b/pdns/Makefile.am @@ -84,9 +84,9 @@ bin_PROGRAMS += \ nproxy \ nsec3dig \ saxfr - + if CXX2011 -bin_PROGRAMS += dnsdist calidns +bin_PROGRAMS += dnsdist calidns dumresp endif endif @@ -102,6 +102,7 @@ EXTRA_PROGRAMS = \ dnsscope \ dnstcpbench \ dnswasher \ + dumresp \ notify \ nproxy \ nsec3dig \ @@ -478,6 +479,7 @@ calidns_SOURCES = \ dnsparser.cc dnsparser.hh \ dnsrecords.cc \ dnswriter.cc dnswriter.hh \ + iputils.cc \ logger.cc \ misc.cc misc.hh \ nsecrecords.cc \ @@ -491,6 +493,14 @@ calidns_SOURCES = \ calidns_LDADD = $(POLARSSL_LIBS) calidns_LDFLAGS=$(THREADFLAGS) +dumresp_SOURCES = \ + dumresp.cc \ + logger.cc \ + misc.cc misc.hh \ + statbag.cc \ + unix_utility.cc \ + qtype.cc + saxfr_SOURCES = \ base32.cc \ diff --git a/pdns/calidns.cc b/pdns/calidns.cc index 48e3b90a1f..0932c9f43d 100644 --- a/pdns/calidns.cc +++ b/pdns/calidns.cc @@ -8,23 +8,59 @@ #include #include "statbag.hh" #include +#include +#include using std::thread; +using std::unique_ptr; StatBag S; std::atomic g_recvcounter; volatile bool g_done; -void* recvThread(Socket* s) +void* recvThread(const vector* sockets) { - char response[1500]; + vector rfds, fds; + for(const auto& s : *sockets) { + struct pollfd pfd; + pfd.fd = s->getHandle(); + pfd.events = POLLIN; + pfd.revents = 0; + rfds.push_back(pfd); + } + + int err; + + vector buf(100); + for(auto& m : buf) { + fillMSGHdr(&m.msg_hdr, new struct iovec, new char[512], 512, new char[1500], 1500, new ComboAddress("127.0.0.1")); + } + while(!g_done) { - try { - s->read(response, sizeof(response)); - g_recvcounter++; + fds=rfds; + + err = poll(&fds[0], fds.size(), -1); + if(err < 0) { + if(errno==EINTR) + continue; + unixDie("Unable to poll for new UDP events"); + } + + + for(struct pollfd &pfd : fds) { + if(pfd.revents & POLLIN) { + + if((err=recvmmsg(pfd.fd, &buf[0], buf.size(), MSG_WAITFORONE, 0)) < 0 ) { + if(errno != EAGAIN) + cerr<<"recvfrom gave error, ignoring: "<* sockets, const vector >* packets, int qps, bool even) +void sendThread(const vector* sockets, const vector >* packets, int qps, ComboAddress dest) { - - int burst=20; + unsigned int burst=100; struct timespec nsec; nsec.tv_sec=0; - nsec.tv_nsec=(unsigned long)(burst*1000000000.0/qps); - - + nsec.tv_nsec=1*(unsigned long)(burst*1000000000.0/qps); int count=0; + struct Unit { + Unit(){} + Unit(const Unit&) = delete; + Unit(Unit&&) = default; + struct msghdr msgh; + struct iovec iov; + char cbuf[256]; + struct mmsghdr mmh; + }; + vector units; + for(const auto& p : *packets) { count++; - if((count%2)==even) - continue; - (*sockets)[count % sockets->size()]->write((const char*)&p[0], p.size()); - if(!(count%burst)) + Unit u; + + fillMSGHdr(&u.msgh, &u.iov, u.cbuf, 0, (char*)&p[0], p.size(), &dest); + units.emplace_back(std::move(u)); + + if(units.size()==burst) { + vector job; + for(auto& u : units) { + job.push_back({u.msgh, 0}); + } + sendmmsg((*sockets)[count % sockets->size()]->getHandle(), + &job[0], job.size(), 0); nanosleep(&nsec, 0); + units.clear(); + + } } } // calidns queryfile destination qps - - - int main(int argc, char** argv) try { @@ -107,16 +159,16 @@ try vector sockets; ComboAddress dest(argv[2]); - for(int i=0; i < 6; ++i) { + for(int i=0; i < 24; ++i) { Socket *sock = new Socket(AF_INET, SOCK_DGRAM); - sock->connect(dest); + // sock->connect(dest); setSocketSendBuffer(sock->getHandle(), 2000000); setSocketReceiveBuffer(sock->getHandle(), 2000000); sockets.push_back(sock); - new thread(recvThread, sock); - } + } + new thread(recvThread, &sockets); int qps=atoi(argv[3]); ofstream plot("plot"); @@ -126,11 +178,7 @@ try DTime dt; dt.set(); - thread t1(sendThread, &sockets, &packets, qps, 0); - thread t2(sendThread, &sockets, &packets, qps, 1); - - t1.join(); - t2.join(); + sendThread(&sockets, &packets, qps, dest); auto udiff = dt.udiff(); auto realqps=packets.size()/(udiff/1000000.0); @@ -138,8 +186,8 @@ try usleep(50000); double perc=g_recvcounter.load()*100.0/packets.size(); - cout<<"Received "<qr) + continue; + dh->qr=1; + dh->ad=0; + if(sendto(s.getHandle(), buffer, len, 0, (struct sockaddr*)&rem, socklen) < 0) + unixDie("sendto"); + + } +} +catch(std::exception& e) +{ + cerr<<"Fatal error: "<