]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
move dnsdist to native c++2011 primitives & our variadic template logging
authorbert hubert <bert.hubert@netherlabs.nl>
Sat, 17 Jan 2015 22:24:03 +0000 (23:24 +0100)
committerbert hubert <bert.hubert@netherlabs.nl>
Sat, 17 Jan 2015 22:24:03 +0000 (23:24 +0100)
pdns/dnsdist.cc
pdns/dolog.hh [new file with mode: 0644]

index f7110078bfe35ddb0800d551fdc1e462b2ffdfdb..30733a495ef9652c919792b024e8e0118d39f4c8 100644 (file)
@@ -1,6 +1,6 @@
 /*
     PowerDNS Versatile Database Driven Nameserver
-    Copyright (C) 2013  PowerDNS.COM BV
+    Copyright (C) 2013 - 2015  PowerDNS.COM BV
 
     This program is free software; you can redistribute it and/or modify
     it under the terms of the GNU General Public License version 2
 #include <netinet/tcp.h>
 #include <boost/program_options.hpp>
 #include <boost/foreach.hpp>
+#include <thread>
 #include <limits>
+#include <atomic>
 #include "arguments.hh"
+#include "dolog.hh"
 
 /* syntax: dnsdist 8.8.8.8 8.8.4.4 208.67.222.222 208.67.220.220
    Added downstream server 8.8.8.8:53
@@ -46,19 +49,15 @@ ArgvMap& arg()
 StatBag S;
 namespace po = boost::program_options;
 po::variables_map g_vm;
-
+using std::atomic;
+using std::thread;
 bool g_verbose;
-AtomicCounter g_pos;
-AtomicCounter g_regexBlocks;
+atomic<uint64_t> g_pos;
+atomic<uint64_t> g_regexBlocks;
 uint16_t g_maxOutstanding;
 bool g_console;
 
-#define infolog(X,Y) if(g_verbose) { syslog(LOG_INFO, "%s", (boost::format((X)) % Y).str().c_str()); \
-    if(g_console) cout << boost::format((X)) %Y << endl; } do{}while(0)
-#define warnlog(X,Y) { syslog(LOG_WARNING, "%s", (boost::format((X)) % Y).str().c_str()); \
-    if(g_console) cout << boost::format((X)) %Y << endl; } do{}while(0)
-#define errlog(X,Y) {syslog(LOG_ERR, "%s", (boost::format((X)) % Y).str().c_str()); \
-    if(g_console) cout << boost::format((X)) %Y << endl; }do{}while(0)
+#define vinfolog if(g_verbose)infolog
 
 /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
    Then we have a bunch of connected sockets for talking to downstream servers. 
@@ -78,35 +77,40 @@ bool g_console;
 struct IDState
 {
   IDState() : origFD(-1) {}
+  IDState(const IDState& orig)
+  {
+    origFD = orig.origFD;
+    origID = orig.origID;
+    origRemote = orig.origRemote;
+    age.store(orig.age.load());
+  }
 
   int origFD;  // set to <0 to indicate this state is empty
   uint16_t origID;
   ComboAddress origRemote;
-  AtomicCounter age;
+  atomic<uint64_t> age;
 };
 
-struct DownstreamState
+struct DownstreamState 
 {
   int fd;            
-  pthread_t tid;
+  thread tid;
   ComboAddress remote;
-
   vector<IDState> idStates;
-  AtomicCounter idOffset;
-  AtomicCounter sendErrors;
-  AtomicCounter outstanding;
-  AtomicCounter reuseds;
-  AtomicCounter queries;
+  atomic<uint64_t> idOffset{0};
+  atomic<uint64_t> sendErrors{0};
+  atomic<uint64_t> outstanding{0};
+  atomic<uint64_t> reuseds{0};
+  atomic<uint64_t> queries{0};
 };
 
 DownstreamState* g_dstates;
 unsigned int g_numdownstreams;
 
 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
-void* responderThread(void *p)
+void* responderThread(DownstreamState* state)
 {
-  DownstreamState* state = (DownstreamState*)p;
-  char packet[65536];
+  char packet[4096];
 
   struct dnsheader* dh = (struct dnsheader*)packet;
   int len;
@@ -122,11 +126,12 @@ void* responderThread(void *p)
     if(ids->origFD < 0)
       continue;
     else
-      --state->outstanding;  // you'd think you could game this, but we're using connected socket
+      --state->outstanding;  // you'd think an attacker could game this, but we're using connected socket
 
     dh->id = ids->origID;
     sendto(ids->origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen());
-    infolog("Got answer from %s, relayed to %s", state->remote.toStringWithPort() % ids->origRemote.toStringWithPort());
+
+    vinfolog("Got answer from %s, relayed to %s", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort());
 
     ids->origFD = -1;
   }
@@ -173,11 +178,9 @@ static void daemonize(void)
 
 
 // listens to incoming queries, sends out to downstream servers, noting the intended return path 
-void* udpClientThread(void* p)
+void* udpClientThread(ClientState* cs)
 try
 {
-  ClientState* cs = (ClientState*) p;
-
   ComboAddress remote;
   remote.sin4.sin_family = cs->local.sin4.sin_family;
   socklen_t socklen = cs->local.getSocklen();
@@ -206,7 +209,6 @@ try
       }
     }
 
-    /* right now, this is our simple round robin downstream selector */
     DownstreamState& ss = getBestDownstream();
     ss.queries++;
 
@@ -219,7 +221,7 @@ try
       ss.reuseds++;
 
     ids->origFD = cs->udpFD;
-    ids->age = AtomicCounter();
+    ids->age = 0;
     ids->origID = dh->id;
     ids->origRemote = remote;
 
@@ -229,7 +231,7 @@ try
     if(len < 0) 
       ss.sendErrors++;
 
-    infolog("Got query from %s, relayed to %s", remote.toStringWithPort() % ss.remote.toStringWithPort());
+    vinfolog("Got query from %s, relayed to %s", remote.toStringWithPort(), ss.remote.toStringWithPort());
   }
   return 0;
 }
@@ -267,7 +269,8 @@ catch(...)
 int getTCPDownstream(DownstreamState** ds)
 {
   *ds = &getBestDownstream();
-  infolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort());
+  
+  vinfolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort());
   int sock = SSocket((*ds)->remote.sin4.sin_family, SOCK_STREAM, 0);
   SConnect(sock, (*ds)->remote);
   return sock;
@@ -304,12 +307,13 @@ struct ConnectionInfo
   ComboAddress remote;
 };
 
-void* tcpClientThread(void* p);
+void* tcpClientThread(int pipefd);
+
 class TCPClientCollection {
   vector<int> d_tcpclientthreads;
-  AtomicCounter d_pos;
+  atomic<uint64_t> d_pos;
 public:
-  AtomicCounter d_queued, d_numthreads;
+  atomic<uint64_t> d_queued, d_numthreads;
 
   TCPClientCollection()
   {
@@ -326,27 +330,25 @@ public:
   // Should not be called simultaneously!
   void addTCPClientThread()
   {  
-    infolog("Adding TCP Client threa%c", 'd');
+    
+    vinfolog("Adding TCP Client thread");
+
     int pipefds[2];
     if(pipe(pipefds) < 0)
       unixDie("Creating pipe");
-    int *fd = new int(pipefds[0]);
-    d_tcpclientthreads.push_back(pipefds[1]);
-    
-    pthread_t tid;
-    pthread_create(&tid, 0, tcpClientThread, (void*)fd);
+
+    d_tcpclientthreads.push_back(pipefds[1]);    
+    thread t1(tcpClientThread, pipefds[0]);
+    t1.detach();
     ++d_numthreads;
   }
 } g_tcpclientthreads;
 
 
-void* tcpClientThread(void* p)
+void* tcpClientThread(int pipefd)
 {
   /* we get launched with a pipe on which we receive file descriptors from clients that we own
      from that point on */
-  int pipefd = *(int*)p;
-  delete (int*)p;
-
   int dsock = -1;
   DownstreamState *ds=0;
   
@@ -360,7 +362,7 @@ void* tcpClientThread(void* p)
     if(dsock == -1)
       dsock = getTCPDownstream(&ds);
     else {
-      infolog("Reusing existing TCP connection to %s", ds->remote.toStringWithPort());
+      vinfolog("Reusing existing TCP connection to %s", ds->remote.toStringWithPort());
     }
 
     uint16_t qlen, rlen;
@@ -376,7 +378,7 @@ void* tcpClientThread(void* p)
         // FIXME: drop AXFR queries here, they confuse us
       retry:; 
         if(!putMsgLen(dsock, qlen)) {
-          infolog("Downstream connection to %s died on us, getting a new one!", ds->remote.toStringWithPort());
+         vinfolog("Downstream connection to %s died on us, getting a new one!", ds->remote.toStringWithPort());
           close(dsock);
           dsock=getTCPDownstream(&ds);
           goto retry;
@@ -385,7 +387,7 @@ void* tcpClientThread(void* p)
         writen2(dsock, query, qlen);
       
         if(!getMsgLen(dsock, &rlen)) {
-          infolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->remote.toStringWithPort());
+         vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->remote.toStringWithPort());
           close(dsock);
           dsock=getTCPDownstream(&ds);
           goto retry;
@@ -399,7 +401,8 @@ void* tcpClientThread(void* p)
       }
     }
     catch(...){}
-    infolog("Closing client connection with %s", ci.remote.toStringWithPort());
+    
+    vinfolog("Closing client connection with %s", ci.remote.toStringWithPort());
     close(ci.fd); 
     ci.fd=-1;
     --ds->outstanding;
@@ -424,7 +427,8 @@ void* tcpAcceptorThread(void* p)
     try {
       ConnectionInfo* ci = new ConnectionInfo;      
       ci->fd = SAccept(cs->tcpFD, remote);
-      infolog("Got connection from %s", remote.toStringWithPort());
+
+      vinfolog("Got connection from %s", remote.toStringWithPort());
       
       ci->remote = remote;
       writen2(g_tcpclientthreads.getThread(), &ci, sizeof(ci));
@@ -436,14 +440,17 @@ void* tcpAcceptorThread(void* p)
 }
 
 
-void* statThread(void*)
+void* statThread()
 {
   int interval = 1;
   if(!interval)
     return 0;
   uint32_t lastQueries=0;
-  vector<DownstreamState> prev;
-  prev.resize(g_numdownstreams);
+
+  uint64_t pqueries[g_numdownstreams];
+
+  for(unsigned int n=0; n < g_numdownstreams; ++n) 
+    pqueries[n] = g_dstates[n].queries.load();
 
   for(;;) {
     sleep(interval);
@@ -455,15 +462,16 @@ void* statThread(void*)
     uint64_t numQueries=0;
     for(unsigned int n=0; n < g_numdownstreams; ++n) {
       DownstreamState& dss = g_dstates[n];
-      infolog(" %s: %d outstanding, %f qps", dss.remote.toStringWithPort() % dss.outstanding % ((dss.queries - prev[n].queries)/interval));
+
+      vinfolog(" %s: %d outstanding, %f qps", dss.remote.toStringWithPort(), dss.outstanding.load(), ((dss.queries.load() - pqueries[n])/interval));
 
       outstanding += dss.outstanding;
-      prev[n].queries = dss.queries;
+      pqueries[n]=dss.queries.load();
       numQueries += dss.queries;
       for(unsigned int i=0 ; i < g_maxOutstanding; ++i) {
         IDState& ids = dss.idStates[i];
         if(ids.origFD >=0 && ids.age++ > 2) {
-          ids.age = AtomicCounter();
+          ids.age = 0;
           ids.origFD = -1;
           dss.reuseds++;
           --dss.outstanding;
@@ -471,7 +479,7 @@ void* statThread(void*)
       }
     }
 
-    infolog("%d outstanding queries, %d qps", outstanding  % ((numQueries - lastQueries)/interval));
+    vinfolog("%d outstanding queries, %d qps", outstanding, ((numQueries - lastQueries)/interval));
     lastQueries=numQueries;
   }
   return 0;
@@ -524,8 +532,7 @@ try
     daemonize();
   }
   else {
-    infolog("Running in the %s", "foreground");
-
+    vinfolog("Running in the foreground");
   }
 
   vector<string> remotes = g_vm["remotes"].as<vector<string> >();
@@ -533,30 +540,29 @@ try
   g_numdownstreams = remotes.size();
   g_dstates = new DownstreamState[g_numdownstreams];
   int pos=0;
-  BOOST_FOREACH(const string& remote, remotes) {
+  for(const string& remote : remotes) {
     DownstreamState& dss = g_dstates[pos++];
  
     dss.remote = ComboAddress(remote, 53);
-    
+
     dss.fd = SSocket(dss.remote.sin4.sin_family, SOCK_DGRAM, 0);
     SConnect(dss.fd, dss.remote);
 
     dss.idStates.resize(g_maxOutstanding);
 
-
     infolog("Added downstream server %s", dss.remote.toStringWithPort());
 
-    pthread_create(&dss.tid, 0, responderThread, (void*)&dss);
+    dss.tid = move(thread(responderThread, &dss));
   }
 
-  pthread_t tid;
   vector<string> locals;
   if(g_vm.count("local"))
     locals = g_vm["local"].as<vector<string> >();
   else
     locals.push_back("::");
 
-  BOOST_FOREACH(const string& local, locals) {
+  for(const string& local : locals) {
+    cerr<<local<<endl;
     ClientState* cs = new ClientState;
     cs->local= ComboAddress(local, 53);
     cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
@@ -565,10 +571,11 @@ try
     }
     SBind(cs->udpFD, cs->local);    
 
-    pthread_create(&tid, 0, udpClientThread, (void*) cs);
+    thread t1(udpClientThread, cs);
+    t1.detach();
   }
 
-  BOOST_FOREACH(const string& local, locals) {
+  for(const string& local : locals) {
     ClientState* cs = new ClientState;
     cs->local= ComboAddress(local, 53);
 
@@ -585,15 +592,13 @@ try
     SBind(cs->tcpFD, cs->local);
     SListen(cs->tcpFD, 64);
     warnlog("Listening on %s",cs->local.toStringWithPort());
-
-    pthread_create(&tid, 0, tcpAcceptorThread, (void*) cs);
+    
+    thread t1(tcpAcceptorThread, cs);
+    t1.detach();
   }
 
-  pthread_t stattid;
-  pthread_create(&stattid, 0, statThread, 0);
-  void* status;
-
-  pthread_join(tid, &status);
+  thread stattid(statThread);
+  stattid.join();
 }
 catch(std::exception &e)
 {
diff --git a/pdns/dolog.hh b/pdns/dolog.hh
new file mode 100644 (file)
index 0000000..a3b632c
--- /dev/null
@@ -0,0 +1,78 @@
+#pragma once
+#include <iostream>
+#include <sstream>
+#include <syslog.h>
+
+/* This file is intended not to be metronome specific, and is simple example of C++2011
+   variadic templates in action.
+
+   The goal is rapid easy to use logging to console & syslog. 
+
+   Usage: 
+          string address="localhost";
+          infolog("Bound to %s port %d", address, port);
+          warnlog("Query took %d milliseconds", 1232.4); // yes, %d
+          errlog("Unable to bind to %s: %s", ca.toStringWithPort(), strerr(errno));
+
+   If bool g_console is true, will log to stdout. Will syslog in any case with LOG_INFO,
+   LOG_WARNING, LOG_ERR respectively. If g_verbose=false, infolog is a noop.
+   More generically, dolog(someiostream, "Hello %s", stream) will log to someiostream
+
+   This will happily print a string to %d! Doesn't do further format processing.
+*/
+
+inline void dolog(std::ostream& os, const char*s)
+{
+  os<<s;
+}
+
+template<typename T, typename... Args>
+void dolog(std::ostream& os, const char* s, T value, Args... args)
+{
+  while (*s) {
+    if (*s == '%') {
+      if (*(s + 1) == '%') {
+       ++s;
+      }
+      else {
+       os << value;
+       s += 2;
+       dolog(os, s, args...); 
+       return;
+      }
+    }
+    os << *s++;
+  }    
+}
+
+extern bool g_console;
+extern bool g_verbose;
+
+template<typename... Args>
+void genlog(int level, const char* s, Args... args)
+{
+  std::ostringstream str;
+  dolog(str, s, args...);
+  syslog(level, "%s", str.str().c_str());
+  if(g_console) 
+    std::cout<<str.str()<<std::endl;
+}
+
+template<typename... Args>
+void infolog(const char* s, Args... args)
+{
+  if(g_verbose)
+    genlog(LOG_INFO, s, args...);
+}
+
+template<typename... Args>
+void warnlog(const char* s, Args... args)
+{
+  genlog(LOG_WARNING, s, args...);
+}
+
+template<typename... Args>
+void errlog(const char* s, Args... args)
+{
+  genlog(LOG_ERR, s, args...);
+}