]> git.ipfire.org Git - thirdparty/pdns.git/blob - pdns/calidns.cc
rec: Don't account chained queries more than once
[thirdparty/pdns.git] / pdns / calidns.cc
1 /*
2 * This file is part of PowerDNS or dnsdist.
3 * Copyright -- PowerDNS.COM B.V. and its contributors
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of version 2 of the GNU General Public License as
7 * published by the Free Software Foundation.
8 *
9 * In addition, for the avoidance of any doubt, permission is granted to
10 * link this program with OpenSSL and to (re)distribute the binaries
11 * produced as the result of such linking.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21 */
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25 #include <iostream>
26 #include "dnsparser.hh"
27 #include "sstuff.hh"
28 #include "misc.hh"
29 #include "dnswriter.hh"
30 #include "dnsrecords.hh"
31 #include <thread>
32 #include <atomic>
33 #include "statbag.hh"
34 #include <fstream>
35 #include <poll.h>
36 #include <memory>
37 #include <boost/program_options.hpp>
38 using std::thread;
39 using std::unique_ptr;
40
41 StatBag S;
42
43 std::atomic<unsigned int> g_recvcounter, g_recvbytes;
44 volatile bool g_done;
45
46 namespace po = boost::program_options;
47 po::variables_map g_vm;
48
49 void* recvThread(const vector<Socket*>* sockets)
50 {
51 vector<pollfd> rfds, fds;
52 for(const auto& s : *sockets) {
53 struct pollfd pfd;
54 pfd.fd = s->getHandle();
55 pfd.events = POLLIN;
56 pfd.revents = 0;
57 rfds.push_back(pfd);
58 }
59
60 int err;
61
62 vector<struct mmsghdr> buf(100);
63 for(auto& m : buf) {
64 fillMSGHdr(&m.msg_hdr, new struct iovec, new char[512], 512, new char[1500], 1500, new ComboAddress("127.0.0.1"));
65 }
66
67 while(!g_done) {
68 fds=rfds;
69
70 err = poll(&fds[0], fds.size(), -1);
71 if(err < 0) {
72 if(errno==EINTR)
73 continue;
74 unixDie("Unable to poll for new UDP events");
75 }
76
77 for(auto &pfd : fds) {
78 if(pfd.revents & POLLIN) {
79
80 if((err=recvmmsg(pfd.fd, &buf[0], buf.size(), MSG_WAITFORONE, 0)) < 0 ) {
81 if(errno != EAGAIN)
82 cerr<<"recvfrom gave error, ignoring: "<<strerror(errno)<<endl;
83 unixDie("recvmmsg");
84 continue;
85 }
86 g_recvcounter+=err;
87 for(int n=0; n < err; ++n)
88 g_recvbytes += buf[n].msg_len;
89 }
90 }
91 }
92
93 return 0;
94 }
95
96
97 void setSocketBuffer(int fd, int optname, uint32_t size)
98 {
99 uint32_t psize=0;
100 socklen_t len=sizeof(psize);
101
102 if(!getsockopt(fd, SOL_SOCKET, optname, (char*)&psize, &len) && psize > size) {
103 cerr<<"Not decreasing socket buffer size from "<<psize<<" to "<<size<<endl;
104 return;
105 }
106
107 if (setsockopt(fd, SOL_SOCKET, optname, (char*)&size, sizeof(size)) < 0 )
108 cerr<<"Warning: unable to raise socket buffer size to "<<size<<": "<<strerror(errno)<<endl;
109 }
110
111
112 static void setSocketReceiveBuffer(int fd, uint32_t size)
113 {
114 setSocketBuffer(fd, SO_RCVBUF, size);
115 }
116
117 static void setSocketSendBuffer(int fd, uint32_t size)
118 {
119 setSocketBuffer(fd, SO_SNDBUF, size);
120 }
121
122 void sendPackets(const vector<Socket*>* sockets, const vector<vector<uint8_t>* >& packets, int qps, ComboAddress dest)
123 {
124 unsigned int burst=100;
125 struct timespec nsec;
126 nsec.tv_sec=0;
127 nsec.tv_nsec=1*(unsigned long)(burst*1000000000.0/qps);
128 int count=0;
129
130 struct Unit {
131 struct msghdr msgh;
132 struct iovec iov;
133 char cbuf[256];
134 };
135 vector<unique_ptr<Unit> > units;
136 int ret;
137
138 for(const auto& p : packets) {
139 count++;
140
141 Unit u;
142
143 fillMSGHdr(&u.msgh, &u.iov, u.cbuf, 0, (char*)&(*p)[0], p->size(), &dest);
144 if((ret=sendmsg((*sockets)[count % sockets->size()]->getHandle(),
145 &u.msgh, 0)))
146 if(ret < 0)
147 unixDie("sendmmsg");
148
149
150 if(!(count%burst))
151 nanosleep(&nsec, 0);
152 }
153 }
154
155 void usage(po::options_description &desc) {
156 cerr<<"Syntax: calidns [OPTIONS] QUERY_FILE DESTINATION INITIAL_QPS HITRATE"<<endl;
157 cerr<<desc<<endl;
158 }
159
160 /*
161 New plan. Set cache hit percentage, which we achieve on a per second basis.
162 So we start with 10000 qps for example, and for 90% cache hit ratio means
163 we take 1000 unique queries and each send them 10 times.
164
165 We then move the 1000 unique queries to the 'known' pool.
166
167 For the next second, say 20000 qps, we know we are going to need 2000 new queries,
168 so we take 2000 from the unknown pool. Then we need 18000 cache hits. We can get 1000 from
169 the known pool, leaving us down 17000. Or, we have 3000 in total now and we need 2000. We simply
170 repeat the 3000 mix we have ~7 times. The 2000 can now go to the known pool too.
171
172 For the next second, say 30000 qps, we'll need 3000 cache misses, which we get from
173 the unknown pool. To this we add 3000 queries from the known pool. Next up we repeat this batch 5
174 times.
175
176 In general the algorithm therefore is:
177
178 1) Calculate number of cache misses required, get them from the unknown pool
179 2) Move those to the known pool
180 3) Fill up to amount of queries we need with random picks from the known pool
181
182 */
183
184 int main(int argc, char** argv)
185 try
186 {
187 po::options_description desc("Options");
188 desc.add_options()
189 ("help,h", "Show this helpful message")
190 ("version", "Show the version number")
191 ("increment", po::value<float>()->default_value(1.1), "Set the factor to increase the QPS load per run")
192 ("want-recursion", "Set the Recursion Desired flag on queries");
193 po::options_description alloptions;
194 po::options_description hidden("hidden options");
195 hidden.add_options()
196 ("query-file", po::value<string>(), "File with queries")
197 ("destination", po::value<string>(), "Destination address")
198 ("initial-qps", po::value<uint32_t>(), "Initial number of queries per second")
199 ("hitrate", po::value<double>(), "Aim this percent cache hitrate");
200
201 alloptions.add(desc).add(hidden);
202 po::positional_options_description p;
203 p.add("query-file", 1);
204 p.add("destination", 1);
205 p.add("initial-qps", 1);
206 p.add("hitrate", 1);
207
208 po::store(po::command_line_parser(argc, argv).options(alloptions).positional(p).run(), g_vm);
209 po::notify(g_vm);
210
211 if (g_vm.count("help")) {
212 usage(desc);
213 return EXIT_SUCCESS;
214 }
215
216 if (g_vm.count("version")) {
217 cerr<<"calidns "<<VERSION<<endl;
218 return EXIT_SUCCESS;
219 }
220
221 if (!(g_vm.count("query-file") && g_vm.count("destination") && g_vm.count("initial-qps") && g_vm.count("hitrate"))) {
222 usage(desc);
223 return EXIT_FAILURE;
224 }
225
226 float increment = 1.1;
227 try {
228 increment = g_vm["increment"].as<float>();
229 }
230 catch(...) {
231 }
232
233 bool wantRecursion = g_vm.count("want-recursion");
234
235 double hitrate = g_vm["hitrate"].as<double>();
236 if (hitrate > 100 || hitrate < 0) {
237 cerr<<"hitrate must be between 0 and 100, not "<<hitrate<<endl;
238 return EXIT_FAILURE;
239 }
240 hitrate /= 100;
241 uint32_t qpsstart = g_vm["initial-qps"].as<uint32_t>();
242
243 struct sched_param param;
244 param.sched_priority=99;
245
246 if(sched_setscheduler(0, SCHED_FIFO, &param) < 0)
247 cerr<<"Unable to set SCHED_FIFO: "<<strerror(errno)<<endl;
248
249 ifstream ifs(g_vm["query-file"].as<string>());
250 string line;
251 reportAllTypes();
252 vector<std::shared_ptr<vector<uint8_t> > > unknown, known;
253 while(getline(ifs, line)) {
254 vector<uint8_t> packet;
255 boost::trim(line);
256 const auto fields = splitField(line, ' ');
257 DNSPacketWriter pw(packet, DNSName(fields.first), DNSRecordContent::TypeToNumber(fields.second));
258 pw.getHeader()->rd=wantRecursion;
259 pw.getHeader()->id=random();
260 if(pw.getHeader()->id % 2) {
261 pw.addOpt(1500, 0, EDNSOpts::DNSSECOK);
262 pw.commit();
263 }
264 unknown.emplace_back(std::make_shared<vector<uint8_t>>(packet));
265 }
266 random_shuffle(unknown.begin(), unknown.end());
267 cout<<"Generated "<<unknown.size()<<" ready to use queries"<<endl;
268
269 vector<Socket*> sockets;
270 ComboAddress dest;
271 try {
272 dest = ComboAddress(g_vm["destination"].as<string>(), 53);
273 }
274 catch (PDNSException &e) {
275 cerr<<e.reason<<endl;
276 return EXIT_FAILURE;
277 }
278 for(int i=0; i < 24; ++i) {
279 Socket *sock = new Socket(dest.sin4.sin_family, SOCK_DGRAM);
280 // sock->connect(dest);
281 setSocketSendBuffer(sock->getHandle(), 2000000);
282 setSocketReceiveBuffer(sock->getHandle(), 2000000);
283 sockets.push_back(sock);
284 }
285 new thread(recvThread, &sockets);
286 int qps;
287
288 ofstream plot("plot");
289 for(qps=qpsstart;;qps *= increment) {
290 double seconds=1;
291 cout<<"Aiming at "<<qps<< "qps (RD="<<wantRecursion<<") for "<<seconds<<" seconds at cache hitrate "<<100.0*hitrate<<"%";
292 unsigned int misses=(1-hitrate)*qps*seconds;
293 unsigned int total=qps*seconds;
294 if (misses == 0) {
295 misses = 1;
296 }
297 cout<<", need "<<misses<<" misses, "<<total<<" queries, have "<<unknown.size()<<" unknown left!"<<endl;
298
299 if (misses > unknown.size()) {
300 cerr<<"Not enough queries remaining (need at least "<<misses<<" and got "<<unknown.size()<<", please add more to the query file), exiting."<<endl;
301 exit(1);
302 }
303 vector<vector<uint8_t>*> toSend;
304 unsigned int n;
305 for(n=0; n < misses; ++n) {
306 auto ptr=unknown.back();
307 unknown.pop_back();
308 toSend.push_back(ptr.get());
309 known.push_back(ptr);
310 }
311 for(;n < total; ++n) {
312 toSend.push_back(known[random()%known.size()].get());
313 }
314 random_shuffle(toSend.begin(), toSend.end());
315 g_recvcounter.store(0);
316 g_recvbytes=0;
317 DTime dt;
318 dt.set();
319
320 sendPackets(&sockets, toSend, qps, dest);
321
322 auto udiff = dt.udiff();
323 auto realqps=toSend.size()/(udiff/1000000.0);
324 cout<<"Achieved "<<realqps<<"qps"<< " over "<< udiff/1000000.0<<" seconds"<<endl;
325
326 usleep(50000);
327 double perc=g_recvcounter.load()*100.0/toSend.size();
328 cout<<"Received "<<g_recvcounter.load()<<" packets ("<<perc<<"%)"<<endl;
329 plot<<qps<<" "<<realqps<<" "<<perc<<" "<<g_recvcounter.load()/(udiff/1000000.0)<<" " << 8*g_recvbytes.load()/(udiff/1000000.0)<<endl;
330 plot.flush();
331 }
332 plot.flush();
333 // t1.detach();
334 }
335 catch(std::exception& e)
336 {
337 cerr<<"Fatal error: "<<e.what()<<endl;
338 }