]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/calidns.cc
Fix error handling in poll loop.
[thirdparty/pdns.git] / pdns / calidns.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <iostream>
#include <fstream>
#include <limits>
#include <memory>
#include <poll.h>
#include <random>
#include <thread>

#include <boost/program_options.hpp>

#include "dns_random.hh"
#include "dnsparser.hh"
#include "dnswriter.hh"
#include "dnsrecords.hh"
#include "ednsoptions.hh"
#include "ednssubnet.hh"
#include "misc.hh"
#include "sstuff.hh"
#include "statbag.hh"
44
45 using std::thread;
46 using std::unique_ptr;
47
48 StatBag S;
49
50 static std::atomic<unsigned int> g_recvcounter, g_recvbytes;
51 static volatile bool g_done;
52
53 namespace po = boost::program_options;
54 static po::variables_map g_vm;
55
56 static bool g_quiet;
57
58 static void* recvThread(const vector<Socket*>* sockets)
59 {
60 vector<pollfd> rfds, fds;
61 for(const auto& s : *sockets) {
62 struct pollfd pfd;
63 pfd.fd = s->getHandle();
64 pfd.events = POLLIN;
65 pfd.revents = 0;
66 rfds.push_back(pfd);
67 }
68
69 int err;
70
71 #if HAVE_RECVMMSG
72 vector<struct mmsghdr> buf(100);
73 for(auto& m : buf) {
74 fillMSGHdr(&m.msg_hdr, new struct iovec, new char[512], 512, new char[1500], 1500, new ComboAddress("127.0.0.1"));
75 }
76 #else
77 struct msghdr buf;
78 fillMSGHdr(&buf, new struct iovec, new char[512], 512, new char[1500], 1500, new ComboAddress("127.0.0.1"));
79 #endif
80
81 while(!g_done) {
82 fds=rfds;
83
84 err = poll(&fds[0], fds.size(), -1);
85 if (err < 0) {
86 if (errno == EINTR)
87 continue;
88 unixDie("Unable to poll for new UDP events");
89 }
90
91 for(auto &pfd : fds) {
92 if (pfd.revents & POLLIN) {
93 #if HAVE_RECVMMSG
94 if ((err=recvmmsg(pfd.fd, &buf[0], buf.size(), MSG_WAITFORONE, 0)) < 0 ) {
95 if(errno != EAGAIN)
96 unixDie("recvmmsg");
97 continue;
98 }
99 g_recvcounter+=err;
100 for(int n=0; n < err; ++n)
101 g_recvbytes += buf[n].msg_len;
102 #else
103 if ((err = recvmsg(pfd.fd, &buf, 0)) < 0) {
104 if (errno != EAGAIN)
105 unixDie("recvmsg");
106 continue;
107 }
108 g_recvcounter++;
109 for (int i = 0; i < buf.msg_iovlen; i++)
110 g_recvbytes += buf.msg_iov[i].iov_len;
111 #endif
112 }
113 }
114 }
115 return 0;
116 }
117
118 static void setSocketBuffer(int fd, int optname, uint32_t size)
119 {
120 uint32_t psize=0;
121 socklen_t len=sizeof(psize);
122
123 if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
124 if (!g_quiet) {
125 cerr<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
126 }
127 return;
128 }
129
130 if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 ) {
131 if (!g_quiet) {
132 cerr<<"Warning: unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
133 }
134 }
135 }
136
137
138 static void setSocketReceiveBuffer(int fd, uint32_t size)
139 {
140 setSocketBuffer(fd, SO_RCVBUF, size);
141 }
142
143 static void setSocketSendBuffer(int fd, uint32_t size)
144 {
145 setSocketBuffer(fd, SO_SNDBUF, size);
146 }
147
148 static ComboAddress getRandomAddressFromRange(const Netmask& ecsRange)
149 {
150 ComboAddress result = ecsRange.getMaskedNetwork();
151 uint8_t bits = ecsRange.getBits();
152 uint32_t mod = 1 << (32 - bits);
153 result.sin4.sin_addr.s_addr = result.sin4.sin_addr.s_addr + ntohl(dns_random(mod));
154 return result;
155 }
156
157 static void replaceEDNSClientSubnet(vector<uint8_t>* packet, const Netmask& ecsRange)
158 {
159 /* the last 4 bytes of the packet are the IPv4 address */
160 ComboAddress rnd = getRandomAddressFromRange(ecsRange);
161 uint32_t addr = rnd.sin4.sin_addr.s_addr;
162
163 const auto packetSize = packet->size();
164 if (packetSize < sizeof(addr)) {
165 return;
166 }
167
168 memcpy(&packet->at(packetSize - sizeof(addr)), &addr, sizeof(addr));
169 }
170
171 static void sendPackets(const vector<Socket*>* sockets, const vector<vector<uint8_t>* >& packets, int qps, ComboAddress dest, const Netmask& ecsRange)
172 {
173 unsigned int burst=100;
174 const auto nsecPerBurst=1*(unsigned long)(burst*1000000000.0/qps);
175 struct timespec nsec;
176 nsec.tv_sec=0;
177 nsec.tv_nsec=0;
178 int count=0;
179 unsigned int nBursts=0;
180 DTime dt;
181 dt.set();
182
183 struct Unit {
184 struct msghdr msgh;
185 struct iovec iov;
186 char cbuf[256];
187 };
188 vector<unique_ptr<Unit> > units;
189 int ret;
190
191 for(const auto& p : packets) {
192 count++;
193
194 Unit u;
195
196 if (!ecsRange.empty()) {
197 replaceEDNSClientSubnet(p, ecsRange);
198 }
199
200 fillMSGHdr(&u.msgh, &u.iov, nullptr, 0, (char*)&(*p)[0], p->size(), &dest);
201 if((ret=sendmsg((*sockets)[count % sockets->size()]->getHandle(),
202 &u.msgh, 0)))
203 if(ret < 0)
204 unixDie("sendmsg");
205
206
207 if(!(count%burst)) {
208 nBursts++;
209 // Calculate the time in nsec we need to sleep to the next burst.
210 // If this is negative, it means that we are not achieving the requested
211 // target rate, in which case we skip the sleep.
212 int toSleep = nBursts*nsecPerBurst - 1000*dt.udiffNoReset();
213 if (toSleep > 0) {
214 nsec.tv_nsec = toSleep;
215 nanosleep(&nsec, 0);
216 }
217 }
218 }
219 }
220
221 static void usage(po::options_description &desc) {
222 cerr<<"Syntax: calidns [OPTIONS] QUERY_FILE DESTINATION INITIAL_QPS HITRATE"<<endl;
223 cerr<<desc<<endl;
224 }
225
226 /*
227 New plan. Set cache hit percentage, which we achieve on a per second basis.
228 So we start with 10000 qps for example, and for 90% cache hit ratio means
229 we take 1000 unique queries and each send them 10 times.
230
231 We then move the 1000 unique queries to the 'known' pool.
232
233 For the next second, say 20000 qps, we know we are going to need 2000 new queries,
234 so we take 2000 from the unknown pool. Then we need 18000 cache hits. We can get 1000 from
235 the known pool, leaving us down 17000. Or, we have 3000 in total now and we need 2000. We simply
236 repeat the 3000 mix we have ~7 times. The 2000 can now go to the known pool too.
237
238 For the next second, say 30000 qps, we'll need 3000 cache misses, which we get from
239 the unknown pool. To this we add 3000 queries from the known pool. Next up we repeat this batch 5
240 times.
241
242 In general the algorithm therefore is:
243
244 1) Calculate number of cache misses required, get them from the unknown pool
245 2) Move those to the known pool
246 3) Fill up to amount of queries we need with random picks from the known pool
247
248 */
249
250 int main(int argc, char** argv)
251 try
252 {
253 po::options_description desc("Options");
254 desc.add_options()
255 ("help,h", "Show this helpful message")
256 ("version", "Show the version number")
257 ("ecs", po::value<string>(), "Add EDNS Client Subnet option to outgoing queries using random addresses from the specified range (IPv4 only)")
258 ("ecs-from-file", "Read IP or subnet values from the query file and add them as EDNS Client Subnet options to outgoing queries")
259 ("increment", po::value<float>()->default_value(1.1), "Set the factor to increase the QPS load per run")
260 ("maximum-qps", po::value<uint32_t>(), "Stop incrementing once this rate has been reached, to provide a stable load")
261 ("minimum-success-rate", po::value<double>()->default_value(0), "Stop the test as soon as the success rate drops below this value, in percent")
262 ("plot-file", po::value<string>(), "Write results to the specific file")
263 ("quiet", "Whether to run quietly, outputting only the maximum QPS reached. This option is mostly useful when used with --minimum-success-rate")
264 ("want-recursion", "Set the Recursion Desired flag on queries");
265 po::options_description alloptions;
266 po::options_description hidden("hidden options");
267 hidden.add_options()
268 ("query-file", po::value<string>(), "File with queries")
269 ("destination", po::value<string>(), "Destination address")
270 ("initial-qps", po::value<uint32_t>(), "Initial number of queries per second")
271 ("hitrate", po::value<double>(), "Aim this percent cache hitrate");
272
273 alloptions.add(desc).add(hidden);
274 po::positional_options_description p;
275 p.add("query-file", 1);
276 p.add("destination", 1);
277 p.add("initial-qps", 1);
278 p.add("hitrate", 1);
279
280 po::store(po::command_line_parser(argc, argv).options(alloptions).positional(p).run(), g_vm);
281 po::notify(g_vm);
282
283 if (g_vm.count("help")) {
284 usage(desc);
285 return EXIT_SUCCESS;
286 }
287
288 if (g_vm.count("version")) {
289 cerr<<"calidns "<<VERSION<<endl;
290 return EXIT_SUCCESS;
291 }
292
293 if (!(g_vm.count("query-file") && g_vm.count("destination") && g_vm.count("initial-qps") && g_vm.count("hitrate"))) {
294 usage(desc);
295 return EXIT_FAILURE;
296 }
297
298 float increment = 1.1;
299 try {
300 increment = g_vm["increment"].as<float>();
301 }
302 catch(...) {
303 }
304
305 bool wantRecursion = g_vm.count("want-recursion");
306 bool useECSFromFile = g_vm.count("ecs-from-file");
307 g_quiet = g_vm.count("quiet");
308
309 double hitrate = g_vm["hitrate"].as<double>();
310 if (hitrate > 100 || hitrate < 0) {
311 cerr<<"hitrate must be between 0 and 100, not "<<hitrate<<endl;
312 return EXIT_FAILURE;
313 }
314 hitrate /= 100;
315 uint32_t qpsstart = g_vm["initial-qps"].as<uint32_t>();
316
317 uint32_t maximumQps = std::numeric_limits<uint32_t>::max();
318 if (g_vm.count("maximum-qps")) {
319 maximumQps = g_vm["maximum-qps"].as<uint32_t>();
320 }
321
322 double minimumSuccessRate = g_vm["minimum-success-rate"].as<double>();
323 if (minimumSuccessRate > 100.0 || minimumSuccessRate < 0.0) {
324 cerr<<"Minimum success rate must be between 0 and 100, not "<<minimumSuccessRate<<endl;
325 return EXIT_FAILURE;
326 }
327
328 Netmask ecsRange;
329 if (g_vm.count("ecs")) {
330 dns_random_init("0123456789abcdef");
331
332 try {
333 ecsRange = Netmask(g_vm["ecs"].as<string>());
334 if (!ecsRange.empty()) {
335
336 if (!ecsRange.isIpv4()) {
337 cerr<<"Only IPv4 ranges are supported for ECS at the moment!"<<endl;
338 return EXIT_FAILURE;
339 }
340
341 if (!g_quiet) {
342 cout<<"Adding ECS option to outgoing queries with random addresses from the "<<ecsRange.toString()<<" range"<<endl;
343 }
344 }
345 }
346 catch (const NetmaskException& e) {
347 cerr<<"Error while parsing the ECS netmask: "<<e.reason<<endl;
348 return EXIT_FAILURE;
349 }
350 }
351
352 struct sched_param param;
353 param.sched_priority=99;
354
355 #if HAVE_SCHED_SETSCHEDULER
356 if(sched_setscheduler(0, SCHED_FIFO, &param) < 0) {
357 if (!g_quiet) {
358 cerr<<"Unable to set SCHED_FIFO: "<<strerror(errno)<<endl;
359 }
360 }
361 #endif
362
363 ifstream ifs(g_vm["query-file"].as<string>());
364 string line;
365 reportAllTypes();
366 vector<std::shared_ptr<vector<uint8_t> > > unknown, known;
367 std::vector<std::string> fields;
368 fields.reserve(3);
369
370 while(getline(ifs, line)) {
371 vector<uint8_t> packet;
372 DNSPacketWriter::optvect_t ednsOptions;
373 boost::trim(line);
374 if (line.empty() || line.at(0) == '#') {
375 continue;
376 }
377
378 fields.clear();
379 stringtok(fields, line, "\t ");
380 if ((useECSFromFile && fields.size() < 3) || fields.size() < 2) {
381 cerr<<"Skipping invalid line '"<<line<<", it does not contain enough values"<<endl;
382 continue;
383 }
384
385 const std::string& qname = fields.at(0);
386 const std::string& qtype = fields.at(1);
387 std::string subnet;
388
389 if (useECSFromFile) {
390 subnet = fields.at(2);
391 }
392
393 DNSPacketWriter pw(packet, DNSName(qname), DNSRecordContent::TypeToNumber(qtype));
394 pw.getHeader()->rd=wantRecursion;
395 pw.getHeader()->id=dns_random(UINT16_MAX);
396
397 if(!subnet.empty() || !ecsRange.empty()) {
398 EDNSSubnetOpts opt;
399 opt.source = Netmask(subnet.empty() ? "0.0.0.0/32" : subnet);
400 ednsOptions.push_back(std::make_pair(EDNSOptionCode::ECS, makeEDNSSubnetOptsString(opt)));
401 }
402
403 if(!ednsOptions.empty() || pw.getHeader()->id % 2) {
404 pw.addOpt(1500, 0, EDNSOpts::DNSSECOK, ednsOptions);
405 pw.commit();
406 }
407 unknown.emplace_back(std::make_shared<vector<uint8_t>>(packet));
408 }
409 random_shuffle(unknown.begin(), unknown.end());
410 if (!g_quiet) {
411 cout<<"Generated "<<unknown.size()<<" ready to use queries"<<endl;
412 }
413
414 vector<Socket*> sockets;
415 ComboAddress dest;
416 try {
417 dest = ComboAddress(g_vm["destination"].as<string>(), 53);
418 }
419 catch (PDNSException &e) {
420 cerr<<e.reason<<endl;
421 return EXIT_FAILURE;
422 }
423 for(int i=0; i < 24; ++i) {
424 Socket *sock = new Socket(dest.sin4.sin_family, SOCK_DGRAM);
425 // sock->connect(dest);
426 setSocketSendBuffer(sock->getHandle(), 2000000);
427 setSocketReceiveBuffer(sock->getHandle(), 2000000);
428 sockets.push_back(sock);
429 }
430 new thread(recvThread, &sockets);
431 uint32_t qps;
432
433 ofstream plot;
434 if (g_vm.count("plot-file")) {
435 plot.open(g_vm["plot-file"].as<string>());
436 if (!plot) {
437 cerr<<"Error opening "<<g_vm["plot-file"].as<string>()<<" for writing: "<<strerror(errno)<<endl;
438 return EXIT_FAILURE;
439 }
440 }
441
442 double bestQPS = 0.0;
443 double bestPerfectQPS = 0.0;
444
445 for(qps=qpsstart;;) {
446 double seconds=1;
447 if (!g_quiet) {
448 cout<<"Aiming at "<<qps<< "qps (RD="<<wantRecursion<<") for "<<seconds<<" seconds at cache hitrate "<<100.0*hitrate<<"%";
449 }
450 unsigned int misses=(1-hitrate)*qps*seconds;
451 unsigned int total=qps*seconds;
452 if (misses == 0) {
453 misses = 1;
454 }
455 if (!g_quiet) {
456 cout<<", need "<<misses<<" misses, "<<total<<" queries, have "<<unknown.size()<<" unknown left!"<<endl;
457 }
458
459 if (misses > unknown.size()) {
460 cerr<<"Not enough queries remaining (need at least "<<misses<<" and got "<<unknown.size()<<", please add more to the query file), exiting."<<endl;
461 return EXIT_FAILURE;
462 }
463 vector<vector<uint8_t>*> toSend;
464 unsigned int n;
465 for(n=0; n < misses; ++n) {
466 auto ptr=unknown.back();
467 unknown.pop_back();
468 toSend.push_back(ptr.get());
469 known.push_back(ptr);
470 }
471 for(;n < total; ++n) {
472 toSend.push_back(known[dns_random(known.size())].get());
473 }
474 random_shuffle(toSend.begin(), toSend.end());
475 g_recvcounter.store(0);
476 g_recvbytes=0;
477 DTime dt;
478 dt.set();
479
480 sendPackets(&sockets, toSend, qps, dest, ecsRange);
481
482 const auto udiff = dt.udiffNoReset();
483 const auto realqps=toSend.size()/(udiff/1000000.0);
484 if (!g_quiet) {
485 cout<<"Achieved "<<realqps<<" qps over "<< udiff/1000000.0<<" seconds"<<endl;
486 }
487
488 usleep(50000);
489 const auto received = g_recvcounter.load();
490 const auto udiffReceived = dt.udiff();
491 const auto realReceivedQPS = received/(udiffReceived/1000000.0);
492 double perc=received*100.0/toSend.size();
493 if (!g_quiet) {
494 cout<<"Received "<<received<<" packets over "<< udiffReceived/1000000.0<<" seconds ("<<perc<<"%, adjusted received rate "<<realReceivedQPS<<" qps)"<<endl;
495 }
496
497 if (plot) {
498 plot<<qps<<" "<<realqps<<" "<<perc<<" "<<received/(udiff/1000000.0)<<" " << 8*g_recvbytes.load()/(udiff/1000000.0)<<endl;
499 plot.flush();
500 }
501
502 if (qps < maximumQps) {
503 qps *= increment;
504 }
505 else {
506 qps = maximumQps;
507 }
508
509 if (minimumSuccessRate > 0.0 && perc < minimumSuccessRate) {
510 if (g_quiet) {
511 cout<<bestQPS<<endl;
512 }
513 else {
514 cout<<"The latest success rate ("<<perc<<") dropped below the minimum success rate of "<<minimumSuccessRate<<", stopping."<<endl;
515 cout<<"The final rate reached before failing was "<<bestQPS<<" qps (best rate at 100% was "<<bestPerfectQPS<<" qps)"<<endl;
516 }
517 break;
518 }
519
520 bestQPS = std::max(bestQPS, realReceivedQPS);
521 if (perc >= 100.0) {
522 bestPerfectQPS = std::max(bestPerfectQPS, realReceivedQPS);
523 }
524 }
525
526 if (plot) {
527 plot.flush();
528 }
529
530 // t1.detach();
531 }
532 catch(std::exception& e)
533 {
534 cerr<<"Fatal error: "<<e.what()<<endl;
535 return EXIT_FAILURE;
536 }