]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/mplexer.hh
Merge pull request #7559 from rgacogne/dnsdist-tcp-refactor-clean
[thirdparty/pdns.git] / pdns / mplexer.hh
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #ifndef PDNS_MPLEXER_HH
23 #define PDNS_MPLEXER_HH
24 #include <boost/function.hpp>
25 #include <boost/any.hpp>
26 #include <boost/shared_array.hpp>
27 #include <boost/tuple/tuple.hpp>
28 #include <boost/tuple/tuple_comparison.hpp>
29 #include <boost/multi_index_container.hpp>
30 #include <boost/multi_index/ordered_index.hpp>
31 #include <boost/multi_index/hashed_index.hpp>
32 #include <boost/multi_index/key_extractors.hpp>
33 #include <vector>
34 #include <map>
35 #include <stdexcept>
36 #include <string>
37 #include <sys/time.h>
38
39 using namespace ::boost::multi_index;
40
41 class FDMultiplexerException : public std::runtime_error
42 {
43 public:
44 FDMultiplexerException(const std::string& str) : std::runtime_error(str)
45 {}
46 };
47
48
49 /** Very simple FD multiplexer, based on callbacks and boost::any parameters
50 As a special service, this parameter is kept around and can be modified,
51 allowing for state to be stored inside the multiplexer.
52
53 It has some "interesting" semantics
54 */
55
56 class FDMultiplexer
57 {
58 public:
59 typedef boost::any funcparam_t;
60 typedef boost::function< void(int, funcparam_t&) > callbackfunc_t;
61 protected:
62
63 struct Callback
64 {
65 callbackfunc_t d_callback;
66 mutable funcparam_t d_parameter;
67 struct timeval d_ttd;
68 int d_fd;
69 };
70
71 public:
72 FDMultiplexer() : d_inrun(false)
73 {}
74 virtual ~FDMultiplexer()
75 {}
76
77 static FDMultiplexer* getMultiplexerSilent();
78
79 /* tv will be updated to 'now' before run returns */
80 /* timeout is in ms */
81 virtual int run(struct timeval* tv, int timeout=500) = 0;
82
83 /* timeout is in ms, 0 will return immediatly, -1 will block until at least one FD is ready */
84 virtual void getAvailableFDs(std::vector<int>& fds, int timeout) = 0;
85
86 //! Add an fd to the read watch list - currently an fd can only be on one list at a time!
87 virtual void addReadFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t(), const struct timeval* ttd=nullptr)
88 {
89 this->addFD(d_readCallbacks, fd, toDo, parameter, ttd);
90 }
91
92 //! Add an fd to the write watch list - currently an fd can only be on one list at a time!
93 virtual void addWriteFD(int fd, callbackfunc_t toDo, const funcparam_t& parameter=funcparam_t(), const struct timeval* ttd=nullptr)
94 {
95 this->addFD(d_writeCallbacks, fd, toDo, parameter, ttd);
96 }
97
98 //! Remove an fd from the read watch list. You can't call this function on an fd that is closed already!
99 /** WARNING: references to 'parameter' become invalid after this function! */
100 virtual void removeReadFD(int fd)
101 {
102 this->removeFD(d_readCallbacks, fd);
103 }
104
105 //! Remove an fd from the write watch list. You can't call this function on an fd that is closed already!
106 /** WARNING: references to 'parameter' become invalid after this function! */
107 virtual void removeWriteFD(int fd)
108 {
109 this->removeFD(d_writeCallbacks, fd);
110 }
111
112 virtual void setReadTTD(int fd, struct timeval tv, int timeout)
113 {
114 const auto& it = d_readCallbacks.find(fd);
115 if (it == d_readCallbacks.end()) {
116 throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer");
117 }
118
119 auto newEntry = *it;
120 tv.tv_sec += timeout;
121 newEntry.d_ttd = tv;
122 d_readCallbacks.replace(it, newEntry);
123 }
124
125 virtual void setWriteTTD(int fd, struct timeval tv, int timeout)
126 {
127 const auto& it = d_writeCallbacks.find(fd);
128 if (it == d_writeCallbacks.end()) {
129 throw FDMultiplexerException("attempt to timestamp fd not in the multiplexer");
130 }
131
132 auto newEntry = *it;
133 tv.tv_sec += timeout;
134 newEntry.d_ttd = tv;
135 d_writeCallbacks.replace(it, newEntry);
136 }
137
138 virtual std::vector<std::pair<int, funcparam_t> > getTimeouts(const struct timeval& tv, bool writes=false)
139 {
140 std::vector<std::pair<int, funcparam_t> > ret;
141 const auto tied = boost::tie(tv.tv_sec, tv.tv_usec);
142 auto& idx = writes ? d_writeCallbacks.get<TTDOrderedTag>() : d_readCallbacks.get<TTDOrderedTag>();
143
144 for (auto it = idx.begin(); it != idx.end(); ++it) {
145 if (it->d_ttd.tv_sec == 0 || tied <= boost::tie(it->d_ttd.tv_sec, it->d_ttd.tv_usec)) {
146 break;
147 }
148 ret.push_back(std::make_pair(it->d_fd, it->d_parameter));
149 }
150
151 return ret;
152 }
153
154 typedef FDMultiplexer* getMultiplexer_t();
155 typedef std::multimap<int, getMultiplexer_t*> FDMultiplexermap_t;
156
157 static FDMultiplexermap_t& getMultiplexerMap()
158 {
159 static FDMultiplexermap_t theMap;
160 return theMap;
161 }
162
163 virtual std::string getName() const = 0;
164
165 size_t getWatchedFDCount(bool writeFDs) const
166 {
167 return writeFDs ? d_writeCallbacks.size() : d_readCallbacks.size();
168 }
169
170 protected:
171 struct FDBasedTag {};
172 struct TTDOrderedTag {};
173 struct ttd_compare
174 {
175 /* we want a 0 TTD (no timeout) to come _after_ everything else */
176 bool operator() (const struct timeval& lhs, const struct timeval& rhs) const
177 {
178 /* special treatment if at least one of the TTD is 0,
179 normal comparison otherwise */
180 if (lhs.tv_sec == 0 && rhs.tv_sec == 0) {
181 return false;
182 }
183 if (lhs.tv_sec == 0 && rhs.tv_sec != 0) {
184 return false;
185 }
186 if (lhs.tv_sec != 0 && rhs.tv_sec == 0) {
187 return true;
188 }
189
190 return std::tie(lhs.tv_sec, lhs.tv_usec) < std::tie(rhs.tv_sec, rhs.tv_usec);
191 }
192 };
193
194 typedef multi_index_container<
195 Callback,
196 indexed_by <
197 hashed_unique<tag<FDBasedTag>,
198 member<Callback,int,&Callback::d_fd>
199 >,
200 ordered_non_unique<tag<TTDOrderedTag>,
201 member<Callback,struct timeval,&Callback::d_ttd>,
202 ttd_compare
203 >
204 >
205 > callbackmap_t;
206
207 callbackmap_t d_readCallbacks, d_writeCallbacks;
208
209 virtual void addFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)=0;
210 virtual void removeFD(callbackmap_t& cbmap, int fd)=0;
211 bool d_inrun;
212 callbackmap_t::iterator d_iter;
213
214 void accountingAddFD(callbackmap_t& cbmap, int fd, callbackfunc_t toDo, const funcparam_t& parameter, const struct timeval* ttd=nullptr)
215 {
216 Callback cb;
217 cb.d_fd = fd;
218 cb.d_callback=toDo;
219 cb.d_parameter=parameter;
220 memset(&cb.d_ttd, 0, sizeof(cb.d_ttd));
221 if (ttd) {
222 cb.d_ttd = *ttd;
223 }
224
225 auto pair = cbmap.insert(cb);
226 if (!pair.second) {
227 throw FDMultiplexerException("Tried to add fd "+std::to_string(fd)+ " to multiplexer twice");
228 }
229 }
230
231 void accountingRemoveFD(callbackmap_t& cbmap, int fd)
232 {
233 if(!cbmap.erase(fd)) {
234 throw FDMultiplexerException("Tried to remove unlisted fd "+std::to_string(fd)+ " from multiplexer");
235 }
236 }
237 };
238
239
240 #endif
241