]> git.ipfire.org Git - thirdparty/pdns.git/blame - pdns/delaypipe.cc
Merge pull request #14021 from Habbie/auth-lua-join-whitespace
[thirdparty/pdns.git] / pdns / delaypipe.cc
CommitLineData
12471842
PL
1/*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
7b3865cd 22#include "delaypipe.hh"
23#include "misc.hh"
85c7ca75 24#include "gettime.hh"
7b3865cd 25#include <thread>
519f5484 26#include "threadname.hh"
7b3865cd 27
28template<class T>
29ObjectPipe<T>::ObjectPipe()
30{
c1d76521 31 auto [sender, receiver] = pdns::channel::createObjectQueue<T>(pdns::channel::SenderBlockingMode::SenderBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, 0, false);
5841c6fd
RG
32 d_sender = std::move(sender);
33 d_receiver = std::move(receiver);
7b3865cd 34}
35
36template<class T>
37void ObjectPipe<T>::close()
38{
5841c6fd 39 d_sender.close();
7b3865cd 40}
41
42template<class T>
43void ObjectPipe<T>::write(T& t)
44{
5841c6fd
RG
45 auto ptr = std::make_unique<T>(t);
46 if (!d_sender.send(std::move(ptr))) {
47 unixDie("writing to the DelayPipe");
a82f68f0 48 }
7b3865cd 49}
50
7b3865cd 51template<class T>
4081de56 52int ObjectPipe<T>::readTimeout(T* t, double msec)
7b3865cd 53{
f58cb6a3 54 while (true) {
5841c6fd 55 int ret = waitForData(d_receiver.getDescriptor(), 0, 1000*msec);
f58cb6a3
RG
56 if (ret < 0) {
57 if (errno == EINTR) {
58 continue;
59 }
60 unixDie("waiting for data in object pipe");
61 }
62 else if (ret == 0) {
63 return -1;
64 }
65
5841c6fd
RG
66 try {
67 auto tmp = d_receiver.receive();
68 if (!tmp) {
69 if (d_receiver.isClosed()) {
70 return 0;
71 }
f58cb6a3
RG
72 continue;
73 }
f58cb6a3 74
5841c6fd
RG
75 *t = **tmp;
76 return 1;
77 }
78 catch (const std::exception& e) {
79 throw std::runtime_error("reading from the delay pipe: " + std::string(e.what()));
f58cb6a3 80 }
f58cb6a3 81 }
7b3865cd 82}
83
84
85template<class T>
86DelayPipe<T>::DelayPipe() : d_thread(&DelayPipe<T>::worker, this)
87{
88}
89
8f4a3c16 90template<class T>
91void DelayPipe<T>::gettime(struct timespec* ts)
92{
85c7ca75 93 ::gettime(ts);
8f4a3c16 94}
95
96
7b3865cd 97template<class T>
98void DelayPipe<T>::submit(T& t, int msec)
99{
100 struct timespec now;
8f4a3c16 101 gettime(&now);
7b3865cd 102 now.tv_nsec += msec*1e6;
74519f6f 103 while(now.tv_nsec > 1e9) {
7b3865cd 104 now.tv_sec++;
105 now.tv_nsec-=1e9;
106 }
107 Combo c{t, now};
108 d_pipe.write(c);
109}
110
111template<class T>
112DelayPipe<T>::~DelayPipe()
113{
114 d_pipe.close();
115 d_thread.join();
116}
117
4081de56 118
119
7b3865cd 120template<class T>
121void DelayPipe<T>::worker()
122{
519f5484 123 setThreadName("dnsdist/delayPi");
7b3865cd 124 Combo c;
125 for(;;) {
74519f6f 126 /* this code is slightly too subtle, but I don't see how it could be any simpler.
127 So we have a set of work to do, and we need to wait until the time arrives to do it.
128 Simultaneously new work might come in. So we try to combine both of these things by
129 setting a timeout on listening to the pipe over which new work comes in. This timeout
130 is equal to the wait until the first thing that needs to be done.
131
132 Two additional cases exist: we have no work to wait for, so we can wait infinitely long.
133 The other special case is that the first we have to do.. is in the past, so we need to do it
134 immediately. */
135
5841c6fd 136
4081de56 137 double delay=-1; // infinite
138 struct timespec now;
139 if(!d_work.empty()) {
8f4a3c16 140 gettime(&now);
4081de56 141 delay=1000*tsdelta(d_work.begin()->first, now);
142 if(delay < 0) {
74519f6f 143 delay=0; // don't wait - we have work that is late already!
4081de56 144 }
7b3865cd 145 }
4081de56 146 if(delay != 0 ) {
5841c6fd 147 int ret = d_pipe.readTimeout(&c, delay);
4081de56 148 if(ret > 0) { // we got an object
e32a8d46 149 d_work.emplace(c.when, c.what);
4081de56 150 }
151 else if(ret==0) { // EOF
152 break;
153 }
154 else {
155 ;
156 }
8f4a3c16 157 gettime(&now);
7b3865cd 158 }
4081de56 159
7b3865cd 160 tscomp cmp;
4081de56 161
7b3865cd 162 for(auto iter = d_work.begin() ; iter != d_work.end(); ) { // do the needful
163 if(cmp(iter->first, now)) {
164 iter->second();
165 d_work.erase(iter++);
166 }
4081de56 167 else {
168 break;
169 }
7b3865cd 170 }
171 }
172}