/*
PowerDNS Versatile Database Driven Nameserver
- Copyright (C) 2013 PowerDNS.COM BV
+ Copyright (C) 2013 - 2015 PowerDNS.COM BV
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License version 2
#include <netinet/tcp.h>
#include <boost/program_options.hpp>
#include <boost/foreach.hpp>
+#include <thread>
#include <limits>
+#include <atomic>
#include "arguments.hh"
+#include "dolog.hh"
/* syntax: dnsdist 8.8.8.8 8.8.4.4 208.67.222.222 208.67.220.220
Added downstream server 8.8.8.8:53
StatBag S;
namespace po = boost::program_options;
po::variables_map g_vm;
-
+using std::atomic;
+using std::thread;
bool g_verbose;
-AtomicCounter g_pos;
-AtomicCounter g_regexBlocks;
+atomic<uint64_t> g_pos;
+atomic<uint64_t> g_regexBlocks;
uint16_t g_maxOutstanding;
bool g_console;
-#define infolog(X,Y) if(g_verbose) { syslog(LOG_INFO, "%s", (boost::format((X)) % Y).str().c_str()); \
- if(g_console) cout << boost::format((X)) %Y << endl; } do{}while(0)
-#define warnlog(X,Y) { syslog(LOG_WARNING, "%s", (boost::format((X)) % Y).str().c_str()); \
- if(g_console) cout << boost::format((X)) %Y << endl; } do{}while(0)
-#define errlog(X,Y) {syslog(LOG_ERR, "%s", (boost::format((X)) % Y).str().c_str()); \
- if(g_console) cout << boost::format((X)) %Y << endl; }do{}while(0)
+#define vinfolog if(g_verbose)infolog
/* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
Then we have a bunch of connected sockets for talking to downstream servers.
struct IDState
{
IDState() : origFD(-1) {}
+ IDState(const IDState& orig)
+ {
+ origFD = orig.origFD;
+ origID = orig.origID;
+ origRemote = orig.origRemote;
+ age.store(orig.age.load());
+ }
int origFD; // set to <0 to indicate this state is empty
uint16_t origID;
ComboAddress origRemote;
- AtomicCounter age;
+ atomic<uint64_t> age;
};
-struct DownstreamState
+struct DownstreamState
{
int fd;
- pthread_t tid;
+ thread tid;
ComboAddress remote;
-
vector<IDState> idStates;
- AtomicCounter idOffset;
- AtomicCounter sendErrors;
- AtomicCounter outstanding;
- AtomicCounter reuseds;
- AtomicCounter queries;
+ atomic<uint64_t> idOffset{0};
+ atomic<uint64_t> sendErrors{0};
+ atomic<uint64_t> outstanding{0};
+ atomic<uint64_t> reuseds{0};
+ atomic<uint64_t> queries{0};
};
DownstreamState* g_dstates;
unsigned int g_numdownstreams;
// listens on a dedicated socket, lobs answers from downstream servers to original requestors
-void* responderThread(void *p)
+void* responderThread(DownstreamState* state)
{
- DownstreamState* state = (DownstreamState*)p;
- char packet[65536];
+ char packet[4096];
struct dnsheader* dh = (struct dnsheader*)packet;
int len;
if(ids->origFD < 0)
continue;
else
- --state->outstanding; // you'd think you could game this, but we're using connected socket
+ --state->outstanding; // you'd think an attacker could game this, but we're using connected socket
dh->id = ids->origID;
sendto(ids->origFD, packet, len, 0, (struct sockaddr*)&ids->origRemote, ids->origRemote.getSocklen());
- infolog("Got answer from %s, relayed to %s", state->remote.toStringWithPort() % ids->origRemote.toStringWithPort());
+
+ vinfolog("Got answer from %s, relayed to %s", state->remote.toStringWithPort(), ids->origRemote.toStringWithPort());
ids->origFD = -1;
}
// listens to incoming queries, sends out to downstream servers, noting the intended return path
-void* udpClientThread(void* p)
+void* udpClientThread(ClientState* cs)
try
{
- ClientState* cs = (ClientState*) p;
-
ComboAddress remote;
remote.sin4.sin_family = cs->local.sin4.sin_family;
socklen_t socklen = cs->local.getSocklen();
}
}
- /* right now, this is our simple round robin downstream selector */
DownstreamState& ss = getBestDownstream();
ss.queries++;
ss.reuseds++;
ids->origFD = cs->udpFD;
- ids->age = AtomicCounter();
+ ids->age = 0;
ids->origID = dh->id;
ids->origRemote = remote;
if(len < 0)
ss.sendErrors++;
- infolog("Got query from %s, relayed to %s", remote.toStringWithPort() % ss.remote.toStringWithPort());
+ vinfolog("Got query from %s, relayed to %s", remote.toStringWithPort(), ss.remote.toStringWithPort());
}
return 0;
}
int getTCPDownstream(DownstreamState** ds)
{
*ds = &getBestDownstream();
- infolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort());
+
+ vinfolog("TCP connecting to downstream %s", (*ds)->remote.toStringWithPort());
int sock = SSocket((*ds)->remote.sin4.sin_family, SOCK_STREAM, 0);
SConnect(sock, (*ds)->remote);
return sock;
ComboAddress remote;
};
-void* tcpClientThread(void* p);
+void* tcpClientThread(int pipefd);
+
class TCPClientCollection {
vector<int> d_tcpclientthreads;
- AtomicCounter d_pos;
+ atomic<uint64_t> d_pos;
public:
- AtomicCounter d_queued, d_numthreads;
+ atomic<uint64_t> d_queued, d_numthreads;
TCPClientCollection()
{
// Should not be called simultaneously!
void addTCPClientThread()
{
- infolog("Adding TCP Client threa%c", 'd');
+
+ vinfolog("Adding TCP Client thread");
+
int pipefds[2];
if(pipe(pipefds) < 0)
unixDie("Creating pipe");
- int *fd = new int(pipefds[0]);
- d_tcpclientthreads.push_back(pipefds[1]);
-
- pthread_t tid;
- pthread_create(&tid, 0, tcpClientThread, (void*)fd);
+
+ d_tcpclientthreads.push_back(pipefds[1]);
+ thread t1(tcpClientThread, pipefds[0]);
+ t1.detach();
++d_numthreads;
}
} g_tcpclientthreads;
-void* tcpClientThread(void* p)
+void* tcpClientThread(int pipefd)
{
/* we get launched with a pipe on which we receive file descriptors from clients that we own
from that point on */
- int pipefd = *(int*)p;
- delete (int*)p;
-
int dsock = -1;
DownstreamState *ds=0;
if(dsock == -1)
dsock = getTCPDownstream(&ds);
else {
- infolog("Reusing existing TCP connection to %s", ds->remote.toStringWithPort());
+ vinfolog("Reusing existing TCP connection to %s", ds->remote.toStringWithPort());
}
uint16_t qlen, rlen;
// FIXME: drop AXFR queries here, they confuse us
retry:;
if(!putMsgLen(dsock, qlen)) {
- infolog("Downstream connection to %s died on us, getting a new one!", ds->remote.toStringWithPort());
+ vinfolog("Downstream connection to %s died on us, getting a new one!", ds->remote.toStringWithPort());
close(dsock);
dsock=getTCPDownstream(&ds);
goto retry;
writen2(dsock, query, qlen);
if(!getMsgLen(dsock, &rlen)) {
- infolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->remote.toStringWithPort());
+ vinfolog("Downstream connection to %s died on us phase 2, getting a new one!", ds->remote.toStringWithPort());
close(dsock);
dsock=getTCPDownstream(&ds);
goto retry;
}
}
catch(...){}
- infolog("Closing client connection with %s", ci.remote.toStringWithPort());
+
+ vinfolog("Closing client connection with %s", ci.remote.toStringWithPort());
close(ci.fd);
ci.fd=-1;
--ds->outstanding;
try {
ConnectionInfo* ci = new ConnectionInfo;
ci->fd = SAccept(cs->tcpFD, remote);
- infolog("Got connection from %s", remote.toStringWithPort());
+
+ vinfolog("Got connection from %s", remote.toStringWithPort());
ci->remote = remote;
writen2(g_tcpclientthreads.getThread(), &ci, sizeof(ci));
}
-void* statThread(void*)
+void* statThread()
{
int interval = 1;
if(!interval)
return 0;
uint32_t lastQueries=0;
- vector<DownstreamState> prev;
- prev.resize(g_numdownstreams);
+
+ uint64_t pqueries[g_numdownstreams];
+
+ for(unsigned int n=0; n < g_numdownstreams; ++n)
+ pqueries[n] = g_dstates[n].queries.load();
for(;;) {
sleep(interval);
uint64_t numQueries=0;
for(unsigned int n=0; n < g_numdownstreams; ++n) {
DownstreamState& dss = g_dstates[n];
- infolog(" %s: %d outstanding, %f qps", dss.remote.toStringWithPort() % dss.outstanding % ((dss.queries - prev[n].queries)/interval));
+
+ vinfolog(" %s: %d outstanding, %f qps", dss.remote.toStringWithPort(), dss.outstanding.load(), ((dss.queries.load() - pqueries[n])/interval));
outstanding += dss.outstanding;
- prev[n].queries = dss.queries;
+ pqueries[n]=dss.queries.load();
numQueries += dss.queries;
for(unsigned int i=0 ; i < g_maxOutstanding; ++i) {
IDState& ids = dss.idStates[i];
if(ids.origFD >=0 && ids.age++ > 2) {
- ids.age = AtomicCounter();
+ ids.age = 0;
ids.origFD = -1;
dss.reuseds++;
--dss.outstanding;
}
}
- infolog("%d outstanding queries, %d qps", outstanding % ((numQueries - lastQueries)/interval));
+ vinfolog("%d outstanding queries, %d qps", outstanding, ((numQueries - lastQueries)/interval));
lastQueries=numQueries;
}
return 0;
daemonize();
}
else {
- infolog("Running in the %s", "foreground");
-
+ vinfolog("Running in the foreground");
}
vector<string> remotes = g_vm["remotes"].as<vector<string> >();
g_numdownstreams = remotes.size();
g_dstates = new DownstreamState[g_numdownstreams];
int pos=0;
- BOOST_FOREACH(const string& remote, remotes) {
+ for(const string& remote : remotes) {
DownstreamState& dss = g_dstates[pos++];
dss.remote = ComboAddress(remote, 53);
-
+
dss.fd = SSocket(dss.remote.sin4.sin_family, SOCK_DGRAM, 0);
SConnect(dss.fd, dss.remote);
dss.idStates.resize(g_maxOutstanding);
-
infolog("Added downstream server %s", dss.remote.toStringWithPort());
- pthread_create(&dss.tid, 0, responderThread, (void*)&dss);
+ dss.tid = move(thread(responderThread, &dss));
}
- pthread_t tid;
vector<string> locals;
if(g_vm.count("local"))
locals = g_vm["local"].as<vector<string> >();
else
locals.push_back("::");
- BOOST_FOREACH(const string& local, locals) {
+ for(const string& local : locals) {
+ cerr<<local<<endl;
ClientState* cs = new ClientState;
cs->local= ComboAddress(local, 53);
cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
}
SBind(cs->udpFD, cs->local);
- pthread_create(&tid, 0, udpClientThread, (void*) cs);
+ thread t1(udpClientThread, cs);
+ t1.detach();
}
- BOOST_FOREACH(const string& local, locals) {
+ for(const string& local : locals) {
ClientState* cs = new ClientState;
cs->local= ComboAddress(local, 53);
SBind(cs->tcpFD, cs->local);
SListen(cs->tcpFD, 64);
warnlog("Listening on %s",cs->local.toStringWithPort());
-
- pthread_create(&tid, 0, tcpAcceptorThread, (void*) cs);
+
+ thread t1(tcpAcceptorThread, cs);
+ t1.detach();
}
- pthread_t stattid;
- pthread_create(&stattid, 0, statThread, 0);
- void* status;
-
- pthread_join(tid, &status);
+ thread stattid(statThread);
+ stattid.join();
}
catch(std::exception &e)
{
--- /dev/null
+#pragma once
+#include <iostream>
+#include <sstream>
+#include <syslog.h>
+
+/* This file is intended not to be metronome specific, and is simple example of C++2011
+ variadic templates in action.
+
+ The goal is rapid easy to use logging to console & syslog.
+
+ Usage:
+ string address="localhost";
+ infolog("Bound to %s port %d", address, port);
+ warnlog("Query took %d milliseconds", 1232.4); // yes, %d
+ errlog("Unable to bind to %s: %s", ca.toStringWithPort(), strerr(errno));
+
+ If bool g_console is true, will log to stdout. Will syslog in any case with LOG_INFO,
+ LOG_WARNING, LOG_ERR respectively. If g_verbose=false, infolog is a noop.
+ More generically, dolog(someiostream, "Hello %s", stream) will log to someiostream
+
+ This will happily print a string to %d! Doesn't do further format processing.
+*/
+
+inline void dolog(std::ostream& os, const char*s)
+{
+ os<<s;
+}
+
+template<typename T, typename... Args>
+void dolog(std::ostream& os, const char* s, T value, Args... args)
+{
+ while (*s) {
+ if (*s == '%') {
+ if (*(s + 1) == '%') {
+ ++s;
+ }
+ else {
+ os << value;
+ s += 2;
+ dolog(os, s, args...);
+ return;
+ }
+ }
+ os << *s++;
+ }
+}
+
+extern bool g_console;
+extern bool g_verbose;
+
+template<typename... Args>
+void genlog(int level, const char* s, Args... args)
+{
+ std::ostringstream str;
+ dolog(str, s, args...);
+ syslog(level, "%s", str.str().c_str());
+ if(g_console)
+ std::cout<<str.str()<<std::endl;
+}
+
+template<typename... Args>
+void infolog(const char* s, Args... args)
+{
+ if(g_verbose)
+ genlog(LOG_INFO, s, args...);
+}
+
+template<typename... Args>
+void warnlog(const char* s, Args... args)
+{
+ genlog(LOG_WARNING, s, args...);
+}
+
+template<typename... Args>
+void errlog(const char* s, Args... args)
+{
+ genlog(LOG_ERR, s, args...);
+}