]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/delaypipe.cc
Standardize license text in all PDNS files
[thirdparty/pdns.git] / pdns / delaypipe.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
#include "delaypipe.hh"
#include "misc.hh"
#include "gettime.hh"
#include <cerrno>
#include <thread>
26
27 template<class T>
28 ObjectPipe<T>::ObjectPipe()
29 {
30 if(pipe(d_fds))
31 unixDie("pipe");
32 }
33
34 template<class T>
35 ObjectPipe<T>::~ObjectPipe()
36 {
37 ::close(d_fds[0]);
38 if(d_fds[1] >= 0)
39 ::close(d_fds[1]);
40 }
41
42 template<class T>
43 void ObjectPipe<T>::close()
44 {
45 if(d_fds[1] < 0)
46 return;
47 ::close(d_fds[1]); // the writing side
48 d_fds[1]=-1;
49 }
50
51 template<class T>
52 void ObjectPipe<T>::write(T& t)
53 {
54 auto ptr = new T(t);
55 if(::write(d_fds[1], &ptr, sizeof(ptr)) != sizeof(ptr)) {
56 delete ptr;
57 unixDie("write");
58 }
59 }
60
61 template<class T>
62 bool ObjectPipe<T>::read(T* t)
63 {
64 T* ptr;
65 int ret = ::read(d_fds[0], &ptr, sizeof(ptr));
66
67 if(ret < 0)
68 unixDie("read");
69 if(ret==0)
70 return false;
71 if(ret != sizeof(ptr))
72 throw std::runtime_error("Partial read, should not happen");
73 *t=*ptr;
74 delete ptr;
75 return true;
76 }
77
78 template<class T>
79 int ObjectPipe<T>::readTimeout(T* t, double msec)
80 {
81 T* ptr;
82
83 int ret = waitForData(d_fds[0], 0, 1000*msec);
84 if(ret < 0)
85 unixDie("waiting for data in object pipe");
86 if(ret == 0)
87 return -1;
88
89 ret = ::read(d_fds[0], &ptr, sizeof(ptr)); // this is BLOCKING!
90
91 if(ret < 0)
92 unixDie("read");
93 if(ret==0)
94 return false;
95 if(ret != sizeof(ptr))
96 throw std::runtime_error("Partial read, should not happen 2");
97 *t=*ptr;
98 delete ptr;
99 return 1;
100 }
101
102
103 template<class T>
104 DelayPipe<T>::DelayPipe() : d_thread(&DelayPipe<T>::worker, this)
105 {
106 }
107
108 template<class T>
109 void DelayPipe<T>::gettime(struct timespec* ts)
110 {
111 ::gettime(ts);
112 }
113
114
115 template<class T>
116 void DelayPipe<T>::submit(T& t, int msec)
117 {
118 struct timespec now;
119 gettime(&now);
120 now.tv_nsec += msec*1e6;
121 while(now.tv_nsec > 1e9) {
122 now.tv_sec++;
123 now.tv_nsec-=1e9;
124 }
125 Combo c{t, now};
126 d_pipe.write(c);
127 }
128
129 template<class T>
130 DelayPipe<T>::~DelayPipe()
131 {
132 d_pipe.close();
133 d_thread.join();
134 }
135
136
137
138 template<class T>
139 void DelayPipe<T>::worker()
140 {
141 Combo c;
142 for(;;) {
143 /* this code is slightly too subtle, but I don't see how it could be any simpler.
144 So we have a set of work to do, and we need to wait until the time arrives to do it.
145 Simultaneously new work might come in. So we try to combine both of these things by
146 setting a timeout on listening to the pipe over which new work comes in. This timeout
147 is equal to the wait until the first thing that needs to be done.
148
149 Two additional cases exist: we have no work to wait for, so we can wait infinitely long.
150 The other special case is that the first we have to do.. is in the past, so we need to do it
151 immediately. */
152
153
154 double delay=-1; // infinite
155 struct timespec now;
156 if(!d_work.empty()) {
157 gettime(&now);
158 delay=1000*tsdelta(d_work.begin()->first, now);
159 if(delay < 0) {
160 delay=0; // don't wait - we have work that is late already!
161 }
162 }
163 if(delay != 0 ) {
164 int ret = d_pipe.readTimeout(&c, delay);
165 if(ret > 0) { // we got an object
166 d_work.insert(make_pair(c.when, c.what));
167 }
168 else if(ret==0) { // EOF
169 break;
170 }
171 else {
172 ;
173 }
174 gettime(&now);
175 }
176
177 tscomp cmp;
178
179 for(auto iter = d_work.begin() ; iter != d_work.end(); ) { // do the needful
180 if(cmp(iter->first, now)) {
181 iter->second();
182 d_work.erase(iter++);
183 }
184 else {
185 break;
186 }
187 }
188 }
189 }