po::variables_map g_vm;
bool g_verbose;
-AtomicCounter g_pos, g_timeouts;
+AtomicCounter g_pos;
+
+void RuntimeError(const boost::format& fmt)
+{
+ throw runtime_error(fmt.str());
+}
int Socket(int family, int type, int flags)
{
int ret = socket(family, type, flags);
if(ret < 0)
- throw runtime_error((boost::format("creating socket of type %d: %s") % family % strerror(errno)).str());
+ RuntimeError(boost::format("creating socket of type %d: %s") % family % strerror(errno));
return ret;
}
{
int ret = connect(sockfd, (struct sockaddr*)&remote, remote.getSocklen());
if(ret < 0)
- throw runtime_error((boost::format("connecting socket to %s: %s") % remote.toStringWithPort() % strerror(errno)).str());
+ RuntimeError(boost::format("connecting socket to %s: %s") % remote.toStringWithPort() % strerror(errno));
return ret;
}
{
int ret = bind(sockfd, (struct sockaddr*)&local, local.getSocklen());
if(ret < 0)
- throw runtime_error((boost::format("binding socket to %s: %s") % local.toStringWithPort() % strerror(errno)).str());
+ RuntimeError(boost::format("binding socket to %s: %s") % local.toStringWithPort() % strerror(errno));
return ret;
}
struct IDState
{
+ int origFD; // set to <0 to indicate this state is empty
uint16_t origID;
ComboAddress origRemote;
- int origFD;
+ bool used;
};
-struct SocketState
+struct DownstreamState
{
- int fd;
+ int fd;
pthread_t tid;
ComboAddress remote;
vector<IDState> idStates;
AtomicCounter idOffset;
+ AtomicCounter sendErrors;
};
-SocketState* g_socketstates;
+DownstreamState* g_dstates;
unsigned int g_numremotes;
+// listens on a dedicated socket, lobs answers from downstream servers to original requestors
void* responderThread(void *p)
{
- SocketState* state = (SocketState*)p;
+ DownstreamState* state = (DownstreamState*)p;
if(g_verbose)
cout << "Added downstream server "<<state->remote.toStringWithPort()<<endl;
char packet[65536];
int fd;
};
-// listens to incoming queries, sends out to downstream servers
+// listens to incoming queries, sends out to downstream servers, noting the intended return path
void* clientThread(void* p)
{
ClientState* cs = (ClientState*) p;
for(;;) {
len = recvfrom(cs->fd, packet, sizeof(packet), 0, (struct sockaddr*) &remote, &socklen);
- if(len < 0)
+ if(len < 0)
continue;
- SocketState& ss = g_socketstates[(g_pos++) % g_numremotes];
+ /* right now, this is our simple round robin downstream selector */
+ DownstreamState& ss = g_dstates[(g_pos++) % g_numremotes];
unsigned int idOffset = ss.idOffset++;
IDState* ids = &ss.idStates[idOffset];
ids->origFD = cs->fd;
ids->origID = dh->id;
ids->origRemote = remote;
+ ids->used = true;
dh->id = idOffset;
- send(ss.fd, packet, len, 0);
+ len = send(ss.fd, packet, len, 0);
+ if(len < 0)
+ ss.sendErrors++;
+
if(g_verbose)
cout<<"Got query from "<<remote.toStringWithPort()<<",relayed to "<<ss.remote.toStringWithPort()<<endl;
}
return 0;
}
+void* statThread(void*)
+{
+ int interval =g_vm["stats-interval"].as<int>();
+ if(!interval)
+ return 0;
+
+ for(;;) {
+ sleep(interval);
+ unsigned int outstanding=0;
+ for(unsigned int n=0; n < g_numremotes; ++n) {
+ const DownstreamState& dss = g_dstates[n];
+ for(unsigned int i=0 ; i < 65536; ++i) {
+ const IDState& ids = dss.idStates[i];
+ if(ids.used && ids.origFD >=0)
+ outstanding++;
+ }
+ }
+ cout<<outstanding<<" outstanding queries"<<endl;
+ }
+ return 0;
+}
+
int main(int argc, char** argv)
try
{
po::options_description desc("Allowed options"), hidden, alloptions;
desc.add_options()
("help,h", "produce help message")
+ ("stats-interval,s", po::value<int>()->default_value(5), "produce statistics output every n seconds")
("local", po::value<vector<string> >(), "Listen on which address")
("verbose,v", "be verbose");
vector<string> remotes = g_vm["remotes"].as<vector<string> >();
g_numremotes = remotes.size();
- g_socketstates = new SocketState[g_numremotes];
+ g_dstates = new DownstreamState[g_numremotes];
int pos=0;
BOOST_FOREACH(const string& remote, remotes) {
- SocketState& ss = g_socketstates[pos++];
+ DownstreamState& dss = g_dstates[pos++];
- ss.remote = ComboAddress(remote, 53);
+ dss.remote = ComboAddress(remote, 53);
- ss.fd = Socket(ss.remote.sin4.sin_family, SOCK_DGRAM, 0);
- Connect(ss.fd, ss.remote);
+ dss.fd = Socket(dss.remote.sin4.sin_family, SOCK_DGRAM, 0);
+ Connect(dss.fd, dss.remote);
- ss.idStates.resize(65536);
- BOOST_FOREACH(IDState& ids, ss.idStates) {
+ dss.idStates.resize(65536);
+ BOOST_FOREACH(IDState& ids, dss.idStates) {
ids.origFD = -1;
+ ids.used = false;
}
- pthread_create(&ss.tid, 0, responderThread, (void*)&ss);
+ pthread_create(&dss.tid, 0, responderThread, (void*)&dss);
}
pthread_t tid;
pthread_create(&tid, 0, clientThread, (void*) cs);
}
+ pthread_t stattid;
+ pthread_create(&stattid, 0, statThread, 0);
+
void* status;
pthread_join(tid, &status);