]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
mplexer: Keep TTD ordered so we can scan for timeouts efficiently
authorRemi Gacogne <remi.gacogne@powerdns.com>
Fri, 22 Mar 2019 14:03:14 +0000 (15:03 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 4 Apr 2019 09:54:04 +0000 (11:54 +0200)
pdns/devpollmplexer.cc
pdns/epollmplexer.cc
pdns/kqueuemplexer.cc
pdns/mplexer.hh
pdns/pollmplexer.cc
pdns/portsmplexer.cc

index 8b5d531f16909ab1dbd2812d118f9540fef78569..35df6dc8fc9940f5e498d409a4cab5659da1ce6d 100644 (file)
@@ -160,13 +160,13 @@ int DevPollFDMultiplexer::run(struct timeval* now, int timeout)
     d_iter=d_readCallbacks.find(dvp.dp_fds[n].fd);
     
     if(d_iter != d_readCallbacks.end()) {
-      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+      d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter);
       continue; // so we don't refind ourselves as writable!
     }
     d_iter=d_writeCallbacks.find(dvp.dp_fds[n].fd);
     
     if(d_iter != d_writeCallbacks.end()) {
-      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+      d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter);
     }
   }
   delete[] dvp.dp_fds;
index 983c8d708faf0f1decb3b4b0c20669bb6fa5e400..433687d21456f8c24e7227683bb39bcdf719a04d 100644 (file)
@@ -156,13 +156,13 @@ int EpollFDMultiplexer::run(struct timeval* now, int timeout)
     d_iter=d_readCallbacks.find(d_eevents[n].data.fd);
     
     if(d_iter != d_readCallbacks.end()) {
-      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+      d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter);
       continue; // so we don't refind ourselves as writable!
     }
     d_iter=d_writeCallbacks.find(d_eevents[n].data.fd);
     
     if(d_iter != d_writeCallbacks.end()) {
-      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+      d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter);
     }
   }
   d_inrun=false;
index 5338c1ec3913f0f61643aa9984b29c2d3c6be4d5..42e834257a6cb346ddaabe096b491250273ab375 100644 (file)
@@ -144,14 +144,14 @@ int KqueueFDMultiplexer::run(struct timeval* now, int timeout)
   for(int n=0; n < ret; ++n) {
     d_iter=d_readCallbacks.find(d_kevents[n].ident);
     if(d_iter != d_readCallbacks.end()) {
-      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+      d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter);
       continue; // so we don't find ourselves as writable again
     }
 
     d_iter=d_writeCallbacks.find(d_kevents[n].ident);
 
     if(d_iter != d_writeCallbacks.end()) {
-      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+      d_iter->d_callback(d_iter->f_fd, d_iter->d_parameter);
     }
   }
 
index b42e90092847f8dcb35bc80032ca939b68a17652..a008ec7cf7983b917f77650a4047e1f3eccb89e0 100644 (file)
 #include <boost/shared_array.hpp>
 #include <boost/tuple/tuple.hpp>
 #include <boost/tuple/tuple_comparison.hpp>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/multi_index/hashed_index.hpp>
+#include <boost/multi_index/key_extractors.hpp>
 #include <vector>
 #include <map>
 #include <stdexcept>
 #include <string>
 #include <sys/time.h>
 
+using namespace ::boost::multi_index;
+
 class FDMultiplexerException : public std::runtime_error
 {
 public:
@@ -57,8 +63,9 @@ protected:
   struct Callback
   {
     callbackfunc_t d_callback;
-    funcparam_t d_parameter;
+    mutable funcparam_t d_parameter;
     struct timeval d_ttd;
+    int d_fd;
   };
 
 public:
@@ -109,8 +116,10 @@ public:
       throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer");
     }
 
+    auto newEntry = *it;
     tv.tv_sec += timeout;
-    it->second.d_ttd = tv;
+    newEntry.d_ttd = tv;
+    d_readCallbacks.replace(it, newEntry);
   }
 
   virtual void setWriteTTD(int fd, struct timeval tv, int timeout)
@@ -120,29 +129,23 @@ public:
       throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer");
     }
 
+    auto newEntry = *it;
     tv.tv_sec += timeout;
-    it->second.d_ttd = tv;
-  }
-
-  virtual funcparam_t& getReadParameter(int fd) 
-  {
-    const auto& it = d_readCallbacks.find(fd);
-    if(it == d_readCallbacks.end()) {
-      throw FDMultiplexerException("attempt to look up data in multiplexer for unlisted fd "+std::to_string(fd));
-    }
-
-    return it->second.d_parameter;
+    newEntry.d_ttd = tv;
+    d_writeCallbacks.replace(it, newEntry);
   }
 
   virtual std::vector<std::pair<int, funcparam_t> > getTimeouts(const struct timeval& tv, bool writes=false)
   {
-    const auto tied = boost::tie(tv.tv_sec, tv.tv_usec);
     std::vector<std::pair<int, funcparam_t> > ret;
+    const auto tied = boost::tie(tv.tv_sec, tv.tv_usec);
+    auto& idx = writes ? d_writeCallbacks.get<TTDOrderedTag>() : d_readCallbacks.get<TTDOrderedTag>();
 
-    for(const auto& entry : (writes ? d_writeCallbacks : d_readCallbacks)) {
-      if(entry.second.d_ttd.tv_sec && tied > boost::tie(entry.second.d_ttd.tv_sec, entry.second.d_ttd.tv_usec)) {
-        ret.push_back(std::make_pair(entry.first, entry.second.d_parameter));
+    for (auto it = idx.begin(); it != idx.end(); ++it) {
+      if (it->d_ttd.tv_sec == 0 || tied <= boost::tie(it->d_ttd.tv_sec, it->d_ttd.tv_usec)) {
+        break;
       }
+      ret.push_back(std::make_pair(it->d_fd, it->d_parameter));
     }
 
     return ret;
@@ -160,7 +163,42 @@ public:
   virtual std::string getName() const = 0;
 
 protected:
-  typedef std::map<int, Callback> callbackmap_t;
+  struct FDBasedTag {};
+  struct TTDOrderedTag {};
+  struct ttd_compare
+  {
+    /* we want a 0 TTD (no timeout) to come _after_ everything else */
+    bool operator() (const struct timeval& lhs, const struct timeval& rhs) const
+    {
+      /* special treatment if at least one of the TTD is 0,
+         normal comparison otherwise */
+      if (lhs.tv_sec == 0 && rhs.tv_sec == 0) {
+        return false;
+      }
+      if (lhs.tv_sec == 0 && rhs.tv_sec != 0) {
+        return false;
+      }
+      if (lhs.tv_sec != 0 && rhs.tv_sec == 0) {
+        return true;
+      }
+
+      return std::tie(lhs.tv_sec, lhs.tv_usec) < std::tie(rhs.tv_sec, rhs.tv_usec);
+    }
+  };
+
+  typedef multi_index_container<
+    Callback,
+    indexed_by <
+                hashed_unique<tag<FDBasedTag>,
+                              member<Callback,int,&Callback::d_fd>
+                              >,
+                ordered_non_unique<tag<TTDOrderedTag>,
+                                   member<Callback,struct timeval,&Callback::d_ttd>,
+                                   ttd_compare
+                                   >
+               >
+  > callbackmap_t;
+
   callbackmap_t d_readCallbacks, d_writeCallbacks;
 
   virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)=0;
@@ -171,6 +209,7 @@ protected:
   void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)
   {
     Callback cb;
+    cb.d_fd = fd;
     cb.d_callback=toDo;
     cb.d_parameter=parameter;
     memset(&cb.d_ttd, 0, sizeof(cb.d_ttd));
@@ -178,7 +217,7 @@ protected:
       cb.d_ttd = *ttd;
     }
 
-    auto pair = cbmap.insert({fd, cb});
+    auto pair = cbmap.insert(cb);
     if (!pair.second) {
       throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice");
     }
index be1d0c9f62b04a8dda8cd47a14ca34ae8a517347..8e3e8b1d3a054ba6e5857cec87f4686c74bab43d 100644 (file)
@@ -67,7 +67,7 @@ void PollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo,
 
 void PollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
 {
-  if(d_inrun && d_iter->first==fd)  // trying to remove us!
+  if(d_inrun && d_iter->d_fd==fd)  // trying to remove us!
     ++d_iter;
 
   if(!cbmap.erase(fd))
@@ -81,13 +81,13 @@ vector<struct pollfd> PollFDMultiplexer::preparePollFD() const
 
   struct pollfd pollfd;
   for(const auto& cb : d_readCallbacks) {
-    pollfd.fd = cb.first;
+    pollfd.fd = cb.d_fd;
     pollfd.events = POLLIN;
     pollfds.push_back(pollfd);
   }
 
   for(const auto& cb : d_writeCallbacks) {
-    pollfd.fd = cb.first;
+    pollfd.fd = cb.d_fd;
     pollfd.events = POLLOUT;
     pollfds.push_back(pollfd);
   }
@@ -132,7 +132,7 @@ int PollFDMultiplexer::run(struct timeval* now, int timeout)
       d_iter=d_readCallbacks.find(pollfd.fd);
     
       if(d_iter != d_readCallbacks.end()) {
-        d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+        d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter);
         continue; // so we don't refind ourselves as writable!
       }
     }
@@ -140,7 +140,7 @@ int PollFDMultiplexer::run(struct timeval* now, int timeout)
       d_iter=d_writeCallbacks.find(pollfd.fd);
     
       if(d_iter != d_writeCallbacks.end()) {
-        d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+        d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter);
       }
     }
   }
index 8aaf71bc59462bdb92671670aa0e61e79b4e0678..39939b2f4fbc437f491ed0a7561613a18f00e657 100644 (file)
@@ -113,7 +113,7 @@ int PortsFDMultiplexer::run(struct timeval* now, int timeout)
     d_iter=d_readCallbacks.find(d_pevents[n].portev_object);
     
     if(d_iter != d_readCallbacks.end()) {
-      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+      d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter);
       if(d_readCallbacks.count(d_pevents[n].portev_object) && port_associate(d_portfd, PORT_SOURCE_FD, d_pevents[n].portev_object, 
                         POLLIN, 0) < 0)
         throw FDMultiplexerException("Unable to add fd back to ports (read): "+stringerror());
@@ -123,7 +123,7 @@ int PortsFDMultiplexer::run(struct timeval* now, int timeout)
     d_iter=d_writeCallbacks.find(d_pevents[n].portev_object);
     
     if(d_iter != d_writeCallbacks.end()) {
-      d_iter->second.d_callback(d_iter->first, d_iter->second.d_parameter);
+      d_iter->d_callback(d_iter->d_fd, d_iter->d_parameter);
       if(d_writeCallbacks.count(d_pevents[n].portev_object) && port_associate(d_portfd, PORT_SOURCE_FD, d_pevents[n].portev_object, 
                         POLLOUT, 0) < 0)
         throw FDMultiplexerException("Unable to add fd back to ports (write): "+stringerror());