]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
add dummy responder for load testing, move calidns to recvmmsg and sendmmsg (but...
authorbert hubert <bert.hubert@netherlabs.nl>
Mon, 2 Feb 2015 13:40:33 +0000 (14:40 +0100)
committerbert hubert <bert.hubert@netherlabs.nl>
Mon, 2 Feb 2015 13:40:33 +0000 (14:40 +0100)
pdns/Makefile.am
pdns/calidns.cc
pdns/dumresp.cc [new file with mode: 0644]

index d19a3efd5ecb6eccdce87a6a3682a28fd818479d..3ac219dd2a0d0a95cc47a760a984c91ebd393eb0 100644 (file)
@@ -84,9 +84,9 @@ bin_PROGRAMS += \
        nproxy \
        nsec3dig \
        saxfr
-       
+
 if CXX2011
-bin_PROGRAMS += dnsdist calidns
+bin_PROGRAMS += dnsdist calidns dumresp
 endif
 
 endif
@@ -102,6 +102,7 @@ EXTRA_PROGRAMS = \
        dnsscope \
        dnstcpbench \
        dnswasher \
+       dumresp \
        notify \
        nproxy \
        nsec3dig \
@@ -478,6 +479,7 @@ calidns_SOURCES = \
        dnsparser.cc dnsparser.hh \
        dnsrecords.cc \
        dnswriter.cc dnswriter.hh \
+       iputils.cc \
        logger.cc \
        misc.cc misc.hh \
        nsecrecords.cc \
@@ -491,6 +493,14 @@ calidns_SOURCES = \
 calidns_LDADD = $(POLARSSL_LIBS)       
 calidns_LDFLAGS=$(THREADFLAGS) 
 
+dumresp_SOURCES = \
+       dumresp.cc \
+       logger.cc \
+       misc.cc misc.hh \
+       statbag.cc \
+       unix_utility.cc \
+       qtype.cc 
+
 
 saxfr_SOURCES = \
        base32.cc \
index 48e3b90a1fcd70e46cead9795241a06e55a1bd20..0932c9f43d17939ec05f0a13685b744c97517e8a 100644 (file)
@@ -8,23 +8,59 @@
 #include <atomic>
 #include "statbag.hh"
 #include <fstream>
+#include <poll.h>
+#include <memory>
 using std::thread;
+using std::unique_ptr;
 
 StatBag S;
 
 std::atomic<unsigned int> g_recvcounter;
 volatile bool g_done;
 
-void* recvThread(Socket* s)
+void* recvThread(const vector<Socket*>* sockets)
 {
-  char response[1500];
+  vector<pollfd> rfds, fds;
+  for(const auto& s : *sockets) {
+    struct pollfd pfd;
+    pfd.fd = s->getHandle();
+    pfd.events = POLLIN;
+    pfd.revents = 0;
+    rfds.push_back(pfd);
+  }
+
+  int err;
+
+  vector<struct mmsghdr> buf(100);
+  for(auto& m : buf) {
+    fillMSGHdr(&m.msg_hdr, new struct iovec, new char[512], 512, new char[1500], 1500, new ComboAddress("127.0.0.1"));
+  }
+
   while(!g_done) {
-    try {
-      s->read(response, sizeof(response));
-      g_recvcounter++;
+    fds=rfds;
+
+    err = poll(&fds[0], fds.size(), -1);
+    if(err < 0) {
+      if(errno==EINTR)
+       continue;
+      unixDie("Unable to poll for new UDP events");
+    }    
+
+    
+    for(struct pollfd &pfd : fds) {
+      if(pfd.revents & POLLIN) {
+       
+       if((err=recvmmsg(pfd.fd, &buf[0], buf.size(), MSG_WAITFORONE, 0)) < 0 ) {
+         if(errno != EAGAIN)
+           cerr<<"recvfrom gave error, ignoring: "<<strerror(errno)<<endl;
+         unixDie("recvmmsg");
+         continue;
+       }
+       g_recvcounter+=err;     
+      }
     }
-    catch(...){}
   }
+
   return 0;
 }
 
@@ -54,34 +90,50 @@ static void setSocketSendBuffer(int fd, uint32_t size)
   setSocketBuffer(fd, SO_SNDBUF, size);
 }
 
-void sendThread(const vector<Socket*>* sockets, const vector<vector<uint8_t> >* packets, int qps, bool even)
+void sendThread(const vector<Socket*>* sockets, const vector<vector<uint8_t> >* packets, int qps, ComboAddress dest)
 {
-
-  int burst=20;
+  unsigned int burst=100;
   struct timespec nsec;
   nsec.tv_sec=0;
-  nsec.tv_nsec=(unsigned long)(burst*1000000000.0/qps);
-  
-  
+  nsec.tv_nsec=1*(unsigned long)(burst*1000000000.0/qps);
   int count=0;
 
+  struct Unit {
+    Unit(){}
+    Unit(const Unit&) = delete;
+    Unit(Unit&&) = default;
+    struct msghdr msgh;
+    struct iovec iov;
+    char cbuf[256];
+    struct mmsghdr mmh;
+  };
+  vector<Unit> units;
+
   for(const auto& p : *packets) {
     count++;
-    if((count%2)==even)
-      continue;
 
-    (*sockets)[count % sockets->size()]->write((const char*)&p[0], p.size());
-    if(!(count%burst))
+    Unit u;
+
+    fillMSGHdr(&u.msgh, &u.iov, u.cbuf, 0, (char*)&p[0], p.size(), &dest);
+    units.emplace_back(std::move(u));
+    
+    if(units.size()==burst)  {
+      vector<struct mmsghdr> job;
+      for(auto& u : units) {
+       job.push_back({u.msgh, 0});
+      }
+      sendmmsg((*sockets)[count % sockets->size()]->getHandle(), 
+              &job[0], job.size(), 0);
       nanosleep(&nsec, 0);
+      units.clear();
+
+    }
   }
 }
 
 
 // calidns queryfile destination qps
 
-
-
-
 int main(int argc, char** argv)
 try
 {
@@ -107,16 +159,16 @@ try
   
   vector<Socket*> sockets;
   ComboAddress dest(argv[2]);  
-  for(int i=0; i < 6; ++i) {
+  for(int i=0; i < 24; ++i) {
     Socket *sock = new Socket(AF_INET, SOCK_DGRAM);
 
-    sock->connect(dest);
+    //    sock->connect(dest);
     setSocketSendBuffer(sock->getHandle(), 2000000);
     setSocketReceiveBuffer(sock->getHandle(), 2000000);
     sockets.push_back(sock);
-    new thread(recvThread, sock);
-  }
 
+  }
+  new thread(recvThread, &sockets);
   int qps=atoi(argv[3]);
 
   ofstream plot("plot");
@@ -126,11 +178,7 @@ try
     DTime dt;
     dt.set();
 
-    thread t1(sendThread, &sockets, &packets, qps, 0);
-    thread t2(sendThread, &sockets, &packets, qps, 1);
-
-    t1.join();
-    t2.join();
+    sendThread(&sockets, &packets, qps, dest);
     
     auto udiff = dt.udiff();
     auto realqps=packets.size()/(udiff/1000000.0);
@@ -138,8 +186,8 @@ try
     
     usleep(50000);
     double perc=g_recvcounter.load()*100.0/packets.size();
-    cout<<"Received "<<g_recvcounter.load()<<" packets ("<<perc<<")"<<endl;
-    plot<<qps<<" "<<realqps<<" "<<perc<<endl;
+    cout<<"Received "<<g_recvcounter.load()<<" packets ("<<perc<<"%)"<<endl;
+    plot<<qps<<" "<<realqps<<" "<<perc<<" "<<g_recvcounter.load()/(udiff/1000000.0)<<endl;
   }
   plot.flush();
   // t1.detach();
diff --git a/pdns/dumresp.cc b/pdns/dumresp.cc
new file mode 100644 (file)
index 0000000..135a94b
--- /dev/null
@@ -0,0 +1,50 @@
+#include "iputils.hh"
+#include "sstuff.hh"
+#include "statbag.hh"
+
+StatBag S;
+
+int main(int argc, char** argv)
+try
+{
+  if(argc != 3) {
+    cerr<<"Syntax: dumresp local-address number-of-threads"<<endl;
+    exit(EXIT_FAILURE);
+  }
+
+  for(int i=1 ; i < atoi(argv[2]); ++i) {
+    if(!fork())
+      break;
+  }
+  Socket s(AF_INET, SOCK_DGRAM);
+  ComboAddress local(argv[1], 5300);
+  
+  int one=1;
+  if(setsockopt(s.getHandle(), SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one)) < 0)
+    unixDie("setsockopt for REUSEPORT");
+
+  s.bind(local);
+  char buffer[1500];
+  struct dnsheader* dh = (struct dnsheader*)buffer;
+  int len;
+  ComboAddress rem=local;
+  socklen_t socklen = rem.getSocklen();
+  for(;;) {
+    len=recvfrom(s.getHandle(), buffer, sizeof(buffer), 0, (struct sockaddr*)&rem, &socklen);
+    if(len < 0)
+      unixDie("recvfrom");
+
+    if(dh->qr)
+      continue;
+    dh->qr=1;
+    dh->ad=0;
+    if(sendto(s.getHandle(), buffer, len, 0,  (struct sockaddr*)&rem, socklen) < 0)
+      unixDie("sendto");
+
+  }
+}
+catch(std::exception& e)
+{
+  cerr<<"Fatal error: "<<e.what()<<endl;
+  exit(EXIT_FAILURE);
+}