#include <boost/program_options.hpp>
#include <boost/foreach.hpp>
#include <limits>
-#include "mplexer.hh"
/* syntax: dnsdist 8.8.8.8 8.8.4.4 208.67.222.222 208.67.220.220
Added downstream server 8.8.8.8:53
AtomicCounter g_pos, g_numQueries;
uint16_t g_maxOutstanding;
-void RuntimeError(const boost::format& fmt)
-{
- throw runtime_error(fmt.str());
-}
-
-int Socket(int family, int type, int flags)
-{
- int ret = socket(family, type, flags);
- if(ret < 0)
- RuntimeError(boost::format("creating socket of type %d: %s") % family % strerror(errno));
- return ret;
-}
-
-int Connect(int sockfd, const ComboAddress& remote)
-{
- int ret = connect(sockfd, (struct sockaddr*)&remote, remote.getSocklen());
- if(ret < 0)
- RuntimeError(boost::format("connecting socket to %s: %s") % remote.toStringWithPort() % strerror(errno));
- return ret;
-}
-
-int Bind(int sockfd, const ComboAddress& local)
-{
- int ret = bind(sockfd, (struct sockaddr*)&local, local.getSocklen());
- if(ret < 0)
- RuntimeError(boost::format("binding socket to %s: %s") % local.toStringWithPort() % strerror(errno));
- return ret;
-}
-
-int Accept(int sockfd, ComboAddress& remote)
-{
- socklen_t remlen = remote.getSocklen();
-
- int ret = accept(sockfd, (struct sockaddr*)&remote, &remlen);
- if(ret < 0)
- RuntimeError(boost::format("accepting new connection on socket: %s") % strerror(errno));
- return ret;
-}
-
-int Listen(int sockfd, int limit)
-{
- int ret = listen(sockfd, limit);
- if(ret < 0)
- RuntimeError(boost::format("setting socket to listen: %s") % strerror(errno));
- return ret;
-}
-
-int Setsockopt(int sockfd, int level, int opname, int value)
-{
- int ret = setsockopt(sockfd, level, opname, &value, sizeof(value));
- if(ret < 0)
- RuntimeError(boost::format("setsockopt for level %d and opname %d to %d failed: %s") % level % opname % value % strerror(errno));
- return ret;
-
-}
/* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
Then we have a bunch of connected sockets for talking to downstream servers.
struct ClientState
{
ComboAddress local;
- int fd;
+ int udpFD;
+ int tcpFD;
};
DownstreamState& getBestDownstream()
int len;
for(;;) {
- len = recvfrom(cs->fd, packet, sizeof(packet), 0, (struct sockaddr*) &remote, &socklen);
+ len = recvfrom(cs->udpFD, packet, sizeof(packet), 0, (struct sockaddr*) &remote, &socklen);
if(len < 0)
continue;
g_numQueries++;
else
ss.reuseds++;
- ids->origFD = cs->fd;
+ ids->origFD = cs->udpFD;
ids->age = AtomicCounter();
ids->origID = dh->id;
ids->origRemote = remote;
unsigned int outstanding=0;
for(unsigned int n=0; n < g_numremotes; ++n) {
DownstreamState& dss = g_dstates[n];
- cout<<dss.remote.toStringWithPort()<<": "<<dss.outstanding<<" outstanding, "<<(dss.queries - prev[n].queries)/interval <<" qps"<<endl;
+ cout<<'\t'<<dss.remote.toStringWithPort()<<": "<<dss.outstanding<<" outstanding, "<<(dss.queries - prev[n].queries)/interval <<" qps"<<endl;
outstanding += dss.outstanding;
prev[n].queries = dss.queries;
ids.origFD = -1;
dss.reuseds++;
--dss.outstanding;
- }
-
+ }
}
}
cout<<outstanding<<" outstanding queries, " <<(g_numQueries - lastQueries)/interval <<" qps"<<endl;
Let's start naively.
*/
-int getTCPDownstream()
+int getTCPDownstream(DownstreamState** ds)
{
- DownstreamState& ds= getBestDownstream();
- cout<<"TCP connecting to downstream "<<ds.remote.toStringWithPort()<<endl;
- int sock = Socket(ds.remote.sin4.sin_family, SOCK_STREAM, 0);
- Connect(sock, ds.remote);
+ *ds = &getBestDownstream();
+ cout<<"TCP connecting to downstream "<<(*ds)->remote.toStringWithPort()<<endl;
+ int sock = SSocket((*ds)->remote.sin4.sin_family, SOCK_STREAM, 0);
+ SConnect(sock, (*ds)->remote);
return sock;
}
ComboAddress remote;
remote.sin4.sin_family = cs->local.sin4.sin_family;
- int ds = -1;
+ int dsock = -1;
int client = -1;
+ DownstreamState *ds=0;
for(;;) {
try {
- client = Accept(cs->fd, remote);
+ client = SAccept(cs->tcpFD, remote);
if(g_verbose)
cout << "Got connection from "<<remote.toStringWithPort()<<endl;
- if(ds == -1)
- ds = getTCPDownstream();
+ if(dsock == -1)
+ dsock = getTCPDownstream(&ds);
else if(g_verbose)
cout <<"Reusing existing TCP connection"<<endl;
if(!getMsgLen(client, &qlen))
break;
g_numQueries++;
+ ds->queries++;
char query[qlen];
int ret = read(client, query, qlen);
retry:;
- if(!putMsgLen(ds, qlen)) {
+ if(!putMsgLen(dsock, qlen)) {
if(g_verbose)
cout<<"Downstream connection died on us, getting a new one!"<<endl;
- close(ds);
- ds=getTCPDownstream();
+ close(dsock);
+ dsock=getTCPDownstream(&ds);
goto retry;
}
+ ret = write(dsock, query, qlen);
- ret = write(ds, query, qlen);
-
- if(!getMsgLen(ds, &rlen)) {
+ if(!getMsgLen(dsock, &rlen)) {
if(g_verbose)
cout<<"Downstream connection died on us phase 2, getting a new one!"<<endl;
- close(ds);
- ds=getTCPDownstream();
+ close(dsock);
+ dsock=getTCPDownstream(&ds);
goto retry;
}
char answerbuffer[rlen];
- ret = read(ds, answerbuffer, rlen);
+ ret = read(dsock, answerbuffer, rlen);
putMsgLen(client, rlen);
ret = write(client, answerbuffer, rlen);
}
if(g_verbose)
- cout<<"Closing connection"<<endl;
+ cout<<"Closing client connection"<<endl;
close(client);
client=-1;
}
dss.remote = ComboAddress(remote, 53);
- dss.fd = Socket(dss.remote.sin4.sin_family, SOCK_DGRAM, 0);
- Connect(dss.fd, dss.remote);
+ dss.fd = SSocket(dss.remote.sin4.sin_family, SOCK_DGRAM, 0);
+ SConnect(dss.fd, dss.remote);
dss.idStates.resize(g_maxOutstanding);
BOOST_FOREACH(const string& local, locals) {
ClientState* cs = new ClientState;
cs->local= ComboAddress(local, 53);
- cs->fd = Socket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
+ cs->udpFD = SSocket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
if(cs->local.sin4.sin_family == AF_INET6) {
- Setsockopt(cs->fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
+ SSetsockopt(cs->udpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
}
- Bind(cs->fd, cs->local);
+ SBind(cs->udpFD, cs->local);
pthread_create(&tid, 0, udpClientThread, (void*) cs);
}
ClientState* cs = new ClientState;
cs->local= ComboAddress(local, 53);
- cs->fd = Socket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
+ cs->tcpFD = SSocket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
- Setsockopt(cs->fd, SOL_SOCKET, SO_REUSEADDR, 1);
- Setsockopt(cs->fd, SOL_TCP,TCP_DEFER_ACCEPT, 1);
+ SSetsockopt(cs->tcpFD, SOL_SOCKET, SO_REUSEADDR, 1);
+ SSetsockopt(cs->tcpFD, SOL_TCP,TCP_DEFER_ACCEPT, 1);
if(cs->local.sin4.sin_family == AF_INET6) {
- Setsockopt(cs->fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
+ SSetsockopt(cs->tcpFD, IPPROTO_IPV6, IPV6_V6ONLY, 1);
}
- Bind(cs->fd, cs->local);
- Listen(cs->fd, 64);
+ SBind(cs->tcpFD, cs->local);
+ SListen(cs->tcpFD, 64);
pthread_create(&tid, 0, tcpClientThread, (void*) cs);
}
-
pthread_t stattid;
pthread_create(&stattid, 0, statThread, 0);
void* status;
--- /dev/null
+#include "iputils.hh"
+/** these functions provide a very lightweight wrapper to the Berkeley sockets API. Errors -> exceptions! */
+
+static void RuntimeError(const boost::format& fmt)
+{
+ throw runtime_error(fmt.str());
+}
+
+
+int SSocket(int family, int type, int flags)
+{
+ int ret = socket(family, type, flags);
+ if(ret < 0)
+ RuntimeError(boost::format("creating socket of type %d: %s") % family % strerror(errno));
+ return ret;
+}
+
+int SConnect(int sockfd, const ComboAddress& remote)
+{
+ int ret = connect(sockfd, (struct sockaddr*)&remote, remote.getSocklen());
+ if(ret < 0)
+ RuntimeError(boost::format("connecting socket to %s: %s") % remote.toStringWithPort() % strerror(errno));
+ return ret;
+}
+
+int SBind(int sockfd, const ComboAddress& local)
+{
+ int ret = bind(sockfd, (struct sockaddr*)&local, local.getSocklen());
+ if(ret < 0)
+ RuntimeError(boost::format("binding socket to %s: %s") % local.toStringWithPort() % strerror(errno));
+ return ret;
+}
+
+int SAccept(int sockfd, ComboAddress& remote)
+{
+ socklen_t remlen = remote.getSocklen();
+
+ int ret = accept(sockfd, (struct sockaddr*)&remote, &remlen);
+ if(ret < 0)
+ RuntimeError(boost::format("accepting new connection on socket: %s") % strerror(errno));
+ return ret;
+}
+
+int SListen(int sockfd, int limit)
+{
+ int ret = listen(sockfd, limit);
+ if(ret < 0)
+ RuntimeError(boost::format("setting socket to listen: %s") % strerror(errno));
+ return ret;
+}
+
+int SSetsockopt(int sockfd, int level, int opname, int value)
+{
+ int ret = setsockopt(sockfd, level, opname, &value, sizeof(value));
+ if(ret < 0)
+ RuntimeError(boost::format("setsockopt for level %d and opname %d to %d failed: %s") % level % opname % value % strerror(errno));
+ return ret;
+}
+
+