]>
git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/delaypipe.cc
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
22 #include "delaypipe.hh"
28 ObjectPipe
<T
>::ObjectPipe()
35 ObjectPipe
<T
>::~ObjectPipe()
43 void ObjectPipe
<T
>::close()
47 ::close(d_fds
[1]); // the writing side
52 void ObjectPipe
<T
>::write(T
& t
)
55 if(::write(d_fds
[1], &ptr
, sizeof(ptr
)) != sizeof(ptr
)) {
62 bool ObjectPipe
<T
>::read(T
* t
)
65 int ret
= ::read(d_fds
[0], &ptr
, sizeof(ptr
));
71 if(ret
!= sizeof(ptr
))
72 throw std::runtime_error("Partial read, should not happen");
79 int ObjectPipe
<T
>::readTimeout(T
* t
, double msec
)
83 int ret
= waitForData(d_fds
[0], 0, 1000*msec
);
85 unixDie("waiting for data in object pipe");
89 ret
= ::read(d_fds
[0], &ptr
, sizeof(ptr
)); // this is BLOCKING!
95 if(ret
!= sizeof(ptr
))
96 throw std::runtime_error("Partial read, should not happen 2");
104 DelayPipe
<T
>::DelayPipe() : d_thread(&DelayPipe
<T
>::worker
, this)
109 void DelayPipe
<T
>::gettime(struct timespec
* ts
)
116 void DelayPipe
<T
>::submit(T
& t
, int msec
)
120 now
.tv_nsec
+= msec
*1e6
;
121 while(now
.tv_nsec
> 1e9
) {
130 DelayPipe
<T
>::~DelayPipe()
139 void DelayPipe
<T
>::worker()
143 /* this code is slightly too subtle, but I don't see how it could be any simpler.
144 So we have a set of work to do, and we need to wait until the time arrives to do it.
145 Simultaneously new work might come in. So we try to combine both of these things by
146 setting a timeout on listening to the pipe over which new work comes in. This timeout
147 is equal to the wait until the first thing that needs to be done.
149 Two additional cases exist: we have no work to wait for, so we can wait infinitely long.
150 The other special case is that the first thing we have to do is already in the past, so we need to do it
154 double delay
=-1; // infinite
156 if(!d_work
.empty()) {
158 delay
=1000*tsdelta(d_work
.begin()->first
, now
);
160 delay
=0; // don't wait - we have work that is late already!
164 int ret
= d_pipe
.readTimeout(&c
, delay
);
165 if(ret
> 0) { // we got an object
166 d_work
.insert(make_pair(c
.when
, c
.what
));
168 else if(ret
==0) { // EOF
179 for(auto iter
= d_work
.begin() ; iter
!= d_work
.end(); ) { // do the needful
180 if(cmp(iter
->first
, now
)) {
182 d_work
.erase(iter
++);