2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 #ifndef PDNS_MPLEXER_HH
23 #define PDNS_MPLEXER_HH
24 #include <boost/function.hpp>
25 #include <boost/any.hpp>
26 #include <boost/shared_array.hpp>
27 #include <boost/tuple/tuple.hpp>
28 #include <boost/tuple/tuple_comparison.hpp>
29 #include <boost/multi_index_container.hpp>
30 #include <boost/multi_index/ordered_index.hpp>
31 #include <boost/multi_index/hashed_index.hpp>
32 #include <boost/multi_index/key_extractors.hpp>
39 using namespace ::boost::multi_index;
// Exception type thrown by the multiplexer code in this header (e.g. when an
// fd is added twice, removed while unlisted, or timestamped while unknown).
// The message is carried by the std::runtime_error base class.
41 class FDMultiplexerException : public std::runtime_error
// Forwards the human-readable message to std::runtime_error.
// NOTE(review): single-argument constructor is not marked explicit.
44 FDMultiplexerException(const std::string& str) : std::runtime_error(str)
49 /** Very simple FD multiplexer, based on callbacks and boost::any parameters
50 As a special service, this parameter is kept around and can be modified,
51 allowing for state to be stored inside the multiplexer.
53 It has some "interesting" semantics
// Type-erased user parameter stored alongside each callback.
59 typedef boost::any funcparam_t;
// Callback signature: an int (presumably the ready fd - confirm against the
// implementations) plus a mutable reference to the stored parameter.
60 typedef boost::function< void(int, funcparam_t&) > callbackfunc_t;
// Function object registered for this entry (see callbackfunc_t).
65 callbackfunc_t d_callback;
// Declared mutable so the stored parameter can still be modified when the
// entry is only reachable through a const view.
66 mutable funcparam_t d_parameter;
// Default constructor: starts with d_inrun set to false.
72 FDMultiplexer() : d_inrun(false)
// Virtual destructor, so concrete multiplexers can be destroyed through a
// pointer to this base class.
74 virtual ~FDMultiplexer()
// Static factory declaration (defined elsewhere). Returns a raw pointer;
// NOTE(review): ownership of the returned object is not visible here - confirm.
77 static FDMultiplexer* getMultiplexerSilent();
// Pure virtual entry point implemented by each concrete multiplexer.
// The meaning of the int return value is not visible in this header.
79 /* tv will be updated to 'now' before run returns */
80 /* timeout is in ms */
81 virtual int run(struct timeval* tv, int timeout=500) = 0;
// Pure virtual; results are delivered through the 'fds' output vector.
83 /* timeout is in ms, 0 will return immediately, -1 will block until at least one FD is ready */
84 virtual void getAvailableFDs(std::vector<int>& fds, int timeout) = 0;
86 //! Add an fd to the read watch list - currently an fd can only be on one list at a time!
// Thin wrapper: forwards every argument to the backend-specific addFD(),
// targeting the read-side container d_readCallbacks.
// 'parameter' defaults to an empty boost::any; 'ttd' defaults to nullptr.
// NOTE(review): default arguments on a virtual function are resolved from the
// static type at the call site - overrides should repeat the same defaults.
87 virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t(), const struct timeval* ttd=nullptr)
89 this->addFD(d_readCallbacks, fd, toDo, parameter, ttd);
92 //! Add an fd to the write watch list - currently an fd can only be on one list at a time!
// Thin wrapper: forwards every argument to the backend-specific addFD(),
// targeting the write-side container d_writeCallbacks.
// 'parameter' defaults to an empty boost::any; 'ttd' defaults to nullptr.
// NOTE(review): default arguments on a virtual function are resolved from the
// static type at the call site - overrides should repeat the same defaults.
93 virtual void addWriteFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t(), const struct timeval* ttd=nullptr)
95 this->addFD(d_writeCallbacks, fd, toDo, parameter, ttd);
98 //! Remove an fd from the read watch list. You can't call this function on an fd that is closed already!
99 /** WARNING: references to 'parameter' become invalid after this function! */
// Delegates to the backend-specific removeFD() on d_readCallbacks.
100 virtual void removeReadFD(int fd)
102 this->removeFD(d_readCallbacks, fd);
105 //! Remove an fd from the write watch list. You can't call this function on an fd that is closed already!
106 /** WARNING: references to 'parameter' become invalid after this function! */
// Delegates to the backend-specific removeFD() on d_writeCallbacks.
107 virtual void removeWriteFD(int fd)
109 this->removeFD(d_writeCallbacks, fd);
// Sets the time-to-die (TTD) of a read-side fd.
// 'tv' is taken by value and used as the base time; 'timeout' is added to
// tv.tv_sec, so it is expressed in SECONDS (unlike run(), whose timeout is in
// milliseconds).
// Throws FDMultiplexerException when fd is not present in d_readCallbacks.
112 virtual void setReadTTD(int fd, struct timeval tv, int timeout)
// The const reference binds to the temporary iterator returned by find().
114 const auto& it = d_readCallbacks.find(fd);
115 if (it == d_readCallbacks.end()) {
116 throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer");
120 tv.tv_sec += timeout;
// 'newEntry' is built on lines not shown here - presumably a copy of *it
// with d_ttd set to the adjusted tv (TODO confirm). replace() swaps the
// stored element for it.
122 d_readCallbacks.replace(it, newEntry);
// Sets the time-to-die (TTD) of a write-side fd.
// 'tv' is taken by value and used as the base time; 'timeout' is added to
// tv.tv_sec, so it is expressed in SECONDS (unlike run(), whose timeout is in
// milliseconds).
// Throws FDMultiplexerException when fd is not present in d_writeCallbacks.
125 virtual void setWriteTTD(int fd, struct timeval tv, int timeout)
// The const reference binds to the temporary iterator returned by find().
127 const auto& it = d_writeCallbacks.find(fd);
128 if (it == d_writeCallbacks.end()) {
129 throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer");
133 tv.tv_sec += timeout;
// 'newEntry' is built on lines not shown here - presumably a copy of *it
// with d_ttd set to the adjusted tv (TODO confirm). replace() swaps the
// stored element for it.
135 d_writeCallbacks.replace(it, newEntry);
// Collects (fd, parameter) pairs from the read callbacks, or from the write
// callbacks when 'writes' is true, by walking the TTD-ordered index and
// comparing each entry's d_ttd against the reference time 'tv'.
138 virtual std::vector<std::pair<int, funcparam_t> > getTimeouts(const struct timeval& tv, bool writes=false)
140 std::vector<std::pair<int, funcparam_t> > ret;
// Lexicographic (seconds, microseconds) view of the reference time.
141 const auto tied = boost::tie(tv.tv_sec, tv.tv_usec);
142 auto& idx = writes ? d_writeCallbacks.get<TTDOrderedTag>() : d_readCallbacks.get<TTDOrderedTag>();
144 for (auto it = idx.begin(); it != idx.end(); ++it) {
// True when the entry has no TTD (tv_sec == 0) or its TTD is at or after
// 'tv', i.e. it has not expired yet; a TTD exactly equal to 'tv' counts as
// not expired. The body of this branch is not shown here - presumably it
// stops the scan, since the index is TTD-ordered (TODO confirm).
145 if (it->d_ttd.tv_sec == 0 || tied <= boost::tie(it->d_ttd.tv_sec, it->d_ttd.tv_usec)) {
// The stored parameter is copied into the result.
148 ret.push_back(std::make_pair(it->d_fd, it->d_parameter));
// Signature of a factory function returning a raw FDMultiplexer pointer.
154 typedef FDMultiplexer* getMultiplexer_t();
// Registry of factories keyed by an int; a multimap, so several factories may
// share one key. NOTE(review): the key looks like a priority - confirm
// against the code that fills the map.
155 typedef std::multimap<int, getMultiplexer_t*> FDMultiplexermap_t;
// Accessor for the registry, which is a function-local static.
157 static FDMultiplexermap_t& getMultiplexerMap()
159 static FDMultiplexermap_t theMap;
// Name of the concrete multiplexer implementation; pure virtual.
163 virtual std::string getName() const = 0;
// Number of registered fds: the write-side count when writeFDs is true,
// otherwise the read-side count.
165 size_t getWatchedFDCount(bool writeFDs) const
167 return writeFDs ? d_writeCallbacks.size() : d_readCallbacks.size();
// Empty tag types naming the two indices of the callback container:
// FDBasedTag selects the index keyed on d_fd, TTDOrderedTag the one keyed on d_ttd.
171 struct FDBasedTag {};
172 struct TTDOrderedTag {};
// Ordering predicate over struct timeval values. The three guards below handle
// zero TTDs separately; their return statements are not visible here
// (TODO confirm they implement the 'zero sorts last' rule stated below).
175 /* we want a 0 TTD (no timeout) to come _after_ everything else */
176 bool operator() (const struct timeval& lhs, const struct timeval& rhs) const
178 /* special treatment if at least one of the TTD is 0,
179 normal comparison otherwise */
// Both sides have no timeout.
180 if (lhs.tv_sec == 0 && rhs.tv_sec == 0) {
// Only the left side has no timeout.
183 if (lhs.tv_sec == 0 && rhs.tv_sec != 0) {
// Only the right side has no timeout.
186 if (lhs.tv_sec != 0 && rhs.tv_sec == 0) {
// Both non-zero: lexicographic comparison on (seconds, microseconds).
190 return std::tie(lhs.tv_sec, lhs.tv_usec) < std::tie(rhs.tv_sec, rhs.tv_usec);
// Container of Callback entries with two indices (the remaining lines of this
// typedef are not shown here).
194 typedef multi_index_container<
// Unique hashed index on Callback::d_fd.
197 hashed_unique<tag<FDBasedTag>,
198 member<Callback,int,&Callback::d_fd>
// Non-unique ordered index on Callback::d_ttd; the comparator argument follows
// on a line not shown here.
200 ordered_non_unique<tag<TTDOrderedTag>,
201 member<Callback,struct timeval,&Callback::d_ttd>,
// Registered callbacks, kept separately for read and write interest.
207 callbackmap_t d_readCallbacks, d_writeCallbacks;
// Backend hooks that every concrete multiplexer must implement; the public
// add/remove Read/Write FD wrappers call them with the matching container.
209 virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)=0;
210 virtual void removeFD(callbackmap_t& cbmap, int fd)=0;
// Iterator member; its use is not visible in this part of the file.
212 callbackmap_t::iterator d_iter;
// Bookkeeping helper: builds a Callback entry 'cb' and inserts it into cbmap.
// Throws FDMultiplexerException when the fd is already registered.
// Several lines of this function are not shown here (declaration of 'cb',
// assignment of its other fields, handling of 'ttd', the duplicate check).
214 void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)
219 cb.d_parameter=parameter;
// Zero the TTD, i.e. start from 'no timeout'; presumably overwritten from
// *ttd when a non-null ttd is supplied (TODO confirm).
220 memset(&cb.d_ttd, 0, sizeof(cb.d_ttd));
225 auto pair = cbmap.insert(cb);
// Presumably reached when the insert did not take place because the
// FD-keyed index is unique (TODO confirm the missing condition).
227 throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice");
// Bookkeeping helper: erases the entry for fd from cbmap.
// Throws FDMultiplexerException when no entry was erased, i.e. the fd was
// not registered.
231 void accountingRemoveFD(callbackmap_t& cbmap, int fd)
// erase(key) yields the number of removed elements; zero means 'unlisted'.
233 if(!cbmap.erase(fd)) {
234 throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer");