]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
mplexer: Make it possible to set the read TTD right away
authorRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 4 Mar 2019 10:32:23 +0000 (11:32 +0100)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 4 Apr 2019 09:46:26 +0000 (11:46 +0200)
pdns/devpollmplexer.cc
pdns/epollmplexer.cc
pdns/kqueuemplexer.cc
pdns/mplexer.hh
pdns/pdns_recursor.cc
pdns/pollmplexer.cc
pdns/portsmplexer.cc

index f9234965a8f9c56a9dc218ecd847cc70a1c6b3ff..8b5d531f16909ab1dbd2812d118f9540fef78569 100644 (file)
@@ -49,7 +49,7 @@ public:
   virtual int run(struct timeval* tv, int timeout=500) override;
   virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr) override;
   virtual void removeFD(callbackmap_t& cbmap, int fd) override;
   string getName() const override
   {
@@ -82,9 +82,9 @@ DevPollFDMultiplexer::DevPollFDMultiplexer()
     
 }
 
-void DevPollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter)
+void DevPollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd)
 {
-  accountingAddFD(cbmap, fd, toDo, parameter);
+  accountingAddFD(cbmap, fd, toDo, parameter, ttd);
 
   struct pollfd devent;
   devent.fd=fd;
index 97d7e82ff6a887a0799e29e6a5873fa4614bead7..983c8d708faf0f1decb3b4b0c20669bb6fa5e400 100644 (file)
@@ -45,7 +45,7 @@ public:
   virtual int run(struct timeval* tv, int timeout=500) override;
   virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr) override;
   virtual void removeFD(callbackmap_t& cbmap, int fd) override;
   string getName() const override
   {
@@ -94,9 +94,9 @@ EpollFDMultiplexer::EpollFDMultiplexer() : d_eevents(new epoll_event[s_maxevents
     
 }
 
-void EpollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter)
+void EpollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd)
 {
-  accountingAddFD(cbmap, fd, toDo, parameter);
+  accountingAddFD(cbmap, fd, toDo, parameter, ttd);
 
   struct epoll_event eevent;
   
index 44d3f467354a84374b1a7ac4c20a33eb862f93e9..5338c1ec3913f0f61643aa9984b29c2d3c6be4d5 100644 (file)
@@ -47,7 +47,7 @@ public:
   virtual int run(struct timeval* tv, int timeout=500) override;
   virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter) override;
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd=nullptr) override;
   virtual void removeFD(callbackmap_t& cbmap, int fd) override;
   string getName() const override
   {
@@ -80,9 +80,9 @@ KqueueFDMultiplexer::KqueueFDMultiplexer() : d_kevents(new struct kevent[s_maxev
     throw FDMultiplexerException("Setting up kqueue: "+stringerror());
 }
 
-void KqueueFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter)
+void KqueueFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd)
 {
-  accountingAddFD(cbmap, fd, toDo, parameter);
+  accountingAddFD(cbmap, fd, toDo, parameter, ttd);
 
   struct kevent kqevent;
   EV_SET(&kqevent, fd, (&cbmap == &d_readCallbacks) ? EVFILT_READ : EVFILT_WRITE, EV_ADD, 0,0,0);
index d70143d46be8d9d7e1aabc51aa558cb7b6b691e7..c28a1f218b1c152af1ffedbe6dc7be90076941c7 100644 (file)
@@ -77,9 +77,9 @@ public:
   virtual void getAvailableFDs(std::vector<int>& fds, int timeout) = 0;
 
   //! Add an fd to the read watch list - currently an fd can only be on one list at a time!
-  virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t())
+  virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t(), const struct timeval* ttd=nullptr)
   {
-    this->addFD(d_readCallbacks, fd, toDo, parameter);
+    this->addFD(d_readCallbacks, fd, toDo, parameter, ttd);
   }
 
   //! Add an fd to the write watch list - currently an fd can only be on one list at a time!
@@ -104,25 +104,36 @@ public:
 
   virtual void setReadTTD(int fd, struct timeval tv, int timeout)
   {
-    if(!d_readCallbacks.count(fd))
+    const auto& it = d_readCallbacks.find(fd);
+    if (it == d_readCallbacks.end()) {
       throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer");
+    }
+
     tv.tv_sec += timeout;
-    d_readCallbacks[fd].d_ttd=tv;
+    it->second.d_ttd = tv;
   }
 
   virtual funcparam_t& getReadParameter(int fd) 
   {
-    if(!d_readCallbacks.count(fd))
+    const auto& it = d_readCallbacks.find(fd);
+    if(it == d_readCallbacks.end()) {
       throw FDMultiplexerException("attempt to look up data in multiplexer for unlisted fd "+std::to_string(fd));
-    return d_readCallbacks[fd].d_parameter;
+    }
+
+    return it->second.d_parameter;
   }
 
   virtual std::vector<std::pair<int, funcparam_t> > getTimeouts(const struct timeval& tv)
   {
+    const auto tied = boost::tie(tv.tv_sec, tv.tv_usec);
     std::vector<std::pair<int, funcparam_t> > ret;
-    for(callbackmap_t::iterator i=d_readCallbacks.begin(); i!=d_readCallbacks.end(); ++i)
-      if(i->second.d_ttd.tv_sec && boost::tie(tv.tv_sec, tv.tv_usec) > boost::tie(i->second.d_ttd.tv_sec, i->second.d_ttd.tv_usec)) 
-        ret.push_back(std::make_pair(i->first, i->second.d_parameter));
+
+    for(const auto& entry : d_readCallbacks) {
+      if(entry.second.d_ttd.tv_sec && tied > boost::tie(entry.second.d_ttd.tv_sec, entry.second.d_ttd.tv_usec)) {
+        ret.push_back(std::make_pair(entry.first, entry.second.d_parameter));
+      }
+    }
+
     return ret;
   }
 
@@ -141,27 +152,32 @@ protected:
   typedef std::map<int, Callback> callbackmap_t;
   callbackmap_t d_readCallbacks, d_writeCallbacks;
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter)=0;
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)=0;
   virtual void removeFD(callbackmap_t& cbmap, int fd)=0;
   bool d_inrun;
   callbackmap_t::iterator d_iter;
 
-  void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter)
+  void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)
   {
     Callback cb;
     cb.d_callback=toDo;
     cb.d_parameter=parameter;
     memset(&cb.d_ttd, 0, sizeof(cb.d_ttd));
-  
-    if(cbmap.count(fd))
+    if (ttd) {
+      cb.d_ttd = *ttd;
+    }
+
+    auto pair = cbmap.insert({fd, cb});
+    if (!pair.second) {
       throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice");
-    cbmap[fd]=cb;
+    }
   }
 
   void accountingRemoveFD(callbackmap_t& cbmap, int fd) 
   {
-    if(!cbmap.erase(fd)) 
+    if(!cbmap.erase(fd)) {
       throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer");
+    }
   }
 };
 
index ae37f9da8d92a4d7daae3a05c00aad63e92faf07..60c58a8f313b9a79cb481d3f570112f9ac32e236 100644 (file)
@@ -1629,8 +1629,10 @@ static void startDoResolve(void *p)
         else {
           dc->d_tcpConnection->state=TCPConnection::BYTE0;
           Utility::gettimeofday(&g_now, 0); // needs to be updated
-          t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection);
-          t_fdm->setReadTTD(dc->d_socket, g_now, g_tcpTimeout);
+          struct timeval ttd = g_now;
+          ttd.tv_sec += g_tcpTimeout;
+
+          t_fdm->addReadFD(dc->d_socket, handleRunningTCPQuestion, dc->d_tcpConnection, &ttd);
         }
       }
     }
@@ -2050,11 +2052,11 @@ static void handleNewTCPQuestion(int fd, FDMultiplexer::funcparam_t& )
     std::shared_ptr<TCPConnection> tc = std::make_shared<TCPConnection>(newsock, addr);
     tc->state=TCPConnection::BYTE0;
 
-    t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc);
+    struct timeval ttd;
+    Utility::gettimeofday(&ttd, 0);
+    ttd.tv_sec += g_tcpTimeout;
 
-    struct timeval now;
-    Utility::gettimeofday(&now, 0);
-    t_fdm->setReadTTD(tc->getFD(), now, g_tcpTimeout);
+    t_fdm->addReadFD(tc->getFD(), handleRunningTCPQuestion, tc, &ttd);
   }
 }
 
index 312c4bfece6919adb7db0e84cf0f50e8014d2064..be1d0c9f62b04a8dda8cd47a14ca34ae8a517347 100644 (file)
@@ -37,7 +37,7 @@ public:
   virtual int run(struct timeval* tv, int timeout=500) override;
   virtual void getAvailableFDs(std::vector<int>& fds, int timeout) override;
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter) override;
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr) override;
   virtual void removeFD(callbackmap_t& cbmap, int fd) override;
 
   string getName() const override
@@ -60,15 +60,9 @@ static struct RegisterOurselves
   }
 } doIt;
 
-void PollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter)
+void PollFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd)
 {
-  Callback cb;
-  cb.d_callback=toDo;
-  cb.d_parameter=parameter;
-  memset(&cb.d_ttd, 0, sizeof(cb.d_ttd));
-  if(cbmap.count(fd))
-    throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice");
-  cbmap[fd]=cb;
+  accountingAddFD(cbmap, fd, toDo, parameter, ttd);
 }
 
 void PollFDMultiplexer::removeFD(callbackmap_t& cbmap, int fd)
index c6e91e60fc25d71e7a55e11890c36e8a59706120..8aaf71bc59462bdb92671670aa0e61e79b4e0678 100644 (file)
@@ -25,7 +25,7 @@ public:
 
   virtual int run(struct timeval* tv, int timeout=500);
 
-  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter);
+  virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd=nullptr);
   virtual void removeFD(callbackmap_t& cbmap, int fd);
   string getName()
   {
@@ -59,9 +59,9 @@ PortsFDMultiplexer::PortsFDMultiplexer() : d_pevents(new port_event_t[s_maxevent
     throw FDMultiplexerException("Setting up port: "+stringerror());
 }
 
-void PortsFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter)
+void PortsFDMultiplexer::addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const boost::any& parameter, const struct timeval* ttd)
 {
-  accountingAddFD(cbmap, fd, toDo, parameter);
+  accountingAddFD(cbmap, fd, toDo, parameter, ttd);
 
   if(port_associate(d_portfd, PORT_SOURCE_FD, fd, (&cbmap == &d_readCallbacks) ? POLLIN : POLLOUT, 0) < 0) {
     cbmap.erase(fd);