]>
Commit | Line | Data |
---|---|---|
ca404e94 RG |
1 | #!/usr/bin/env python2 |
2 | ||
3 | import clientsubnetoption | |
4 | import dns | |
5 | import Queue | |
6 | import os | |
7 | import socket | |
8 | import struct | |
9 | import subprocess | |
10 | import sys | |
11 | import threading | |
12 | import time | |
13 | import unittest | |
14 | ||
class DNSDistTest(unittest.TestCase):
    """
    Set up a dnsdist instance and responder threads.
    Queries sent to dnsdist are relayed to the responder threads,
    who reply with the response provided by the tests themselves
    on a queue. Responder threads also queue the queries received
    from dnsdist on a separate queue, allowing the tests to check
    that the queries sent from dnsdist were as expected.
    """
    # Port dnsdist listens on, and port the fake backend responders bind to.
    _dnsDistPort = 5340
    _testServerPort = 5350
    _dnsdistcmd = (os.environ['DNSDISTBIN'] + " -C dnsdist.conf --acl 127.0.0.1/32 -l 127.0.0.1:" + str(_dnsDistPort) + " 127.0.0.1:" + str(_testServerPort)).split()
    # Responses the tests want the responders to send back.
    _toResponderQueue = Queue.Queue()
    # Queries the responders actually received from dnsdist.
    _fromResponderQueue = Queue.Queue()
    # subprocess.Popen handle of the running dnsdist, once started.
    _dnsdist = None
    # Connected UDP client socket towards dnsdist, once set up.
    _sock = None

    @classmethod
    def startResponders(cls):
        """Start the UDP and TCP responder threads as daemon threads."""
        print("Launching responders..")
        cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder)
        cls._UDPResponder.daemon = True
        cls._UDPResponder.start()
        cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder)
        cls._TCPResponder.daemon = True
        cls._TCPResponder.start()

    @classmethod
    def startDNSDist(cls, shutUp=True):
        """
        Spawn dnsdist using cls._dnsdistcmd and wait one second for it to start.

        When shutUp is true, stdout and stderr of dnsdist are sent to os.devnull.
        If dnsdist has already exited after the wait, the interpreter exits
        with dnsdist's return code.
        """
        print("Launching dnsdist..")
        print(' '.join(cls._dnsdistcmd))
        if shutUp:
            with open(os.devnull, 'w') as fdDevNull:
                cls._dnsdist = subprocess.Popen(cls._dnsdistcmd, close_fds=True, stdout=fdDevNull, stderr=fdDevNull)
        else:
            cls._dnsdist = subprocess.Popen(cls._dnsdistcmd, close_fds=True)

        time.sleep(1)

        if cls._dnsdist.poll() is not None:
            # poll() has already reaped the process: signalling its PID again
            # would either fail or hit an unrelated process, so just report
            # the exit status.
            sys.exit(cls._dnsdist.returncode)

    @classmethod
    def setUpSockets(cls):
        """Create the UDP client socket and connect it to dnsdist."""
        print("Setting up UDP socket..")
        cls._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        cls._sock.connect(("127.0.0.1", cls._dnsDistPort))

    @classmethod
    def setUpClass(cls):
        """Start the responders and dnsdist, then set up the client socket."""
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

        print("Launching tests..")

    @classmethod
    def tearDownClass(cls):
        """Stop dnsdist if it is still running and close the client socket."""
        if cls._dnsdist is not None:
            # Only signal a process that has not been reaped yet.
            if cls._dnsdist.poll() is None:
                cls._dnsdist.terminate()
            cls._dnsdist.wait()

        if cls._sock is not None:
            cls._sock.close()
            cls._sock = None

    @staticmethod
    def _recvExactly(sock, length):
        """
        Read exactly length bytes from a stream socket.

        Returns the bytes read, or None if the peer closed the connection
        before length bytes were received. Socket errors and timeouts are
        propagated to the caller.
        """
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                return None
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    @classmethod
    def _getResponse(cls, request):
        """
        Pick the response to send for request (which has exactly one question).

        If the name ends with 'tests.powerdns.com.' and a test queued a
        response, that response is returned with its id set to the request's
        id, and the request is queued for the test to inspect. Otherwise
        (unexpected query, or health check) a response is built from the
        request with a single 127.0.0.1 record, TTL 3600, added to the answer
        section.
        """
        question = request.question[0]
        if str(question.name).endswith('tests.powerdns.com.'):
            try:
                # Non-blocking get: both responder threads consume from this
                # queue, so checking empty() and then blocking in get() could
                # hang if the other thread takes the item in between.
                response = cls._toResponderQueue.get_nowait()
            except Queue.Empty:
                pass
            else:
                response.id = request.id
                cls._fromResponderQueue.put(request)
                return response

        # unexpected query, or health check
        response = dns.message.make_response(request)
        rrset = dns.rrset.from_text(question.name,
                                    3600,
                                    question.rdclass,
                                    question.rdtype,
                                    '127.0.0.1')
        response.answer.append(rrset)
        return response

    @classmethod
    def _getReceivedQuery(cls):
        """Return the next query recorded by a responder, or None if there is none."""
        try:
            return cls._fromResponderQueue.get_nowait()
        except Queue.Empty:
            return None

    @classmethod
    def UDPResponder(cls):
        """
        Thread body: answer UDP queries arriving on cls._testServerPort.

        Loops forever; queries whose question count is not 1 are skipped.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        sock.bind(("127.0.0.1", cls._testServerPort))
        try:
            while True:
                data, addr = sock.recvfrom(4096)
                request = dns.message.from_wire(data)
                if len(request.question) != 1:
                    print("Skipping query with question count %d" % (len(request.question)))
                    continue

                response = cls._getResponse(request)
                sock.sendto(response.to_wire(), addr)
        finally:
            # Only reached if the loop is left through an exception.
            sock.close()

    @classmethod
    def _handleTCPConnection(cls, conn):
        """
        Read one length-prefixed DNS query from conn and send the response.

        Returns without answering if the peer closes before a full query
        has been received, or if the question count is not 1. The caller
        owns conn and is responsible for closing it.
        """
        data = cls._recvExactly(conn, 2)
        if not data:
            return
        (datalen,) = struct.unpack("!H", data)
        data = cls._recvExactly(conn, datalen)
        if not data:
            return

        request = dns.message.from_wire(data)
        if len(request.question) != 1:
            print("Skipping query with question count %d" % (len(request.question)))
            return

        wire = cls._getResponse(request).to_wire()
        conn.sendall(struct.pack("!H", len(wire)))
        conn.sendall(wire)

    @classmethod
    def TCPResponder(cls):
        """
        Thread body: answer TCP queries arriving on cls._testServerPort.

        Accepts connections forever, handling one query per connection.
        Exits the calling thread with status 1 if the bind fails.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        try:
            sock.bind(("127.0.0.1", cls._testServerPort))
        except socket.error as e:
            print("Error binding in the TCP responder: %s" % str(e))
            sock.close()
            sys.exit(1)

        try:
            sock.listen(100)
            while True:
                (conn, _) = sock.accept()
                try:
                    cls._handleTCPConnection(conn)
                except socket.error as e:
                    # A broken connection must not take the responder down.
                    print("Network error in the TCP responder: %s" % str(e))
                finally:
                    # Always release the connection, including skipped queries.
                    conn.close()
        finally:
            # Only reached if the loop is left through an exception.
            sock.close()

    @classmethod
    def sendUDPQuery(cls, query, response, useQueue=True, timeout=2.0):
        """
        Send query to dnsdist over UDP and wait for the answer.

        If useQueue is true, response is first queued for the responders.
        A truthy timeout (in seconds) bounds the wait for the answer; the
        socket is switched back to blocking mode afterwards.

        Returns a (receivedQuery, message) tuple: the query recorded by a
        responder (or None) and the parsed answer (or None on timeout).
        """
        if useQueue:
            cls._toResponderQueue.put(response)

        if timeout:
            cls._sock.settimeout(timeout)

        try:
            cls._sock.send(query.to_wire())
            data = cls._sock.recv(4096)
        except socket.timeout:
            data = None
        finally:
            if timeout:
                cls._sock.settimeout(None)

        receivedQuery = None
        message = None
        if useQueue:
            receivedQuery = cls._getReceivedQuery()
        if data:
            message = dns.message.from_wire(data)
        return (receivedQuery, message)

    @classmethod
    def sendTCPQuery(cls, query, response, useQueue=True, timeout=2.0):
        """
        Send query to dnsdist over a fresh TCP connection and wait for the answer.

        If useQueue is true, response is first queued for the responders.
        A truthy timeout (in seconds) is applied to the socket once connected.
        Timeouts and network errors during the exchange are printed and
        result in no message; a failure to connect is propagated.

        Returns a (receivedQuery, message) tuple: the query recorded by a
        responder (or None) and the parsed answer (or None).
        """
        if useQueue:
            cls._toResponderQueue.put(response)

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect(("127.0.0.1", cls._dnsDistPort))

            if timeout:
                sock.settimeout(timeout)

            try:
                wire = query.to_wire()
                sock.sendall(struct.pack("!H", len(wire)))
                sock.sendall(wire)
                # TCP is a byte stream: the length prefix and the payload
                # may each arrive in several pieces.
                data = cls._recvExactly(sock, 2)
                if data:
                    (datalen,) = struct.unpack("!H", data)
                    data = cls._recvExactly(sock, datalen)
            except socket.timeout as e:
                print("Timeout: %s" % (str(e)))
                data = None
            except socket.error as e:
                print("Network error: %s" % (str(e)))
                data = None
        finally:
            sock.close()

        receivedQuery = None
        message = None
        if useQueue:
            receivedQuery = cls._getReceivedQuery()
        if data:
            message = dns.message.from_wire(data)
        return (receivedQuery, message)