]>
Commit | Line | Data |
---|---|---|
12471842 PL |
1 | /* |
2 | * This file is part of PowerDNS or dnsdist. | |
3 | * Copyright -- PowerDNS.COM B.V. and its contributors | |
4 | * | |
5 | * This program is free software; you can redistribute it and/or modify | |
6 | * it under the terms of version 2 of the GNU General Public License as | |
7 | * published by the Free Software Foundation. | |
8 | * | |
9 | * In addition, for the avoidance of any doubt, permission is granted to | |
10 | * link this program with OpenSSL and to (re)distribute the binaries | |
11 | * produced as the result of such linking. | |
12 | * | |
13 | * This program is distributed in the hope that it will be useful, | |
14 | * but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 | * GNU General Public License for more details. | |
17 | * | |
18 | * You should have received a copy of the GNU General Public License | |
19 | * along with this program; if not, write to the Free Software | |
20 | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. | |
21 | */ | |
7b3865cd | 22 | #include "delaypipe.hh" |
23 | #include "misc.hh" | |
85c7ca75 | 24 | #include "gettime.hh" |
7b3865cd | 25 | #include <thread> |
519f5484 | 26 | #include "threadname.hh" |
7b3865cd | 27 | |
28 | template<class T> | |
29 | ObjectPipe<T>::ObjectPipe() | |
30 | { | |
c1d76521 | 31 | auto [sender, receiver] = pdns::channel::createObjectQueue<T>(pdns::channel::SenderBlockingMode::SenderBlocking, pdns::channel::ReceiverBlockingMode::ReceiverNonBlocking, 0, false); |
5841c6fd RG |
32 | d_sender = std::move(sender); |
33 | d_receiver = std::move(receiver); | |
7b3865cd | 34 | } |
35 | ||
36 | template<class T> | |
37 | void ObjectPipe<T>::close() | |
38 | { | |
5841c6fd | 39 | d_sender.close(); |
7b3865cd | 40 | } |
41 | ||
42 | template<class T> | |
43 | void ObjectPipe<T>::write(T& t) | |
44 | { | |
5841c6fd RG |
45 | auto ptr = std::make_unique<T>(t); |
46 | if (!d_sender.send(std::move(ptr))) { | |
47 | unixDie("writing to the DelayPipe"); | |
a82f68f0 | 48 | } |
7b3865cd | 49 | } |
50 | ||
7b3865cd | 51 | template<class T> |
4081de56 | 52 | int ObjectPipe<T>::readTimeout(T* t, double msec) |
7b3865cd | 53 | { |
f58cb6a3 | 54 | while (true) { |
5841c6fd | 55 | int ret = waitForData(d_receiver.getDescriptor(), 0, 1000*msec); |
f58cb6a3 RG |
56 | if (ret < 0) { |
57 | if (errno == EINTR) { | |
58 | continue; | |
59 | } | |
60 | unixDie("waiting for data in object pipe"); | |
61 | } | |
62 | else if (ret == 0) { | |
63 | return -1; | |
64 | } | |
65 | ||
5841c6fd RG |
66 | try { |
67 | auto tmp = d_receiver.receive(); | |
68 | if (!tmp) { | |
69 | if (d_receiver.isClosed()) { | |
70 | return 0; | |
71 | } | |
f58cb6a3 RG |
72 | continue; |
73 | } | |
f58cb6a3 | 74 | |
5841c6fd RG |
75 | *t = **tmp; |
76 | return 1; | |
77 | } | |
78 | catch (const std::exception& e) { | |
79 | throw std::runtime_error("reading from the delay pipe: " + std::string(e.what())); | |
f58cb6a3 | 80 | } |
f58cb6a3 | 81 | } |
7b3865cd | 82 | } |
83 | ||
84 | ||
85 | template<class T> | |
86 | DelayPipe<T>::DelayPipe() : d_thread(&DelayPipe<T>::worker, this) | |
87 | { | |
88 | } | |
89 | ||
8f4a3c16 | 90 | template<class T> |
91 | void DelayPipe<T>::gettime(struct timespec* ts) | |
92 | { | |
85c7ca75 | 93 | ::gettime(ts); |
8f4a3c16 | 94 | } |
95 | ||
96 | ||
7b3865cd | 97 | template<class T> |
98 | void DelayPipe<T>::submit(T& t, int msec) | |
99 | { | |
100 | struct timespec now; | |
8f4a3c16 | 101 | gettime(&now); |
7b3865cd | 102 | now.tv_nsec += msec*1e6; |
74519f6f | 103 | while(now.tv_nsec > 1e9) { |
7b3865cd | 104 | now.tv_sec++; |
105 | now.tv_nsec-=1e9; | |
106 | } | |
107 | Combo c{t, now}; | |
108 | d_pipe.write(c); | |
109 | } | |
110 | ||
111 | template<class T> | |
112 | DelayPipe<T>::~DelayPipe() | |
113 | { | |
114 | d_pipe.close(); | |
115 | d_thread.join(); | |
116 | } | |
117 | ||
4081de56 | 118 | |
119 | ||
7b3865cd | 120 | template<class T> |
121 | void DelayPipe<T>::worker() | |
122 | { | |
519f5484 | 123 | setThreadName("dnsdist/delayPi"); |
7b3865cd | 124 | Combo c; |
125 | for(;;) { | |
74519f6f | 126 | /* this code is slightly too subtle, but I don't see how it could be any simpler. |
127 | So we have a set of work to do, and we need to wait until the time arrives to do it. | |
128 | Simultaneously new work might come in. So we try to combine both of these things by | |
129 | setting a timeout on listening to the pipe over which new work comes in. This timeout | |
130 | is equal to the wait until the first thing that needs to be done. | |
131 | ||
132 | Two additional cases exist: we have no work to wait for, so we can wait infinitely long. | |
133 | The other special case is that the first we have to do.. is in the past, so we need to do it | |
134 | immediately. */ | |
135 | ||
5841c6fd | 136 | |
4081de56 | 137 | double delay=-1; // infinite |
138 | struct timespec now; | |
139 | if(!d_work.empty()) { | |
8f4a3c16 | 140 | gettime(&now); |
4081de56 | 141 | delay=1000*tsdelta(d_work.begin()->first, now); |
142 | if(delay < 0) { | |
74519f6f | 143 | delay=0; // don't wait - we have work that is late already! |
4081de56 | 144 | } |
7b3865cd | 145 | } |
4081de56 | 146 | if(delay != 0 ) { |
5841c6fd | 147 | int ret = d_pipe.readTimeout(&c, delay); |
4081de56 | 148 | if(ret > 0) { // we got an object |
e32a8d46 | 149 | d_work.emplace(c.when, c.what); |
4081de56 | 150 | } |
151 | else if(ret==0) { // EOF | |
152 | break; | |
153 | } | |
154 | else { | |
155 | ; | |
156 | } | |
8f4a3c16 | 157 | gettime(&now); |
7b3865cd | 158 | } |
4081de56 | 159 | |
7b3865cd | 160 | tscomp cmp; |
4081de56 | 161 | |
7b3865cd | 162 | for(auto iter = d_work.begin() ; iter != d_work.end(); ) { // do the needful |
163 | if(cmp(iter->first, now)) { | |
164 | iter->second(); | |
165 | d_work.erase(iter++); | |
166 | } | |
4081de56 | 167 | else { |
168 | break; | |
169 | } | |
7b3865cd | 170 | } |
171 | } | |
172 | } |