#include "misc.hh"
#include "statbag.hh"
#include <netinet/tcp.h>
-#include <boost/array.hpp>
#include <boost/program_options.hpp>
#include <boost/foreach.hpp>
#include <limits>
+#include "mplexer.hh"
/* syntax: dnsdist 8.8.8.8 8.8.4.4 208.67.222.222 208.67.220.220
Added downstream server 8.8.8.8:53
return ret;
}
-/* the grand design. Per socket we listen on for incoming queries there is one thread.
+/* Thin wrapper around accept(2): takes the next pending connection on 'sockfd' and stores the
+   peer address in 'remote'. Returns the descriptor of the accepted connection.
+   A negative result from accept() is reported through RuntimeError() together with the
+   strerror() text. NOTE(review): RuntimeError is defined outside this hunk - presumably it
+   throws, in which case the final return is only reached on success; confirm. */
+int Accept(int sockfd, ComboAddress& remote)
+{
+  socklen_t addrLen = remote.getSocklen();
+  struct sockaddr* peer = (struct sockaddr*)&remote;
+
+  const int newfd = accept(sockfd, peer, &addrLen);
+  if(newfd >= 0)
+    return newfd;
+
+  RuntimeError(boost::format("accepting new connection on socket: %s") % strerror(errno));
+  return newfd;
+}
+
+/* Thin wrapper around listen(2): marks 'sockfd' as a passive socket with a backlog of 'limit'.
+   Returns the result of listen(). A negative result is reported through RuntimeError() with
+   the strerror() text. NOTE(review): RuntimeError is defined outside this hunk - presumably
+   it throws; confirm. */
+int Listen(int sockfd, int limit)
+{
+  const int rc = listen(sockfd, limit);
+  if(rc >= 0)
+    return rc;
+
+  RuntimeError(boost::format("setting socket to listen: %s") % strerror(errno));
+  return rc;
+}
+
+/* Thin wrapper around setsockopt(2) for integer-valued options: sets option 'opname' at
+   'level' on 'sockfd' to 'value'. Returns the result of setsockopt(). A negative result is
+   reported through RuntimeError() with the level, option name, value and strerror() text.
+   NOTE(review): RuntimeError is defined outside this hunk - presumably it throws; confirm. */
+int Setsockopt(int sockfd, int level, int opname, int value)
+{
+  const int rc = setsockopt(sockfd, level, opname, &value, sizeof(value));
+  if(rc >= 0)
+    return rc;
+
+  RuntimeError(boost::format("setsockopt for level %d and opname %d to %d failed: %s") % level % opname % value % strerror(errno));
+  return rc;
+}
+
+/* UDP: the grand design. Per socket we listen on for incoming queries there is one thread.
Then we have a bunch of connected sockets for talking to downstream servers.
We send directly to those sockets.
struct IDState
{
+ IDState() : origFD(-1) {}
+
int origFD; // set to <0 to indicate this state is empty
uint16_t origID;
ComboAddress origRemote;
AtomicCounter age;
- bool used;
};
struct DownstreamState
int fd;
pthread_t tid;
ComboAddress remote;
+
vector<IDState> idStates;
AtomicCounter idOffset;
AtomicCounter sendErrors;
}
// listens to incoming queries, sends out to downstream servers, noting the intended return path
-void* clientThread(void* p)
+void* udpClientThread(void* p)
{
ClientState* cs = (ClientState*) p;
if(g_verbose)
ids->age = AtomicCounter();
ids->origID = dh->id;
ids->origRemote = remote;
- ids->used = true;
+
dh->id = idOffset;
len = send(ss.fd, packet, len, 0);
void* statThread(void*)
{
- int interval =g_vm["stats-interval"].as<int>();
+ int interval = 1;
if(!interval)
return 0;
uint32_t lastQueries=0;
return 0;
}
+/* TCP: the grand design.
+ We forward 'messages' between clients and downstream servers. Messages are 65k bytes large, tops.
+ An answer might theoretically consist of multiple messages (for example, in the case of AXFR), initially
+ we will not go there.
+
+ In a sense there is a strong symmetry between UDP and TCP, once a connection to a downstream has been set up.
+ This symmetry is broken because of head-of-line blocking within TCP though, necessitating additional connections
+ to guarantee performance.
+
+ So the idea is to have a 'pool' of available downstream connections, and forward messages to/from them and never queue.
+ So whenever an answer comes in, we know where it needs to go.
+
+ Let's start naively.
+*/
+
+/* Opens a fresh TCP connection to the downstream picked by getBestDownstream() and returns
+   the connected descriptor. The caller owns the descriptor and must close() it.
+   NOTE(review): Socket() and Connect() are defined outside this hunk - presumably they throw
+   on failure (callers never check their return values); confirm. */
+int getTCPDownstream()
+{
+  DownstreamState& ds = getBestDownstream();
+  cout<<"TCP connecting to downstream "<<ds.remote.toStringWithPort()<<endl;
+  int sock = Socket(ds.remote.sin4.sin_family, SOCK_STREAM, 0);
+  try {
+    Connect(sock, ds.remote);
+  }
+  catch(...) {
+    // do not leak the half-set-up socket when the downstream cannot be reached
+    close(sock);
+    throw;
+  }
+  return sock;
+}
+
+/* Reads the two-byte, network-order length prefix of a TCP message from 'fd' and stores it in
+   host order in '*len'. Returns true when both bytes were read; false on end-of-file or on a
+   read error, in which case '*len' is left untouched.
+   A stream socket may deliver the two bytes separately, so we keep reading until both have
+   arrived instead of treating a short read as a failure. Reads interrupted by a signal are
+   restarted. */
+bool getMsgLen(int fd, uint16_t* len)
+{
+  uint16_t raw;
+  char* dst = (char*)&raw;
+  size_t got = 0;
+
+  while(got < sizeof(raw)) {
+    ssize_t ret = read(fd, dst + got, sizeof(raw) - got);
+    if(ret < 0 && errno == EINTR)
+      continue;
+    if(ret <= 0)   // 0 is EOF, <0 is a real error
+      return false;
+    got += ret;
+  }
+
+  *len = ntohs(raw);
+  return true;
+}
+
+/* Writes 'len' to 'fd' as the two-byte, network-order length prefix of a TCP message.
+   Returns true when both bytes were written, false on a write error.
+   A short write is continued rather than reported as a failure, and writes interrupted by a
+   signal are restarted. */
+bool putMsgLen(int fd, uint16_t len)
+{
+  uint16_t raw = htons(len);
+  const char* src = (const char*)&raw;
+  size_t put = 0;
+
+  while(put < sizeof(raw)) {
+    ssize_t ret = write(fd, src + put, sizeof(raw) - put);
+    if(ret < 0 && errno == EINTR)
+      continue;
+    if(ret <= 0)
+      return false;
+    put += ret;
+  }
+  return true;
+}
+
+
+/* spawn as many of these as required, they call Accept on a socket on which they will accept queries, and
+   they will initiate downstream connections to service them. An attempt is made to reuse existing
+   connections: the downstream descriptor is kept across client connections for the lifetime of the thread.
+
+   Message payloads are read and written in full (short reads/writes are continued). The two-byte length
+   prefixes go through getMsgLen()/putMsgLen().
+
+   This is still inefficient (length prefix and payload are separate writes), and a downstream that keeps
+   accepting connections but never answers is retried without limit.
+*/
+
+/* Reads exactly 'len' bytes from 'fd' into 'buf'. Returns false on EOF or on a read error. */
+static bool tcpReadFully(int fd, char* buf, size_t len)
+{
+  size_t done = 0;
+  while(done < len) {
+    ssize_t ret = read(fd, buf + done, len - done);
+    if(ret < 0 && errno == EINTR)
+      continue;
+    if(ret <= 0)
+      return false;
+    done += ret;
+  }
+  return true;
+}
+
+/* Writes exactly 'len' bytes from 'buf' to 'fd'. Returns false on a write error. */
+static bool tcpWriteFully(int fd, const char* buf, size_t len)
+{
+  size_t done = 0;
+  while(done < len) {
+    ssize_t ret = write(fd, buf + done, len - done);
+    if(ret < 0 && errno == EINTR)
+      continue;
+    if(ret <= 0)
+      return false;
+    done += ret;
+  }
+  return true;
+}
+
+/* Thread entry point. 'p' is the ClientState whose 'fd' is the listening TCP socket.
+   Serves one client connection at a time, relaying each query to a downstream and the answer
+   back to the client. Does not return under normal operation. */
+void* tcpClientThread(void* p)
+{
+  ClientState* cs = (ClientState*) p;
+
+  ComboAddress remote;
+  remote.sin4.sin_family = cs->local.sin4.sin_family;
+
+  // a length prefix is 16 bits, so 65535 bytes always suffice; allocated once per thread
+  vector<char> query(65535), answer(65535);
+
+  int ds = -1;
+  int client = -1;
+  for(;;) {
+    try {
+      client = Accept(cs->fd, remote);
+      if(g_verbose)
+        cout << "Got connection from "<<remote.toStringWithPort()<<endl;
+
+      if(ds == -1)
+        ds = getTCPDownstream();
+      else if(g_verbose)
+        cout <<"Reusing existing TCP connection"<<endl;
+
+      uint16_t qlen = 0, rlen = 0;
+      for(;;) {
+        if(!getMsgLen(client, &qlen))
+          break;
+        g_numQueries++;
+        // a client that goes away mid-query is dropped; nothing has been sent downstream yet
+        if(!tcpReadFully(client, &query[0], qlen))
+          break;
+
+        // relay the query, reconnecting to a downstream for as long as the exchange fails
+        for(;;) {
+          bool sent = putMsgLen(ds, qlen) && tcpWriteFully(ds, &query[0], qlen);
+          if(sent && getMsgLen(ds, &rlen) && tcpReadFully(ds, &answer[0], rlen))
+            break;
+
+          if(g_verbose) {
+            if(!sent)
+              cout<<"Downstream connection died on us, getting a new one!"<<endl;
+            else
+              cout<<"Downstream connection died on us phase 2, getting a new one!"<<endl;
+          }
+          close(ds);
+          // forget the descriptor first: should getTCPDownstream() throw, the next client
+          // must not 'reuse' a closed (and possibly recycled) descriptor number
+          ds = -1;
+          ds = getTCPDownstream();
+        }
+
+        if(!putMsgLen(client, rlen) || !tcpWriteFully(client, &answer[0], rlen))
+          break;
+      }
+      if(g_verbose)
+        cout<<"Closing connection"<<endl;
+      close(client);
+      client=-1;
+    }
+    catch(const std::exception&)
+    {
+      if(client >= 0) {
+        close(client);
+        // reset, so a failing Accept() on the next round does not close this number again
+        client = -1;
+      }
+
+      continue;
+    }
+  }
+
+  return 0;
+}
+
+
int main(int argc, char** argv)
try
{
+ signal(SIGPIPE, SIG_IGN);
po::options_description desc("Allowed options"), hidden, alloptions;
desc.add_options()
("help,h", "produce help message")
- ("stats-interval,s", po::value<int>()->default_value(5), "produce statistics output every n seconds")
("local", po::value<vector<string> >(), "Listen on which address")
("max-outstanding", po::value<uint16_t>()->default_value(65535), "maximum outstanding queries per downstream")
("verbose,v", "be verbose");
Connect(dss.fd, dss.remote);
dss.idStates.resize(g_maxOutstanding);
- BOOST_FOREACH(IDState& ids, dss.idStates) {
- ids.origFD = -1;
- ids.used = false;
- }
pthread_create(&dss.tid, 0, responderThread, (void*)&dss);
}
cs->local= ComboAddress(local, 53);
cs->fd = Socket(cs->local.sin4.sin_family, SOCK_DGRAM, 0);
if(cs->local.sin4.sin_family == AF_INET6) {
- int val = 1;
- setsockopt(cs->fd, IPPROTO_IPV6, IPV6_V6ONLY, &val, sizeof(val));
+ Setsockopt(cs->fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
}
+ Bind(cs->fd, cs->local);
+ pthread_create(&tid, 0, udpClientThread, (void*) cs);
+ }
+
+ BOOST_FOREACH(const string& local, locals) {
+ ClientState* cs = new ClientState;
+ cs->local= ComboAddress(local, 53);
+
+ cs->fd = Socket(cs->local.sin4.sin_family, SOCK_STREAM, 0);
+
+ Setsockopt(cs->fd, SOL_SOCKET, SO_REUSEADDR, 1);
+ Setsockopt(cs->fd, SOL_TCP,TCP_DEFER_ACCEPT, 1);
+ if(cs->local.sin4.sin_family == AF_INET6) {
+ Setsockopt(cs->fd, IPPROTO_IPV6, IPV6_V6ONLY, 1);
+ }
+
Bind(cs->fd, cs->local);
-
- pthread_create(&tid, 0, clientThread, (void*) cs);
+ Listen(cs->fd, 64);
+ pthread_create(&tid, 0, tcpClientThread, (void*) cs);
}
+
pthread_t stattid;
pthread_create(&stattid, 0, statThread, 0);
-
void* status;
pthread_join(tid, &status);