From: bert hubert Date: Sat, 17 Jan 2015 22:24:03 +0000 (+0100) Subject: move dnsdist to native c++2011 primitives & our variadic template logging X-Git-Tag: dnsdist-1.0.0-alpha1~324 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=64e4ebb4b5262250b92fa2764d8982b7d6bd0298;p=thirdparty%2Fpdns.git move dnsdist to native c++2011 primitives & our variadic template logging --- diff --git a/pdns/dnsdist.cc b/pdns/dnsdist.cc index f7110078bf..30733a495e 100644 --- a/pdns/dnsdist.cc +++ b/pdns/dnsdist.cc @@ -1,6 +1,6 @@ /* PowerDNS Versatile Database Driven Nameserver - Copyright (C) 2013 PowerDNS.COM BV + Copyright (C) 2013 - 2015 PowerDNS.COM BV This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License version 2 @@ -25,8 +25,11 @@ #include #include #include +#include #include +#include #include "arguments.hh" +#include "dolog.hh" /* syntax: dnsdist 8.8.8.8 8.8.4.4 208.67.222.222 208.67.220.220 Added downstream server 8.8.8.8:53 @@ -46,19 +49,15 @@ ArgvMap& arg() StatBag S; namespace po = boost::program_options; po::variables_map g_vm; - +using std::atomic; +using std::thread; bool g_verbose; -AtomicCounter g_pos; -AtomicCounter g_regexBlocks; +atomic g_pos; +atomic g_regexBlocks; uint16_t g_maxOutstanding; bool g_console; -#define infolog(X,Y) if(g_verbose) { syslog(LOG_INFO, "%s", (boost::format((X)) % Y).str().c_str()); \ - if(g_console) cout << boost::format((X)) %Y << endl; } do{}while(0) -#define warnlog(X,Y) { syslog(LOG_WARNING, "%s", (boost::format((X)) % Y).str().c_str()); \ - if(g_console) cout << boost::format((X)) %Y << endl; } do{}while(0) -#define errlog(X,Y) {syslog(LOG_ERR, "%s", (boost::format((X)) % Y).str().c_str()); \ - if(g_console) cout << boost::format((X)) %Y << endl; }do{}while(0) +#define vinfolog if(g_verbose)infolog /* UDP: the grand design. Per socket we listen on for incoming queries there is one thread. Then we have a bunch of connected sockets for talking to downstream servers. @@ -78,35 +77,40 @@ bool g_console; struct IDState { IDState() : origFD(-1) {} + IDState(const IDState& orig) + { + origFD = orig.origFD; + origID = orig.origID; + origRemote = orig.origRemote; + age.store(orig.age.load()); + } int origFD; // set to <0 to indicate this state is empty uint16_t origID; ComboAddress origRemote; - AtomicCounter age; + atomic age; }; -struct DownstreamState +struct DownstreamState { int fd; - pthread_t tid; + thread tid; ComboAddress remote; - vector idStates; - AtomicCounter idOffset; - AtomicCounter sendErrors; - AtomicCounter outstanding; - AtomicCounter reuseds; - AtomicCounter queries; + atomic idOffset{0}; + atomic sendErrors{0}; + atomic outstanding{0}; + atomic reuseds{0}; + atomic queries{0}; }; DownstreamState* g_dstates; unsigned int g_numdownstreams; // listens on a dedicated socket, lobs answers from downstream servers to original requestors -void* responderThread(void *p) +void* responderThread(DownstreamState* state) { - DownstreamState* state = (DownstreamState*)p; - char packet[65536]; + char packet[4096]; struct dnsheader* dh = (struct dnsheader*)packet; int len; @@ -122,11 +126,12 @@ void* responderThread(void *p) if(ids->origFD < 0) continue; else - --state->outstanding; // you'd think you could game this, but we're using connected socket + --state->outstanding; // you'd think an attacker could game this, but we're using connected socket dh->id = ids->origID; sendto(ids->origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen()); - infolog("Got answer from %s, relayed to %s", state->remote.toStringWithPort() % ids->origRemote.toStringWithPort()); + + vinfolog("Got answer from %s, relayed to %s", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort()); ids->origFD = -1; } @@ -173,11 +178,9 @@ static void daemonize(void) // listens to incoming queries, sends out to downstream servers, noting the intended return path -void* udpClientThread(void* p) +void* udpClientThread(ClientState* cs) try { - ClientState* cs = (ClientState*) p; - ComboAddress remote; remote.sin4.sin_family = cs->local.sin4.sin_family; socklen_t socklen = cs->local.getSocklen(); @@ -206,7 +209,6 @@ try } } - /* right now, this is our simple round robin downstream selector */ DownstreamState& ss = getBestDownstream(); ss.queries++; @@ -219,7 +221,7 @@ try ss.reuseds++; ids->origFD = cs->udpFD; - ids->age = AtomicCounter(); + ids->age = 0; ids->origID = dh->id; ids->origRemote = remote; @@ -229,7 +231,7 @@ try if(len < 0) ss.sendErrors++; - infolog("Got query from %s, relayed to %s", remote.toStringWithPort() % ss.remote.toStringWithPort()); + vinfolog("Got query from %s, relayed to %s", remote.toStringWithPort(), ss.remote.toStringWithPort()); } return 0; } @@ -267,7 +269,8 @@ catch(...) int getTCPDownstream(DownstreamState** ds) { *ds = &getBestDownstream(); - infolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort()); + + vinfolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort()); int sock = SSocket((*ds)->remote.sin4.sin_family, SOCK_STREAM, 0); SConnect(sock, (*ds)->remote); return sock; @@ -304,12 +307,13 @@ struct ConnectionInfo ComboAddress remote; }; -void* tcpClientThread(void* p); +void* tcpClientThread(int pipefd); + class TCPClientCollection { vector d_tcpclientthreads; - AtomicCounter d_pos; + atomic d_pos; public: - AtomicCounter d_queued, d_numthreads; + atomic d_queued, d_numthreads; TCPClientCollection() { @@ -326,27 +330,25 @@ public: // Should not be called simultaneously! void addTCPClientThread() { - infolog("Adding TCP Client threa%c", 'd'); + + vinfolog("Adding TCP Client thread"); + int pipefds[2]; if(pipe(pipefds) < 0) unixDie("Creating pipe"); - int *fd = new int(pipefds[0]); - d_tcpclientthreads.push_back(pipefds[1]); - - pthread_t tid; - pthread_create(&tid, 0, tcpClientThread, (void*)fd); + + d_tcpclientthreads.push_back(pipefds[1]); + thread t1(tcpClientThread, pipefds[0]); + t1.detach(); ++d_numthreads; } } g_tcpclientthreads; -void* tcpClientThread(void* p) +void* tcpClientThread(int pipefd) { /* we get launched with a pipe on which we receive file descriptors from clients that we own from that point on */ - int pipefd = *(int*)p; - delete (int*)p; - int dsock = -1; DownstreamState *ds=0; @@ -360,7 +362,7 @@ void* tcpClientThread(void* p) if(dsock == -1) dsock = getTCPDownstream(&ds); else { - infolog("Reusing existing TCP connection to %s", ds->remote.toStringWithPort()); + vinfolog("Reusing existing TCP connection to %s", ds->remote.toStringWithPort()); } uint16_t qlen, rlen; @@ -376,7 +378,7 @@ void* tcpClientThread(void* p) // FIXME: drop AXFR queries here, they confuse us retry:; if(!putMsgLen(dsock, qlen)) { - infolog("Downstream connection to %s died on us, getting a new one!", ds->remote.toStringWithPort()); + vinfolog("Downstream connection to %s died on us, getting a new one!", ds->remote.toStringWithPort()); close(dsock); dsock=getTCPDownstream(&ds); goto retry; @@ -385,7 +387,7 @@ void* tcpClientThread(void* p) writen2(dsock, query, qlen); if(!getMsgLen(dsock, &rlen)) { - infolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->remote.toStringWithPort()); + vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->remote.toStringWithPort()); close(dsock); dsock=getTCPDownstream(&ds); goto retry; @@ -399,7 +401,8 @@ void* tcpClientThread(void* p) } } catch(...){} - infolog("Closing client connection with %s", ci.remote.toStringWithPort()); + + vinfolog("Closing client connection with %s", ci.remote.toStringWithPort()); close(ci.fd); ci.fd=-1; --ds->outstanding; @@ -424,7 +427,8 @@ void* tcpAcceptorThread(void* p) try { ConnectionInfo* ci = new ConnectionInfo; ci->fd = SAccept(cs->tcpFD, remote); - infolog("Got connection from %s", remote.toStringWithPort()); + + vinfolog("Got connection from %s", remote.toStringWithPort()); ci->remote = remote; writen2(g_tcpclientthreads.getThread(), &ci, sizeof(ci)); @@ -436,14 +440,17 @@ void* tcpAcceptorThread(void* p) } -void* statThread(void*) +void* statThread() { int interval = 1; if(!interval) return 0; uint32_t lastQueries=0; - vector prev; - prev.resize(g_numdownstreams); + + uint64_t pqueries[g_numdownstreams]; + + for(unsigned int n=0; n < g_numdownstreams; ++n) + pqueries[n] = g_dstates[n].queries.load(); for(;;) { sleep(interval); @@ -455,15 +462,16 @@ void* statThread(void*) uint64_t numQueries=0; for(unsigned int n=0; n < g_numdownstreams; ++n) { DownstreamState& dss = g_dstates[n]; - infolog(" %s: %d outstanding, %f qps", dss.remote.toStringWithPort() % dss.outstanding % ((dss.queries - prev[n].queries)/interval)); + + vinfolog(" %s: %d outstanding, %f qps", dss.remote.toStringWithPort(), dss.outstanding.load(), ((dss.queries.load() - pqueries[n])/interval)); outstanding += dss.outstanding; - prev[n].queries = dss.queries; + pqueries[n]=dss.queries.load(); numQueries += dss.queries; for(unsigned int i=0 ; i < g_maxOutstanding; ++i) { IDState& ids = dss.idStates[i]; if(ids.origFD >=0 && ids.age++ > 2) { - ids.age = AtomicCounter(); + ids.age = 0; ids.origFD = -1; dss.reuseds++; --dss.outstanding; @@ -471,7 +479,7 @@ void* statThread(void*) } } - infolog("%d outstanding queries, %d qps", outstanding % ((numQueries - lastQueries)/interval)); + vinfolog("%d outstanding queries, %d qps", outstanding, ((numQueries - lastQueries)/interval)); lastQueries=numQueries; } return 0; @@ -524,8 +532,7 @@ try daemonize(); } else { - infolog("Running in the %s", "foreground"); - + vinfolog("Running in the foreground"); } vector remotes = g_vm["remotes"].as >(); @@ -533,30 +540,29 @@ try g_numdownstreams = remotes.size(); g_dstates = new DownstreamState[g_numdownstreams]; int pos=0; - BOOST_FOREACH(const string& remote, remotes) { + for(const string& remote : remotes) { DownstreamState& dss = g_dstates[pos++]; dss.remote = ComboAddress(remote, 53); - + dss.fd = SSocket(dss.remote.sin4.sin_family, SOCK_DGRAM, 0); SConnect(dss.fd, dss.remote); dss.idStates.resize(g_maxOutstanding); - infolog("Added downstream server %s", dss.remote.toStringWithPort()); - pthread_create(&dss.tid, 0, responderThread, (void*)&dss); + dss.tid = move(thread(responderThread, &dss)); } - pthread_t tid; vector locals; if(g_vm.count("local")) locals = g_vm["local"].as >(); else locals.push_back("::"); - BOOST_FOREACH(const string& local, locals) { + for(const string& local : locals) { + cerr<local= ComboAddress(local, 53); cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0); @@ -565,10 +571,11 @@ try } SBind(cs->udpFD, cs->local); - pthread_create(&tid, 0, udpClientThread, (void*) cs); + thread t1(udpClientThread, cs); + t1.detach(); } - BOOST_FOREACH(const string& local, locals) { + for(const string& local : locals) { ClientState* cs = new ClientState; cs->local= ComboAddress(local, 53); @@ -585,15 +592,13 @@ try SBind(cs->tcpFD, cs->local); SListen(cs->tcpFD, 64); warnlog("Listening on %s",cs->local.toStringWithPort()); - - pthread_create(&tid, 0, tcpAcceptorThread, (void*) cs); + + thread t1(tcpAcceptorThread, cs); + t1.detach(); } - pthread_t stattid; - pthread_create(&stattid, 0, statThread, 0); - void* status; - - pthread_join(tid, &status); + thread stattid(statThread); + stattid.join(); } catch(std::exception &e) { diff --git a/pdns/dolog.hh b/pdns/dolog.hh new file mode 100644 index 0000000000..a3b632c590 --- /dev/null +++ b/pdns/dolog.hh @@ -0,0 +1,78 @@ +#pragma once +#include +#include +#include + +/* This file is intended not to be metronome specific, and is simple example of C++2011 + variadic templates in action. + + The goal is rapid easy to use logging to console & syslog. + + Usage: + string address="localhost"; + infolog("Bound to %s port %d", address, port); + warnlog("Query took %d milliseconds", 1232.4); // yes, %d + errlog("Unable to bind to %s: %s", ca.toStringWithPort(), strerr(errno)); + + If bool g_console is true, will log to stdout. Will syslog in any case with LOG_INFO, + LOG_WARNING, LOG_ERR respectively. If g_verbose=false, infolog is a noop. + More generically, dolog(someiostream, "Hello %s", stream) will log to someiostream + + This will happily print a string to %d! Doesn't do further format processing. +*/ + +inline void dolog(std::ostream& os, const char*s) +{ + os< +void dolog(std::ostream& os, const char* s, T value, Args... args) +{ + while (*s) { + if (*s == '%') { + if (*(s + 1) == '%') { + ++s; + } + else { + os << value; + s += 2; + dolog(os, s, args...); + return; + } + } + os << *s++; + } +} + +extern bool g_console; +extern bool g_verbose; + +template +void genlog(int level, const char* s, Args... args) +{ + std::ostringstream str; + dolog(str, s, args...); + syslog(level, "%s", str.str().c_str()); + if(g_console) + std::cout< +void infolog(const char* s, Args... args) +{ + if(g_verbose) + genlog(LOG_INFO, s, args...); +} + +template +void warnlog(const char* s, Args... args) +{ + genlog(LOG_WARNING, s, args...); +} + +template +void errlog(const char* s, Args... args) +{ + genlog(LOG_ERR, s, args...); +}