#ifndef BOOST_TEST_DYN_LINK
#define BOOST_TEST_DYN_LINK
#endif

#define BOOST_TEST_NO_MAIN

#include <thread>
#include <boost/test/unit_test.hpp>

#include "dnsdist-rings.hh"
#include "gettime.hh"

BOOST_AUTO_TEST_SUITE(dnsdistrings_cc)

16 | static void test_ring(size_t maxEntries, size_t numberOfShards, size_t nbLockTries) | |
17 | { | |
18 | Rings rings(maxEntries, numberOfShards, nbLockTries); | |
2ed3021e | 19 | rings.init(); |
cfe4b655 RG |
20 | size_t entriesPerShard = maxEntries / numberOfShards; |
21 | ||
22 | BOOST_CHECK_EQUAL(rings.getNumberOfShards(), numberOfShards); | |
690b86b7 OM |
23 | BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), 0U); |
24 | BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U); | |
cfe4b655 RG |
25 | BOOST_CHECK_EQUAL(rings.d_shards.size(), rings.getNumberOfShards()); |
26 | for (const auto& shard : rings.d_shards) { | |
27 | BOOST_CHECK(shard != nullptr); | |
28 | } | |
29 | ||
30 | dnsheader dh; | |
e3542673 | 31 | memset(&dh, 0, sizeof(dh)); |
cfe4b655 RG |
32 | DNSName qname("rings.powerdns.com."); |
33 | ComboAddress requestor1("192.0.2.1"); | |
34 | ComboAddress requestor2("192.0.2.2"); | |
35 | uint16_t qtype = QType::AAAA; | |
36 | uint16_t size = 42; | |
426ccc67 RG |
37 | dnsdist::Protocol protocol = dnsdist::Protocol::DoUDP; |
38 | dnsdist::Protocol outgoingProtocol = dnsdist::Protocol::DoUDP; | |
cfe4b655 RG |
39 | struct timespec now; |
40 | gettime(&now); | |
41 | ||
42 | /* fill the query ring */ | |
43 | for (size_t idx = 0; idx < maxEntries; idx++) { | |
8110a0aa | 44 | rings.insertQuery(now, requestor1, qname, qtype, size, dh, protocol); |
cfe4b655 RG |
45 | } |
46 | BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries); | |
690b86b7 | 47 | BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U); |
cfe4b655 | 48 | for (const auto& shard : rings.d_shards) { |
ecab77e5 RG |
49 | auto ring = shard->queryRing.lock(); |
50 | BOOST_CHECK_EQUAL(ring->size(), entriesPerShard); | |
51 | for (const auto& entry : *ring) { | |
cfe4b655 RG |
52 | BOOST_CHECK_EQUAL(entry.name, qname); |
53 | BOOST_CHECK_EQUAL(entry.qtype, qtype); | |
54 | BOOST_CHECK_EQUAL(entry.size, size); | |
55 | BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec); | |
56 | BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor1.toStringWithPort()); | |
57 | } | |
58 | } | |
59 | ||
60 | /* push enough queries to get rid of the existing ones */ | |
61 | for (size_t idx = 0; idx < maxEntries; idx++) { | |
8110a0aa | 62 | rings.insertQuery(now, requestor2, qname, qtype, size, dh, protocol); |
cfe4b655 RG |
63 | } |
64 | BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries); | |
690b86b7 | 65 | BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), 0U); |
cfe4b655 | 66 | for (const auto& shard : rings.d_shards) { |
ecab77e5 RG |
67 | auto ring = shard->queryRing.lock(); |
68 | BOOST_CHECK_EQUAL(ring->size(), entriesPerShard); | |
69 | for (const auto& entry : *ring) { | |
cfe4b655 RG |
70 | BOOST_CHECK_EQUAL(entry.name, qname); |
71 | BOOST_CHECK_EQUAL(entry.qtype, qtype); | |
72 | BOOST_CHECK_EQUAL(entry.size, size); | |
73 | BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec); | |
74 | BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor2.toStringWithPort()); | |
75 | } | |
76 | } | |
77 | ||
78 | ComboAddress server("192.0.2.42"); | |
79 | unsigned int latency = 100; | |
80 | ||
81 | /* fill the response ring */ | |
82 | for (size_t idx = 0; idx < maxEntries; idx++) { | |
8110a0aa | 83 | rings.insertResponse(now, requestor1, qname, qtype, latency, size, dh, server, outgoingProtocol); |
cfe4b655 RG |
84 | } |
85 | BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries); | |
86 | BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries); | |
87 | for (const auto& shard : rings.d_shards) { | |
ecab77e5 RG |
88 | auto ring = shard->respRing.lock(); |
89 | BOOST_CHECK_EQUAL(ring->size(), entriesPerShard); | |
90 | for (const auto& entry : *ring) { | |
cfe4b655 RG |
91 | BOOST_CHECK_EQUAL(entry.name, qname); |
92 | BOOST_CHECK_EQUAL(entry.qtype, qtype); | |
93 | BOOST_CHECK_EQUAL(entry.size, size); | |
94 | BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec); | |
95 | BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor1.toStringWithPort()); | |
96 | BOOST_CHECK_EQUAL(entry.usec, latency); | |
97 | BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort()); | |
98 | } | |
99 | } | |
100 | ||
101 | /* push enough responses to get rid of the existing ones */ | |
102 | for (size_t idx = 0; idx < maxEntries; idx++) { | |
8110a0aa | 103 | rings.insertResponse(now, requestor2, qname, qtype, latency, size, dh, server, outgoingProtocol); |
cfe4b655 RG |
104 | } |
105 | BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), maxEntries); | |
106 | BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), maxEntries); | |
107 | for (const auto& shard : rings.d_shards) { | |
ecab77e5 RG |
108 | auto ring = shard->respRing.lock(); |
109 | BOOST_CHECK_EQUAL(ring->size(), entriesPerShard); | |
110 | for (const auto& entry : *ring) { | |
cfe4b655 RG |
111 | BOOST_CHECK_EQUAL(entry.name, qname); |
112 | BOOST_CHECK_EQUAL(entry.qtype, qtype); | |
113 | BOOST_CHECK_EQUAL(entry.size, size); | |
114 | BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec); | |
115 | BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor2.toStringWithPort()); | |
116 | BOOST_CHECK_EQUAL(entry.usec, latency); | |
117 | BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort()); | |
118 | } | |
119 | } | |
120 | } | |
121 | ||
122 | ||
123 | BOOST_AUTO_TEST_CASE(test_Rings_Simple) { | |
124 | ||
125 | /* 5 entries over 1 shard */ | |
126 | test_ring(5, 1, 0); | |
127 | /* 500 entries over 10 shards */ | |
128 | test_ring(500, 10, 0); | |
129 | /* 5000 entries over 100 shards, max 5 try-lock attempts */ | |
130 | test_ring(500, 100, 5); | |
131 | } | |
132 | ||
133 | static void ringReaderThread(Rings& rings, std::atomic<bool>& done, size_t numberOfEntries, uint16_t qtype) | |
134 | { | |
135 | size_t iterationsDone = 0; | |
136 | ||
137 | while (done == false) { | |
138 | size_t numberOfQueries = 0; | |
139 | size_t numberOfResponses = 0; | |
140 | ||
cdcf7eeb | 141 | for (const auto& shard : rings.d_shards) { |
cfe4b655 | 142 | { |
ecab77e5 RG |
143 | auto rl = shard->queryRing.lock(); |
144 | for(const auto& c : *rl) { | |
cfe4b655 RG |
145 | numberOfQueries++; |
146 | // BOOST_CHECK* is slow as hell.. | |
147 | if(c.qtype != qtype) { | |
148 | cerr<<"Invalid query QType!"<<endl; | |
149 | return; | |
150 | } | |
151 | } | |
152 | } | |
153 | { | |
ecab77e5 RG |
154 | auto rl = shard->respRing.lock(); |
155 | for(const auto& c : *rl) { | |
cfe4b655 RG |
156 | if(c.qtype != qtype) { |
157 | cerr<<"Invalid response QType!"<<endl; | |
158 | return; | |
159 | } | |
160 | numberOfResponses++; | |
161 | } | |
162 | } | |
163 | } | |
164 | ||
165 | BOOST_CHECK_LE(numberOfQueries, numberOfEntries); | |
166 | BOOST_CHECK_LE(numberOfResponses, numberOfEntries); | |
167 | iterationsDone++; | |
168 | usleep(10000); | |
169 | } | |
170 | ||
690b86b7 | 171 | BOOST_CHECK_GT(iterationsDone, 1U); |
cfe4b655 RG |
172 | #if 0 |
173 | cerr<<"Done "<<iterationsDone<<" reading iterations"<<endl; | |
174 | #endif | |
175 | } | |
176 | ||
e3542673 | 177 | static void ringWriterThread(Rings& rings, size_t numberOfEntries, const Rings::Query& query, const Rings::Response& response) |
cfe4b655 RG |
178 | { |
179 | for (size_t idx = 0; idx < numberOfEntries; idx++) { | |
8110a0aa CHB |
180 | rings.insertQuery(query.when, query.requestor, query.name, query.qtype, query.size, query.dh, query.protocol); |
181 | rings.insertResponse(response.when, response.requestor, response.name, response.qtype, response.usec, response.size, response.dh, response.ds, response.protocol); | |
cfe4b655 RG |
182 | } |
183 | } | |
184 | ||
185 | BOOST_AUTO_TEST_CASE(test_Rings_Threaded) { | |
186 | size_t numberOfEntries = 1000000; | |
187 | size_t numberOfShards = 50; | |
188 | size_t lockAttempts = 5; | |
189 | size_t numberOfWriterThreads = 4; | |
190 | size_t entriesPerShard = numberOfEntries / numberOfShards; | |
191 | ||
192 | struct timespec now; | |
193 | gettime(&now); | |
194 | dnsheader dh; | |
e3542673 | 195 | memset(&dh, 0, sizeof(dh)); |
cfe4b655 RG |
196 | dh.id = htons(4242); |
197 | dh.qr = 0; | |
198 | dh.tc = 0; | |
199 | dh.rd = 0; | |
200 | dh.rcode = 0; | |
201 | dh.qdcount = htons(1); | |
202 | DNSName qname("rings.powerdns.com."); | |
203 | ComboAddress requestor("192.0.2.1"); | |
204 | ComboAddress server("192.0.2.42"); | |
205 | unsigned int latency = 100; | |
206 | uint16_t qtype = QType::AAAA; | |
207 | uint16_t size = 42; | |
426ccc67 RG |
208 | dnsdist::Protocol protocol = dnsdist::Protocol::DoUDP; |
209 | dnsdist::Protocol outgoingProtocol = dnsdist::Protocol::DoUDP; | |
cfe4b655 RG |
210 | |
211 | Rings rings(numberOfEntries, numberOfShards, lockAttempts, true); | |
2ed3021e | 212 | rings.init(); |
6f238a33 | 213 | #if defined(DNSDIST_RINGS_WITH_MACADDRESS) |
f3d17229 | 214 | Rings::Query query({requestor, qname, now, dh, size, qtype, protocol, dnsdist::MacAddress(), false}); |
6f238a33 | 215 | #else |
8110a0aa | 216 | Rings::Query query({requestor, qname, now, dh, size, qtype, protocol}); |
6f238a33 | 217 | #endif |
8110a0aa | 218 | Rings::Response response({requestor, server, qname, now, dh, latency, size, qtype, outgoingProtocol}); |
cfe4b655 | 219 | |
c46284c9 | 220 | std::atomic<bool> done(false); |
cfe4b655 RG |
221 | std::vector<std::thread> writerThreads; |
222 | std::thread readerThread(ringReaderThread, std::ref(rings), std::ref(done), numberOfEntries, qtype); | |
223 | ||
224 | /* we need to overcommit a bit to account for the fact that due to contention, | |
225 | we might not perfectly distribute the entries over the shards, | |
226 | so some of them might get full while other still have some place left */ | |
227 | size_t insertionsPerThread = (1.2 * numberOfEntries) / numberOfWriterThreads; | |
228 | for (size_t idx = 0; idx < numberOfWriterThreads; idx++) { | |
229 | writerThreads.push_back(std::thread(ringWriterThread, std::ref(rings), insertionsPerThread, query, response)); | |
230 | } | |
231 | ||
232 | /* wait for the writers to be finished */ | |
233 | for (auto& t : writerThreads) { | |
234 | t.join(); | |
235 | } | |
236 | ||
237 | /* we can stop the reader thread now */ | |
238 | done = true; | |
239 | readerThread.join(); | |
240 | ||
241 | BOOST_CHECK_EQUAL(rings.getNumberOfShards(), numberOfShards); | |
cfe4b655 | 242 | BOOST_CHECK_EQUAL(rings.d_shards.size(), rings.getNumberOfShards()); |
8c01fd0c | 243 | BOOST_CHECK_LE(rings.getNumberOfQueryEntries(), numberOfEntries); |
2454ebe3 | 244 | BOOST_CHECK_GT(rings.getNumberOfQueryEntries(), numberOfEntries * 0.75); |
9e76b301 | 245 | BOOST_WARN_GT(rings.getNumberOfQueryEntries(), numberOfEntries * 0.99); |
8c01fd0c | 246 | BOOST_CHECK_LE(rings.getNumberOfResponseEntries(), numberOfEntries); |
2454ebe3 | 247 | BOOST_CHECK_GT(rings.getNumberOfResponseEntries(), numberOfEntries * 0.75); |
9e76b301 | 248 | BOOST_WARN_GT(rings.getNumberOfResponseEntries(), numberOfEntries * 0.99); |
cfe4b655 RG |
249 | |
250 | size_t totalQueries = 0; | |
251 | size_t totalResponses = 0; | |
252 | for (const auto& shard : rings.d_shards) { | |
ecab77e5 RG |
253 | { |
254 | auto ring = shard->queryRing.lock(); | |
255 | BOOST_CHECK_LE(ring->size(), entriesPerShard); | |
256 | // verify that the shard is not empty | |
257 | BOOST_CHECK_GT(ring->size(), (entriesPerShard * 0.5) + 1); | |
258 | // this would be optimal | |
259 | BOOST_WARN_GT(ring->size(), entriesPerShard * 0.95); | |
260 | totalQueries += ring->size(); | |
261 | for (const auto& entry : *ring) { | |
262 | BOOST_CHECK_EQUAL(entry.name, qname); | |
263 | BOOST_CHECK_EQUAL(entry.qtype, qtype); | |
264 | BOOST_CHECK_EQUAL(entry.size, size); | |
265 | BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec); | |
266 | BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort()); | |
267 | } | |
cfe4b655 | 268 | } |
ecab77e5 RG |
269 | { |
270 | auto ring = shard->respRing.lock(); | |
271 | BOOST_CHECK_LE(ring->size(), entriesPerShard); | |
272 | // verify that the shard is not empty | |
273 | BOOST_CHECK_GT(ring->size(), (entriesPerShard * 0.5) + 1); | |
274 | // this would be optimal | |
275 | BOOST_WARN_GT(ring->size(), entriesPerShard * 0.95); | |
276 | totalResponses += ring->size(); | |
277 | for (const auto& entry : *ring) { | |
278 | BOOST_CHECK_EQUAL(entry.name, qname); | |
279 | BOOST_CHECK_EQUAL(entry.qtype, qtype); | |
280 | BOOST_CHECK_EQUAL(entry.size, size); | |
281 | BOOST_CHECK_EQUAL(entry.when.tv_sec, now.tv_sec); | |
282 | BOOST_CHECK_EQUAL(entry.requestor.toStringWithPort(), requestor.toStringWithPort()); | |
283 | BOOST_CHECK_EQUAL(entry.usec, latency); | |
284 | BOOST_CHECK_EQUAL(entry.ds.toStringWithPort(), server.toStringWithPort()); | |
285 | } | |
cfe4b655 RG |
286 | } |
287 | } | |
288 | BOOST_CHECK_EQUAL(rings.getNumberOfQueryEntries(), totalQueries); | |
289 | BOOST_CHECK_EQUAL(rings.getNumberOfResponseEntries(), totalResponses); | |
290 | #if 0 | |
291 | cerr<<"Done "<<(insertionsPerThread*numberOfWriterThreads)<<" insertions"<<endl; | |
292 | cerr<<"Got "<<rings.d_deferredQueryInserts<<" deferred query insertions"<<endl; | |
293 | cerr<<"Got "<<rings.d_blockingQueryInserts<<" blocking query insertions"<<endl; | |
294 | cerr<<"Got "<<rings.d_deferredResponseInserts<<" deferred response insertions"<<endl; | |
295 | cerr<<"Got "<<rings.d_blockingResponseInserts<<" blocking response insertions"<<endl; | |
296 | #endif | |
297 | } | |

BOOST_AUTO_TEST_SUITE_END()