]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - pdns/mplexer.hh
Merge pull request #7909 from qvr/expungebyname-stats
[thirdparty/pdns.git] / pdns / mplexer.hh
index ed0f2a02a7f59d3eb24f17c8e229e7df07a61cdc..927651c332c5324e43b470715e29438d63922052 100644 (file)
 #include <boost/shared_array.hpp>
 #include <boost/tuple/tuple.hpp>
 #include <boost/tuple/tuple_comparison.hpp>
+#include <boost/multi_index_container.hpp>
+#include <boost/multi_index/ordered_index.hpp>
+#include <boost/multi_index/hashed_index.hpp>
+#include <boost/multi_index/key_extractors.hpp>
 #include <vector>
 #include <map>
 #include <stdexcept>
 #include <string>
 #include <sys/time.h>
 
+using namespace ::boost::multi_index;
+
 class FDMultiplexerException : public std::runtime_error
 {
 public:
@@ -51,14 +57,15 @@ class FDMultiplexer
 {
 public:
   typedef boost::any funcparam_t;
+  typedef boost::function< void(int, funcparam_t&) > callbackfunc_t;
 protected:
 
-  typedef boost::function< void(int, funcparam_t&) > callbackfunc_t;
   struct Callback
   {
     callbackfunc_t d_callback;
-    funcparam_t d_parameter;
+    mutable funcparam_t d_parameter;
     struct timeval d_ttd;
+    int d_fd;
   };
 
 public:
@@ -73,16 +80,19 @@ public:
   /* timeout is in ms */
   virtual int run(struct timeval* tv, int timeout=500) = 0;
 
+  /* timeout is in ms, 0 will return immediatly, -1 will block until at least one FD is ready */
+  virtual void getAvailableFDs(std::vector<int>& fds, int timeout) = 0;
+
   //! Add an fd to the read watch list - currently an fd can only be on one list at a time!
-  virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t())
+  virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t(), const struct timeval* ttd=nullptr)
   {
-    this->addFD(d_readCallbacks, fd, toDo, parameter);
+    this->addFD(d_readCallbacks, fd, toDo, parameter, ttd);
   }
 
   //! Add an fd to the write watch list - currently an fd can only be on one list at a time!
-  virtual void addWriteFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t())
+  virtual void addWriteFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t(), const struct timeval* ttd=nullptr)
   {
-    this->addFD(d_writeCallbacks, fd, toDo, parameter);
+    this->addFD(d_writeCallbacks, fd, toDo, parameter, ttd);
   }
 
   //! Remove an fd from the read watch list. You can't call this function on an fd that is closed already!
@@ -101,25 +111,43 @@ public:
 
   virtual void setReadTTD(int fd, struct timeval tv, int timeout)
   {
-    if(!d_readCallbacks.count(fd))
+    const auto& it = d_readCallbacks.find(fd);
+    if (it == d_readCallbacks.end()) {
       throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer");
+    }
+
+    auto newEntry = *it;
     tv.tv_sec += timeout;
-    d_readCallbacks[fd].d_ttd=tv;
+    newEntry.d_ttd = tv;
+    d_readCallbacks.replace(it, newEntry);
   }
 
-  virtual funcparam_t& getReadParameter(int fd) 
+  virtual void setWriteTTD(int fd, struct timeval tv, int timeout)
   {
-    if(!d_readCallbacks.count(fd))
-      throw FDMultiplexerException("attempt to look up data in multiplexer for unlisted fd "+std::to_string(fd));
-    return d_readCallbacks[fd].d_parameter;
+    const auto& it = d_writeCallbacks.find(fd);
+    if (it == d_writeCallbacks.end()) {
+      throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer");
+    }
+
+    auto newEntry = *it;
+    tv.tv_sec += timeout;
+    newEntry.d_ttd = tv;
+    d_writeCallbacks.replace(it, newEntry);
   }
 
-  virtual std::vector<std::pair<int, funcparam_t> > getTimeouts(const struct timeval& tv)
+  virtual std::vector<std::pair<int, funcparam_t> > getTimeouts(const struct timeval& tv, bool writes=false)
   {
     std::vector<std::pair<int, funcparam_t> > ret;
-    for(callbackmap_t::iterator i=d_readCallbacks.begin(); i!=d_readCallbacks.end(); ++i)
-      if(i->second.d_ttd.tv_sec && boost::tie(tv.tv_sec, tv.tv_usec) > boost::tie(i->second.d_ttd.tv_sec, i->second.d_ttd.tv_usec)) 
-        ret.push_back(std::make_pair(i->first, i->second.d_parameter));
+    const auto tied = boost::tie(tv.tv_sec, tv.tv_usec);
+    auto& idx = writes ? d_writeCallbacks.get<TTDOrderedTag>() : d_readCallbacks.get<TTDOrderedTag>();
+
+    for (auto it = idx.begin(); it != idx.end(); ++it) {
+      if (it->d_ttd.tv_sec == 0 || tied <= boost::tie(it->d_ttd.tv_sec, it->d_ttd.tv_usec)) {
+        break;
+      }
+      ret.push_back(std::make_pair(it->d_fd, it->d_parameter));
+    }
+
     return ret;
   }
 
@@ -132,33 +160,79 @@ public:
     return theMap;
   }
   
-  virtual std::string getName() = 0;
+  virtual std::string getName() const = 0;
+
+  size_t getWatchedFDCount(bool writeFDs) const
+  {
+    return writeFDs ? d_writeCallbacks.size() : d_readCallbacks.size();
+  }
 
 protected:
-  typedef std::map<int, Callback> callbackmap_t;
+  struct FDBasedTag {};
+  struct TTDOrderedTag {};
+  struct ttd_compare
+  {
+    /* we want a 0 TTD (no timeout) to come _after_ everything else */
+    bool operator() (const struct timeval& lhs, const struct timeval& rhs) const
+    {
+      /* special treatment if at least one of the TTD is 0,
+         normal comparison otherwise */
+      if (lhs.tv_sec == 0 && rhs.tv_sec == 0) {
+        return false;
+      }
+      if (lhs.tv_sec == 0 && rhs.tv_sec != 0) {
+        return false;
+      }
+      if (lhs.tv_sec != 0 && rhs.tv_sec == 0) {
+        return true;
+      }
+
+      return std::tie(lhs.tv_sec, lhs.tv_usec) < std::tie(rhs.tv_sec, rhs.tv_usec);
+    }
+  };
+
+  typedef multi_index_container<
+    Callback,
+    indexed_by <
+                hashed_unique<tag<FDBasedTag>,
+                              member<Callback,int,&Callback::d_fd>
+                              >,
+                ordered_non_unique<tag<TTDOrderedTag>,
+                                   member<Callback,struct timeval,&Callback::d_ttd>,
+                                   ttd_compare
+                                   >
+               >
+  > callbackmap_t;
+
   callbackmap_t d_readCallbacks, d_writeCallbacks;
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter)=0;
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)=0;
   virtual void removeFD(callbackmap_t& cbmap, int fd)=0;
   bool d_inrun;
   callbackmap_t::iterator d_iter;
 
-  void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter)
+  void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)
   {
     Callback cb;
+    cb.d_fd = fd;
     cb.d_callback=toDo;
     cb.d_parameter=parameter;
     memset(&cb.d_ttd, 0, sizeof(cb.d_ttd));
-  
-    if(cbmap.count(fd))
+    if (ttd) {
+      cb.d_ttd = *ttd;
+    }
+
+    auto pair = cbmap.insert(cb);
+    if (!pair.second) {
       throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice");
-    cbmap[fd]=cb;
+    }
   }
 
   void accountingRemoveFD(callbackmap_t& cbmap, int fd) 
   {
-    if(!cbmap.erase(fd)) 
+    if(!cbmap.erase(fd)) {
       throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer");
+    }
   }
 };