]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Use the best multiplexer available instead of poll() 6317/head
authorRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 6 Mar 2018 15:20:48 +0000 (16:20 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Tue, 6 Mar 2018 16:14:56 +0000 (17:14 +0100)
pdns/devpollmplexer.cc
pdns/dnsdist.cc
pdns/dnsdist.hh
pdns/epollmplexer.cc
pdns/kqueuemplexer.cc
pdns/mplexer.hh
pdns/pollmplexer.cc

index 5e11404d1c6448946f338b28fee9593be6c6608e..f9234965a8f9c56a9dc218ecd847cc70a1c6b3ff 100644 (file)
@@ -46,11 +46,12 @@ public:
     close(d_devpollfd);
   }
 
-  virtual int run(struct timeval* tv, int timeout=500);
+  virtual int run(struct timeval* tv, int timeout=500) override;
+  virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter);
-  virtual void removeFD(callbackmap_t& cbmap, int fd);
-  string getName()
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+  virtual void removeFD(callbackmap_t& cbmap, int fd) override;
+  string getName() const override
   {
     return "/dev/poll";
   }
@@ -112,6 +113,26 @@ void DevPollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
   }
 }
 
+void DevPollFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout)
+{
+  struct dvpoll dvp;
+  dvp.dp_nfds = d_readCallbacks.size() + d_writeCallbacks.size();
+  dvp.dp_fds = new pollfd[dvp.dp_nfds];
+  dvp.dp_timeout = timeout;
+  int ret=ioctl(d_devpollfd, DP_POLL, &dvp);
+
+  if(ret < 0 && errno!=EINTR) {
+    delete[] dvp.dp_fds;
+    throw FDMultiplexerException("/dev/poll returned error: "+stringerror());
+  }
+
+  for(int n=0; n < ret; ++n) {
+    fds.push_back(dvp.dp_fds[n].fd);
+  }
+
+  delete[] dvp.dp_fds;
+}
+
 int DevPollFDMultiplexer::run(struct timeval* now, int timeout)
 {
   if(d_inrun) {
@@ -121,9 +142,9 @@ int DevPollFDMultiplexer::run(struct timeval* now, int timeout)
   dvp.dp_nfds = d_readCallbacks.size() + d_writeCallbacks.size();
   dvp.dp_fds = new pollfd[dvp.dp_nfds];
   dvp.dp_timeout = timeout;
-  int ret=ioctl(d_devpollfd, DP_POLL, &dvp); 
+  int ret=ioctl(d_devpollfd, DP_POLL, &dvp);
   gettimeofday(now,0); // MANDATORY!
-  
+
   if(ret < 0 && errno!=EINTR) {
     delete[] dvp.dp_fds;
     throw FDMultiplexerException("/dev/poll returned error: "+stringerror());
index 449779a27af7c53a3160508bec52f6ef8d908aca..b91222cdff426777d09a3a6f9fbea062e07e0d69 100644 (file)
@@ -386,29 +386,22 @@ static bool sendUDPResponse(int origFD, char* response, uint16_t responseLen, in
 
 static int pickBackendSocketForSending(DownstreamState* state)
 {
-  return state->fds[state->fdOffset++ % state->fds.size()];
+  return state->sockets[state->socketsOffset++ % state->sockets.size()];
 }
 
-static int pickBackendSocketForReceiving(const std::shared_ptr<DownstreamState>& state)
+static void pickBackendSocketsReadyForReceiving(const std::shared_ptr<DownstreamState>& state, std::vector<int>& ready)
 {
-  if (state->fds.size() == 1) {
-    return state->fds[0];
-  }
+  ready.clear();
 
-  std::set<int> fds;
-  for (auto fd : state->fds) {
-    if (fd >= 0) {
-      fds.insert(fd);
-    }
+  if (state->sockets.size() == 1) {
+    ready.push_back(state->sockets[0]);
+    return ;
   }
 
-  int selected = -1;
-  int res = waitForMultiData(fds, -1, -1, &selected);
-  if (res != 1) {
-    throw std::runtime_error("Error selecting a socket for a backend " + state->remote.toStringWithPort() + ": " + strerror(errno));
+  {
+    std::lock_guard<std::mutex> lock(state->socketsLock);
+    state->mplexer->getAvailableFDs(ready, -1);
   }
-
-  return selected;
 }
 
 // listens on a dedicated socket, lobs answers from downstream servers to original requestors
@@ -427,115 +420,120 @@ try {
   vector<uint8_t> rewrittenResponse;
 
   uint16_t queryId = 0;
+  std::vector<int> sockets;
+  sockets.reserve(dss->sockets.size());
+
   for(;;) {
     dnsheader* dh = reinterpret_cast<struct dnsheader*>(packet);
     bool outstandingDecreased = false;
     try {
-      int fd = pickBackendSocketForReceiving(dss);
-      ssize_t got = recv(fd, packet, sizeof(packet), 0);
-      char * response = packet;
-      size_t responseSize = sizeof(packet);
+      pickBackendSocketsReadyForReceiving(dss, sockets);
+      for (const auto& fd : sockets) {
+        ssize_t got = recv(fd, packet, sizeof(packet), 0);
+        char * response = packet;
+        size_t responseSize = sizeof(packet);
 
-      if (got < (ssize_t) sizeof(dnsheader))
-        continue;
+        if (got < (ssize_t) sizeof(dnsheader))
+          continue;
 
-      uint16_t responseLen = (uint16_t) got;
-      queryId = dh->id;
+        uint16_t responseLen = (uint16_t) got;
+        queryId = dh->id;
 
-      if(queryId >= dss->idStates.size())
-        continue;
+        if(queryId >= dss->idStates.size())
+          continue;
 
-      IDState* ids = &dss->idStates[queryId];
-      int origFD = ids->origFD;
+        IDState* ids = &dss->idStates[queryId];
+        int origFD = ids->origFD;
 
-      if(origFD < 0) // duplicate
-        continue;
+        if(origFD < 0) // duplicate
+          continue;
 
-      /* setting age to 0 to prevent the maintainer thread from
-         cleaning this IDS while we process the response.
-         We have already a copy of the origFD, so it would
-         mostly mess up the outstanding counter.
-      */
-      ids->age = 0;
+        /* setting age to 0 to prevent the maintainer thread from
+           cleaning this IDS while we process the response.
+           We have already a copy of the origFD, so it would
+           mostly mess up the outstanding counter.
+        */
+        ids->age = 0;
 
-      if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote)) {
-        continue;
-      }
+        if (!responseContentMatches(response, responseLen, ids->qname, ids->qtype, ids->qclass, dss->remote)) {
+          continue;
+        }
 
-      --dss->outstanding;  // you'd think an attacker could game this, but we're using connected socket
-      outstandingDecreased = true;
+        --dss->outstanding;  // you'd think an attacker could game this, but we're using connected socket
+        outstandingDecreased = true;
 
-      if(dh->tc && g_truncateTC) {
-        truncateTC(response, &responseLen);
-      }
+        if(dh->tc && g_truncateTC) {
+          truncateTC(response, &responseLen);
+        }
 
-      dh->id = ids->origID;
+        dh->id = ids->origID;
 
-      uint16_t addRoom = 0;
-      DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start);
+        uint16_t addRoom = 0;
+        DNSResponse dr(&ids->qname, ids->qtype, ids->qclass, &ids->origDest, &ids->origRemote, dh, sizeof(packet), responseLen, false, &ids->sentTime.d_start);
 #ifdef HAVE_PROTOBUF
-      dr.uniqueId = ids->uniqueId;
+        dr.uniqueId = ids->uniqueId;
 #endif
-      dr.qTag = ids->qTag;
+        dr.qTag = ids->qTag;
 
-      if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) {
-        continue;
-      }
+        if (!processResponse(localRespRulactions, dr, &ids->delayMsec)) {
+          continue;
+        }
 
 #ifdef HAVE_DNSCRYPT
-      if (ids->dnsCryptQuery) {
-        addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
-      }
+        if (ids->dnsCryptQuery) {
+          addRoom = DNSCRYPT_MAX_RESPONSE_PADDING_AND_MAC_SIZE;
+        }
 #endif
-      if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) {
-        continue;
-      }
+        if (!fixUpResponse(&response, &responseLen, &responseSize, ids->qname, ids->origFlags, ids->ednsAdded, ids->ecsAdded, rewrittenResponse, addRoom)) {
+          continue;
+        }
 
-      if (ids->packetCache && !ids->skipCache) {
-        ids->packetCache->insert(ids->cacheKey, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode, ids->tempFailureTTL);
-      }
+        if (ids->packetCache && !ids->skipCache) {
+          ids->packetCache->insert(ids->cacheKey, ids->qname, ids->qtype, ids->qclass, response, responseLen, false, dh->rcode, ids->tempFailureTTL);
+        }
 
-      if (ids->cs && !ids->cs->muted) {
+        if (ids->cs && !ids->cs->muted) {
 #ifdef HAVE_DNSCRYPT
-        if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) {
-          continue;
-        }
+          if (!encryptResponse(response, &responseLen, responseSize, false, ids->dnsCryptQuery, &dh, &dhCopy)) {
+            continue;
+          }
 #endif
 
-        ComboAddress empty;
-        empty.sin4.sin_family = 0;
-        /* if ids->destHarvested is false, origDest holds the listening address.
-           We don't want to use that as a source since it could be 0.0.0.0 for example. */
-        sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
-      }
+          ComboAddress empty;
+          empty.sin4.sin_family = 0;
+          /* if ids->destHarvested is false, origDest holds the listening address.
+             We don't want to use that as a source since it could be 0.0.0.0 for example. */
+          sendUDPResponse(origFD, response, responseLen, ids->delayMsec, ids->destHarvested ? ids->origDest : empty, ids->origRemote);
+        }
 
-      g_stats.responses++;
+        g_stats.responses++;
 
-      double udiff = ids->sentTime.udiff();
-      vinfolog("Got answer from %s, relayed to %s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
+        double udiff = ids->sentTime.udiff();
+        vinfolog("Got answer from %s, relayed to %s, took %f usec", dss->remote.toStringWithPort(), ids->origRemote.toStringWithPort(), udiff);
 
-      {
-        struct timespec ts;
-        gettime(&ts);
-        std::lock_guard<std::mutex> lock(g_rings.respMutex);
-        g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote});
-      }
+        {
+          struct timespec ts;
+          gettime(&ts);
+          std::lock_guard<std::mutex> lock(g_rings.respMutex);
+          g_rings.respRing.push_back({ts, ids->origRemote, ids->qname, ids->qtype, (unsigned int)udiff, (unsigned int)got, *dh, dss->remote});
+        }
 
-      if(dh->rcode == RCode::ServFail)
-        g_stats.servfailResponses++;
-      dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
+        if(dh->rcode == RCode::ServFail)
+          g_stats.servfailResponses++;
+        dss->latencyUsec = (127.0 * dss->latencyUsec / 128.0) + udiff/128.0;
 
-      doLatencyStats(udiff);
+        doLatencyStats(udiff);
 
-      if (ids->origFD == origFD) {
+        if (ids->origFD == origFD) {
 #ifdef HAVE_DNSCRYPT
-        ids->dnsCryptQuery = nullptr;
+          ids->dnsCryptQuery = nullptr;
 #endif
-        ids->origFD = -1;
-        outstandingDecreased = false;
-      }
+          ids->origFD = -1;
+          outstandingDecreased = false;
+        }
 
-      rewrittenResponse.clear();
+        rewrittenResponse.clear();
+      }
     }
     catch(const std::exception& e){
       vinfolog("Got an error in UDP responder thread while parsing a response from %s, id %d: %s", dss->remote.toStringWithPort(), queryId, e.what());
@@ -570,8 +568,12 @@ catch(...)
 void DownstreamState::reconnect()
 {
   connected = false;
-  for (auto& fd : fds) {
+  for (auto& fd : sockets) {
     if (fd != -1) {
+      {
+        std::lock_guard<std::mutex> lock(socketsLock);
+        mplexer->removeReadFD(fd);
+      }
       /* shutdown() is needed to wake up recv() in the responderThread */
       shutdown(fd, SHUT_RDWR);
       close(fd);
@@ -585,6 +587,10 @@ void DownstreamState::reconnect()
       }
       try {
         SConnect(fd, remote);
+        {
+          std::lock_guard<std::mutex> lock(socketsLock);
+          mplexer->addReadFD(fd, [](int, boost::any) {});
+        }
         connected = true;
       }
       catch(const std::runtime_error& error) {
@@ -597,7 +603,7 @@ void DownstreamState::reconnect()
 
   /* if at least one (re-)connection failed, close all sockets */
   if (!connected) {
-    for (auto& fd : fds) {
+    for (auto& fd : sockets) {
       if (fd != -1) {
         /* shutdown() is needed to wake up recv() in the responderThread */
         shutdown(fd, SHUT_RDWR);
@@ -610,8 +616,10 @@ void DownstreamState::reconnect()
 
 DownstreamState::DownstreamState(const ComboAddress& remote_, const ComboAddress& sourceAddr_, unsigned int sourceItf_, size_t numberOfSockets): remote(remote_), sourceAddr(sourceAddr_), sourceItf(sourceItf_)
 {
-  fds.resize(numberOfSockets);
-  for (auto& fd : fds) {
+  mplexer = std::unique_ptr<FDMultiplexer>(FDMultiplexer::getMultiplexerSilent());
+
+  sockets.resize(numberOfSockets);
+  for (auto& fd : sockets) {
     fd = -1;
   }
 
@@ -1842,9 +1850,13 @@ void* healthChecksThread()
           warnlog("Marking downstream %s as '%s'", dss->getNameWithAddr(), newState ? "up" : "down");
 
           if (newState && !dss->connected) {
-            for (auto& fd : dss->fds) {
+            for (auto& fd : dss->sockets) {
               try {
                 SConnect(fd, dss->remote);
+                {
+                  std::lock_guard<std::mutex> lock(dss->socketsLock);
+                  dss->mplexer->addReadFD(fd, [](int, boost::any) {});
+                }
                 dss->connected = true;
               }
               catch(const std::runtime_error& error) {
@@ -1989,7 +2001,7 @@ static void checkFileDescriptorsLimits(size_t udpBindsCount, size_t tcpBindsCoun
   /* UDP sockets to backends */
   size_t backendUDPSocketsCount = 0;
   for (const auto& backend : backends) {
-    backendUDPSocketsCount += backend->fds.size();
+    backendUDPSocketsCount += backend->sockets.size();
   }
   requiredFDsCount += backendUDPSocketsCount;
   /* TCP sockets to backends */
index 131b280a95acc8119ce53bc5ed4f018cc84e0ea4..49e028d8fc8c6c1b40493a6ba98983bf871e629b 100644 (file)
@@ -24,6 +24,7 @@
 #include "ext/luawrapper/include/LuaContext.hpp"
 #include <time.h>
 #include "misc.hh"
+#include "mplexer.hh"
 #include "iputils.hh"
 #include "dnsname.hh"
 #include <atomic>
@@ -527,7 +528,7 @@ struct DownstreamState
   DownstreamState(const ComboAddress& remote_): DownstreamState(remote_, ComboAddress(), 0, 1) {}
   ~DownstreamState()
   {
-    for (auto& fd : fds) {
+    for (auto& fd : sockets) {
       if (fd >= 0) {
         close(fd);
         fd = -1;
@@ -535,7 +536,9 @@ struct DownstreamState
     }
   }
 
-  std::vector<int> fds;
+  std::vector<int> sockets;
+  std::mutex socketsLock;
+  std::unique_ptr<FDMultiplexer> mplexer{nullptr};
   std::thread tid;
   ComboAddress remote;
   QPSLimiter qps;
@@ -555,7 +558,7 @@ struct DownstreamState
     std::atomic<uint64_t> queries{0};
   } prev;
   string name;
-  size_t fdOffset{0};
+  size_t socketsOffset{0};
   double queryLoad{0.0};
   double dropRate{0.0};
   double latencyUsec{0.0};
index 2deae83b768af571f155bf210d2107dc235920c9..97d7e82ff6a887a0799e29e6a5873fa4614bead7 100644 (file)
@@ -42,11 +42,12 @@ public:
     close(d_epollfd);
   }
 
-  virtual int run(struct timeval* tv, int timeout=500);
+  virtual int run(struct timeval* tv, int timeout=500) override;
+  virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter);
-  virtual void removeFD(callbackmap_t& cbmap, int fd);
-  string getName()
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+  virtual void removeFD(callbackmap_t& cbmap, int fd) override;
+  string getName() const override
   {
     return "epoll";
   }
@@ -69,8 +70,8 @@ static struct EpollRegisterOurselves
   }
 } doItEpoll;
 
-
 int EpollFDMultiplexer::s_maxevents=1024;
+
 EpollFDMultiplexer::EpollFDMultiplexer() : d_eevents(new epoll_event[s_maxevents])
 {
   d_epollfd=epoll_create(s_maxevents); // not hard max
@@ -123,6 +124,18 @@ void EpollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
     throw FDMultiplexerException("Removing fd from epoll set: "+stringerror());
 }
 
+void EpollFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout)
+{
+  int ret=epoll_wait(d_epollfd, d_eevents.get(), s_maxevents, timeout);
+
+  if(ret < 0 && errno!=EINTR)
+    throw FDMultiplexerException("epoll returned error: "+stringerror());
+
+  for(int n=0; n < ret; ++n) {
+    fds.push_back(d_eevents[n].data.fd);
+  }
+}
+
 int EpollFDMultiplexer::run(struct timeval* now, int timeout)
 {
   if(d_inrun) {
@@ -131,7 +144,7 @@ int EpollFDMultiplexer::run(struct timeval* now, int timeout)
   
   int ret=epoll_wait(d_epollfd, d_eevents.get(), s_maxevents, timeout);
   gettimeofday(now,0); // MANDATORY
-  
+
   if(ret < 0 && errno!=EINTR)
     throw FDMultiplexerException("epoll returned error: "+stringerror());
 
index 2fb66c8c78bd1dcdd3112ce23f2f66b2a642876f..44d3f467354a84374b1a7ac4c20a33eb862f93e9 100644 (file)
@@ -44,11 +44,12 @@ public:
     close(d_kqueuefd);
   }
 
-  virtual int run(struct timeval* tv, int timeout=500);
+  virtual int run(struct timeval* tv, int timeout=500) override;
+  virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter);
-  virtual void removeFD(callbackmap_t& cbmap, int fd);
-  string getName()
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter) override;
+  virtual void removeFD(callbackmap_t& cbmap, int fd) override;
+  string getName() const override
   {
     return "kqueue";
   }
@@ -85,7 +86,7 @@ void KqueueFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toD
 
   struct kevent kqevent;
   EV_SET(&kqevent, fd, (&cbmap == &d_readCallbacks) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0,0,0);
-  
+
   if(kevent(d_kqueuefd, &kqevent, 1, 0, 0, 0) < 0) {
     cbmap.erase(fd);
     throw FDMultiplexerException("Adding fd to kqueue set: "+stringerror());
@@ -103,6 +104,22 @@ void KqueueFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
     throw FDMultiplexerException("Removing fd from kqueue set: "+stringerror());
 }
 
+void KqueueFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout)
+{
+  struct timespec ts;
+  ts.tv_sec=timeout/1000;
+  ts.tv_nsec=(timeout % 1000) * 1000000;
+
+  int ret = kevent(d_kqueuefd, 0, 0, d_kevents.get(), s_maxevents, &ts);
+
+  if(ret < 0 && errno != EINTR)
+    throw FDMultiplexerException("kqueue returned error: "+stringerror());
+
+  for(int n=0; n < ret; ++n) {
+    fds.push_back(d_kevents[n].ident);
+  }
+}
+
 int KqueueFDMultiplexer::run(struct timeval* now, int timeout)
 {
   if(d_inrun) {
@@ -115,7 +132,7 @@ int KqueueFDMultiplexer::run(struct timeval* now, int timeout)
 
   int ret=kevent(d_kqueuefd, 0, 0, d_kevents.get(), s_maxevents, &ts);
   gettimeofday(now,0); // MANDATORY!
-  
+
   if(ret < 0 && errno!=EINTR)
     throw FDMultiplexerException("kqueue returned error: "+stringerror());
 
index ed0f2a02a7f59d3eb24f17c8e229e7df07a61cdc..d70143d46be8d9d7e1aabc51aa558cb7b6b691e7 100644 (file)
@@ -73,6 +73,9 @@ public:
   /* timeout is in ms */
   virtual int run(struct timeval* tv, int timeout=500) = 0;
 
+  /* timeout is in ms, 0 will return immediatly, -1 will block until at least one FD is ready */
+  virtual void getAvailableFDs(std::vector<int>& fds, int timeout) = 0;
+
   //! Add an fd to the read watch list - currently an fd can only be on one list at a time!
   virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t())
   {
@@ -132,7 +135,7 @@ public:
     return theMap;
   }
   
-  virtual std::string getName() = 0;
+  virtual std::string getName() const = 0;
 
 protected:
   typedef std::map<int, Callback> callbackmap_t;
index 37f78290aa2a7dff7ed0140279e02ff425e891cd..e155072a77ed233f6bd9991060e8d61a3a308ce1 100644 (file)
@@ -34,15 +34,18 @@ public:
   {
   }
 
-  virtual int run(struct timeval* tv, int timeout=500);
+  virtual int run(struct timeval* tv, int timeout=500) override;
+  virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter);
-  virtual void removeFD(callbackmap_t& cbmap, int fd);
-  string getName()
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+  virtual void removeFD(callbackmap_t& cbmap, int fd) override;
+
+  string getName() const override
   {
     return "poll";
   }
 private:
+  vector<struct pollfd> preparePollFD() const;
 };
 
 static FDMultiplexer* make()
@@ -77,32 +80,50 @@ void PollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
     throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer");
 }
 
-bool pollfdcomp(const struct pollfd& a, const struct pollfd& b)
+vector<struct pollfd> PollFDMultiplexer::preparePollFD() const
 {
-  return a.fd < b.fd;
-}
-
-int PollFDMultiplexer::run(struct timeval* now, int timeout)
-{
-  if(d_inrun) {
-    throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
-  }
-  
   vector<struct pollfd> pollfds;
-  
+  pollfds.reserve(d_readCallbacks.size() + d_writeCallbacks.size());
+
   struct pollfd pollfd;
-  for(callbackmap_t::const_iterator i=d_readCallbacks.begin(); i != d_readCallbacks.end(); ++i) {
-    pollfd.fd = i->first;
+  for(const auto& cb : d_readCallbacks) {
+    pollfd.fd = cb.first;
     pollfd.events = POLLIN;
     pollfds.push_back(pollfd);
   }
 
-  for(callbackmap_t::const_iterator i=d_writeCallbacks.begin(); i != d_writeCallbacks.end(); ++i) {
-    pollfd.fd = i->first;
+  for(const auto& cb : d_writeCallbacks) {
+    pollfd.fd = cb.first;
     pollfd.events = POLLOUT;
     pollfds.push_back(pollfd);
   }
 
+  return pollfds;
+}
+
+void PollFDMultiplexer::getAvailableFDs(std::vector<int>& fds, int timeout)
+{
+  auto pollfds = preparePollFD();
+  int ret = poll(&pollfds[0], pollfds.size(), timeout);
+
+  if (ret < 0 && errno != EINTR)
+    throw FDMultiplexerException("poll returned error: " + stringerror());
+
+  for(const auto& pollfd : pollfds) {
+    if (pollfd.revents == POLLIN || pollfd.revents == POLLOUT) {
+      fds.push_back(pollfd.fd);
+    }
+  }
+}
+
+int PollFDMultiplexer::run(struct timeval* now, int timeout)
+{
+  if(d_inrun) {
+    throw FDMultiplexerException("FDMultiplexer::run() is not reentrant!\n");
+  }
+
+  auto pollfds = preparePollFD();
+
   int ret=poll(&pollfds[0], pollfds.size(), timeout);
   gettimeofday(now, 0); // MANDATORY!
   
@@ -112,17 +133,17 @@ int PollFDMultiplexer::run(struct timeval* now, int timeout)
   d_iter=d_readCallbacks.end();
   d_inrun=true;
 
-  for(unsigned int n = 0; n < pollfds.size(); ++n) {  
-    if(pollfds[n].revents == POLLIN) {
-      d_iter=d_readCallbacks.find(pollfds[n].fd);
+  for(const auto& pollfd : pollfds) {
+    if(pollfd.revents == POLLIN) {
+      d_iter=d_readCallbacks.find(pollfd.fd);
     
       if(d_iter != d_readCallbacks.end()) {
         d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
         continue; // so we don't refind ourselves as writable!
       }
     }
-    else if(pollfds[n].revents == POLLOUT) {
-      d_iter=d_writeCallbacks.find(pollfds[n].fd);
+    else if(pollfd.revents == POLLOUT) {
+      d_iter=d_writeCallbacks.find(pollfd.fd);
     
       if(d_iter != d_writeCallbacks.end()) {
         d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);