]>
Commit | Line | Data |
---|---|---|
fc7f729f OM |
1 | import dns |
2 | import dnsmessage_pb2 | |
3 | import os | |
4 | import socket | |
5 | import struct | |
6 | import sys | |
7 | import threading | |
8 | import time | |
9 | ||
10 | import dns | |
11 | import dnstap_pb2 | |
12 | ||
# Control frame type codes used by the Frame Streams helpers in this module.
FSTRM_CONTROL_ACCEPT = 0x01
FSTRM_CONTROL_START = 0x02
FSTRM_CONTROL_STOP = 0x03
FSTRM_CONTROL_READY = 0x04
FSTRM_CONTROL_FINISH = 0x05

# Python2/3 compatibility hacks

# The queue module is spelled "queue" on Python 3 and "Queue" on Python 2.
try:
    from queue import Queue
except ImportError:
    from Queue import Queue

# Where xrange exists (Python 2), use it as range; otherwise keep the builtin.
try:
    range = xrange
except NameError:
    pass
29 | ||
30 | from nose import SkipTest | |
31 | from recursortests import RecursorTest | |
32 | ||
def checkDnstapBase(testinstance, dnstap, protocol, initiator):
    """Assert the fields shared by every dnstap message checked in this module.

    Args:
        testinstance: test case object providing the assert* methods.
        dnstap: decoded dnstap object (must expose HasField(), type, message).
        protocol: expected value of ``dnstap.message.socket_protocol``.
        initiator: IPv4 address (text form) expected in
            ``dnstap.message.response_address``.

    Raises:
        AssertionError: through ``testinstance`` when a check fails.
    """
    testinstance.assertTrue(dnstap)
    testinstance.assertTrue(dnstap.HasField('identity'))
    # The identity value itself is deliberately not checked:
    #testinstance.assertEqual(dnstap.identity, b'a.server')
    testinstance.assertTrue(dnstap.HasField('version'))
    # The version string itself is deliberately not checked:
    #testinstance.assertIn(b'dnsdist ', dnstap.version)
    testinstance.assertTrue(dnstap.HasField('type'))
    testinstance.assertEqual(dnstap.type, dnstap.MESSAGE)
    testinstance.assertTrue(dnstap.HasField('message'))
    testinstance.assertTrue(dnstap.message.HasField('socket_protocol'))
    testinstance.assertEqual(dnstap.message.socket_protocol, protocol)
    testinstance.assertTrue(dnstap.message.HasField('socket_family'))
    # assertEqual, not the deprecated assertEquals alias (removed in 3.12).
    testinstance.assertEqual(dnstap.message.socket_family, dnstap_pb2.INET)
    #
    # We cannot check the query address and port since we only log outgoing queries via dnstap
    #
    #testinstance.assertTrue(dnstap.message.HasField('query_address'))
    #testinstance.assertEqual(socket.inet_ntop(socket.AF_INET, dnstap.message.query_address), initiator)
    testinstance.assertTrue(dnstap.message.HasField('response_address'))
    testinstance.assertEqual(
        socket.inet_ntop(socket.AF_INET, dnstap.message.response_address),
        initiator)
    testinstance.assertTrue(dnstap.message.HasField('response_port'))
    testinstance.assertEqual(dnstap.message.response_port, 53)
55 | ||
56 | ||
def checkDnstapQuery(testinstance, dnstap, protocol, query, initiator='127.0.0.1'):
    """Assert that ``dnstap`` is a well-formed RESOLVER_QUERY message.

    Args:
        testinstance: test case object providing the assert* methods.
        dnstap: decoded dnstap object to inspect.
        protocol: expected socket protocol, forwarded to checkDnstapBase().
        query: the query sent by the test. Currently unused (see below) but
            kept so callers do not have to change.
        initiator: expected response address, forwarded to checkDnstapBase().

    Raises:
        AssertionError: through ``testinstance`` when a check fails.
        Exception: whatever ``dns.message.from_wire`` raises if the logged
            query is not parseable DNS wire data.
    """
    # assertEqual, not the deprecated assertEquals alias (removed in 3.12).
    testinstance.assertEqual(dnstap.message.type, dnstap_pb2.Message.RESOLVER_QUERY)
    checkDnstapBase(testinstance, dnstap, protocol, initiator)

    testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
    testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))

    testinstance.assertTrue(dnstap.message.HasField('query_message'))
    #
    # We cannot compare the incoming query with the outgoing one
    # The IDs and some other fields will be different
    #
    # The parse is kept purely as a validity check on the logged bytes; the
    # parsed message is intentionally not compared with ``query``.
    dns.message.from_wire(dnstap.message.query_message)
71 | ||
72 | ||
def checkDnstapExtra(testinstance, dnstap, expected):
    """Assert that ``dnstap`` has an 'extra' field equal to ``expected``."""
    extra_present = dnstap.HasField('extra')
    testinstance.assertTrue(extra_present)
    testinstance.assertEqual(dnstap.extra, expected)
76 | ||
77 | ||
def checkDnstapNoExtra(testinstance, dnstap):
    """Assert that ``dnstap`` carries no 'extra' field."""
    extra_present = dnstap.HasField('extra')
    testinstance.assertFalse(extra_present)
80 | ||
81 | ||
def checkDnstapResponse(testinstance, dnstap, protocol, response, initiator='127.0.0.1'):
    """Assert that ``dnstap`` is a well-formed RESOLVER_RESPONSE message.

    Args:
        testinstance: test case object providing the assert* methods.
        dnstap: decoded dnstap object to inspect.
        protocol: expected socket protocol, forwarded to checkDnstapBase().
        response: DNS message the logged response must be equal to.
        initiator: expected response address, forwarded to checkDnstapBase().

    Raises:
        AssertionError: through ``testinstance`` when a check fails.
    """
    # assertEqual, not the deprecated assertEquals alias (removed in 3.12).
    testinstance.assertEqual(dnstap.message.type, dnstap_pb2.Message.RESOLVER_RESPONSE)
    checkDnstapBase(testinstance, dnstap, protocol, initiator)

    testinstance.assertTrue(dnstap.message.HasField('query_time_sec'))
    testinstance.assertTrue(dnstap.message.HasField('query_time_nsec'))

    testinstance.assertTrue(dnstap.message.HasField('response_time_sec'))
    testinstance.assertTrue(dnstap.message.HasField('response_time_nsec'))

    # Compare the timestamps as (seconds, nanoseconds) pairs. Testing the two
    # components independently with "or" would accept a response stamped in
    # an earlier second whenever its nanosecond part happened to be larger.
    query_time = (dnstap.message.query_time_sec,
                  dnstap.message.query_time_nsec)
    response_time = (dnstap.message.response_time_sec,
                     dnstap.message.response_time_nsec)
    testinstance.assertGreater(response_time, query_time)

    testinstance.assertTrue(dnstap.message.HasField('response_message'))
    wire_message = dns.message.from_wire(dnstap.message.response_message)
    testinstance.assertEqual(wire_message, response)
98 | ||
def fstrm_get_control_frame_type(data):
    """Return the control frame type stored in the first four bytes of ``data``.

    The type is decoded as an unsigned 32-bit integer in network byte order;
    anything after the fourth byte is ignored.
    """
    header = data[:4]
    return struct.unpack("!L", header)[0]
102 | ||
103 | ||
def fstrm_make_control_frame_reply(cft, data):
    """Build the reply to send for a received control frame.

    Args:
        cft: control frame type (one of the FSTRM_CONTROL_* integers).
        data: raw control frame payload; currently unused.

    Returns:
        The bytes to send back (escape word, frame length, then the control
        frame), or None when the frame needs no reply.

    Raises:
        Exception: if ``cft`` is a control frame type this helper does not
            handle.
    """
    if cft == FSTRM_CONTROL_READY:
        # Reply with ACCEPT frame and content-type
        contenttype = b'protobuf:dnstap.Dnstap'
        # NOTE(review): the literal 1 is presumably the "content type" field
        # tag of the Frame Streams protocol - confirm against the spec.
        frame = struct.pack('!LLL', FSTRM_CONTROL_ACCEPT, 1,
                            len(contenttype)) + contenttype
        return struct.pack("!LL", 0, len(frame)) + frame
    if cft == FSTRM_CONTROL_START:
        return None
    if cft == FSTRM_CONTROL_STOP:
        # On a bi-directional stream the writer's STOP is acknowledged with
        # FINISH. Previously STOP fell through to the error branch below, so
        # callers could never observe it.
        frame = struct.pack('!L', FSTRM_CONTROL_FINISH)
        return struct.pack("!LL", 0, len(frame)) + frame
    # cft is an integer: convert it explicitly, "str + int" is a TypeError.
    raise Exception('unhandled control frame ' + str(cft))
116 | ||
117 | ||
def fstrm_read_and_dispatch_control_frame(conn):
    """Read one control frame from ``conn`` and send the reply, if any.

    The caller must already have consumed the zero "escape" length word that
    introduces a control frame.

    Args:
        conn: connected stream socket.

    Returns:
        The control frame type that was received.

    Raises:
        Exception: if the connection ends before the whole frame is read, or
            if the control frame type is not handled.
    """
    # MSG_WAITALL: a plain recv(n) on a stream socket may legitimately return
    # fewer than n bytes, which would corrupt the length/payload parsing.
    data = conn.recv(4, socket.MSG_WAITALL)
    if len(data) < 4:
        raise Exception('length of control frame payload could not be read')
    (datalen,) = struct.unpack("!L", data)
    data = conn.recv(datalen, socket.MSG_WAITALL)
    if len(data) < datalen:
        raise Exception('control frame payload could not be read')
    cft = fstrm_get_control_frame_type(data)
    reply = fstrm_make_control_frame_reply(cft, data)
    if reply:
        # sendall: send() may write only part of the buffer.
        conn.sendall(reply)
    return cft
129 | ||
130 | ||
def fstrm_handle_bidir_connection(conn, on_data):
    """Read frames from ``conn`` until the stream ends.

    Each frame starts with a 32-bit network-order length. A length of zero
    introduces a control frame, which is delegated to
    fstrm_read_and_dispatch_control_frame(); any other length is a data frame
    whose payload is handed to ``on_data``.

    The function returns when the peer closes the connection, when a frame is
    cut short by the end of the stream, or when a STOP control frame is
    received.

    Args:
        conn: connected stream socket.
        on_data: callable invoked with the payload (bytes) of each complete
            data frame.
    """
    while True:
        # MSG_WAITALL: a plain recv(n) on a stream socket may return fewer
        # than n bytes; a short length prefix or payload means the stream
        # ended mid-frame.
        header = conn.recv(4, socket.MSG_WAITALL)
        if len(header) < 4:
            break
        (datalen,) = struct.unpack("!L", header)
        if datalen == 0:
            # control frame length follows
            cft = fstrm_read_and_dispatch_control_frame(conn)
            if cft == FSTRM_CONTROL_STOP:
                break
        else:
            # data frame
            payload = conn.recv(datalen, socket.MSG_WAITALL)
            if len(payload) < datalen:
                # Never hand a truncated frame to the consumer.
                break

            on_data(payload)
150 | ||
151 | ||
152 | ||
class DNSTapServerParams:
    """Settings and state shared between the tests and the dnstap listener."""

    def __init__(self, port):
        # TCP port the frame stream listener binds to.
        self.port = port
        # Queue onto which the listener puts each received data frame.
        self.queue = Queue()


# Module-wide listener parameters used by every test class in this file.
DNSTapServerParameters = DNSTapServerParams(4243)
# Listening sockets opened so far; they are closed in tearDownClass().
DNSTapListeners = []
161 | ||
class TestRecursorDNSTap(RecursorTest):
    """Base class for the recursor dnstap tests.

    setUpClass() starts a frame stream listener thread on 127.0.0.1 (port
    taken from DNSTapServerParameters) and then starts the recursor. Every
    data frame the listener receives is put on DNSTapServerParameters.queue
    so that the tests can inspect it.
    """

    @classmethod
    def FrameStreamUnixListener(cls, conn, param):
        """Serve one accepted connection, queueing each data frame.

        Args:
            conn: accepted connection socket; always closed on return.
            param: DNSTapServerParams whose queue receives the frames.
        """
        import errno  # local import: only this method needs it

        try:
            # The handler returns once the peer is done (EOF or STOP).
            # Calling it again on the same socket would only spin on the
            # end-of-stream condition, so it is run exactly once.
            fstrm_handle_bidir_connection(
                conn, lambda data: param.queue.put(data, True, timeout=2.0))
        except socket.error as e:
            # EBADF means the socket was closed under us; anything else is
            # unexpected and is reported before the thread exits.
            if e.errno != errno.EBADF:
                sys.stderr.write("Unexpected socket error %s\n" % str(e))
                sys.exit(1)
        finally:
            conn.close()

    @classmethod
    def FrameStreamUnixListenerMain(cls, param):
        """Accept connections on 127.0.0.1:param.port, one worker thread each.

        The listening socket is appended to DNSTapListeners so that
        tearDownClass() can close it.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        try:
            sock.bind(("127.0.0.1", param.port))
        except socket.error as e:
            print("Error binding in the framestream listener: %s" % str(e))
            sys.exit(1)
        DNSTapListeners.append(sock)
        sock.listen(100)
        while True:
            try:
                (conn, _) = sock.accept()
            except socket.error:
                # accept() failed, e.g. because the listening socket was
                # closed by tearDownClass(): stop accepting.
                break
            print("Accepting connection")
            listener = threading.Thread(name='DNSTap Worker',
                                        target=cls.FrameStreamUnixListener,
                                        args=[conn, param])
            # The daemon attribute replaces the deprecated setDaemon().
            listener.daemon = True
            listener.start()

        sock.close()

    @classmethod
    def setUpClass(cls):
        """Start responders, the dnstap listener thread and the recursor.

        Raises:
            SkipTest: when the NODNSTAPTESTS environment variable is "1".
        """
        if os.environ.get("NODNSTAPTESTS") == "1":
            raise SkipTest("Not Yet Supported")

        cls.setUpSockets()

        cls.startResponders()

        listener = threading.Thread(name='DNSTap Listener',
                                    target=cls.FrameStreamUnixListenerMain,
                                    args=[DNSTapServerParameters])
        listener.daemon = True
        listener.start()

        confdir = os.path.join('configs', cls._confdir)
        cls.createConfigDir(confdir)

        cls.generateRecursorConfig(confdir)
        cls.startRecursor(confdir, cls._recursorPort)

    def setUp(self):
        """Drain the shared frame queue before each test."""
        # Make sure the queue is empty, in case
        # a previous test failed
        while not DNSTapServerParameters.queue.empty():
            DNSTapServerParameters.queue.get(False)

    @classmethod
    def generateRecursorConfig(cls, confdir):
        """Write the 'example.' zone file, then generate the base config."""
        authzonepath = os.path.join(confdir, 'example.zone')
        with open(authzonepath, 'w') as authzone:
            authzone.write("""$ORIGIN example.
@ 3600 IN SOA {soa}
a 3600 IN A 192.0.2.42
tagged 3600 IN A 192.0.2.84
query-selected 3600 IN A 192.0.2.84
answer-selected 3600 IN A 192.0.2.84
types 3600 IN A 192.0.2.84
types 3600 IN AAAA 2001:DB8::1
types 3600 IN TXT "Lorem ipsum dolor sit amet"
types 3600 IN MX 10 a.example.
types 3600 IN SPF "v=spf1 -all"
types 3600 IN SRV 10 20 443 a.example.
cname 3600 IN CNAME a.example.

""".format(soa=cls._SOA))
        super(TestRecursorDNSTap, cls).generateRecursorConfig(confdir)

    @classmethod
    def tearDownClass(cls):
        """Stop the recursor and close every listening socket opened so far."""
        cls.tearDownRecursor()
        for listener in DNSTapListeners:
            listener.close()
249 | ||
class DNSTapDefaultTest(TestRecursorDNSTap):
    """
    Checks that outgoing queries are exported over DNSTap.
    Incoming responses are not checked yet (that needs a richer test
    environment); for now this at least proves that the recursor connects
    to the DNSTap server.
    """

    _confdir = 'DNSTapDefault'
    _config_template = """
auth-zones=example=configs/%s/example.zone""" % _confdir
    _lua_config_file = """
dnstapFrameStreamServer({"127.0.0.1:%d"})
""" % (DNSTapServerParameters.port)

    def getFirstDnstap(self):
        """Pop the next frame from the listener queue and decode it."""
        raw = DNSTapServerParameters.queue.get(True, timeout=2.0)
        self.assertTrue(raw)
        message = dnstap_pb2.Dnstap()
        message.ParseFromString(raw)
        return message

    def testA(self):
        """Send an A query and check the dnstap message logged for it."""
        query = dns.message.make_query('www.example.org.', 'A', want_dnssec=True)
        query.flags |= dns.flags.RD
        self.sendUDPQuery(query)

        # check the dnstap message corresponding to the UDP query
        logged = self.getFirstDnstap()
        checkDnstapQuery(self, logged, dnstap_pb2.UDP, query, '127.0.0.8')

        # The logged message must not carry an 'extra' field
        checkDnstapNoExtra(self, logged)
285 | ||
class DNSTapLogNoQueriesTest(TestRecursorDNSTap):
    """
    Checks that nothing reaches the DNSTap listener queue when the frame
    stream server is configured with logQueries=false.
    """

    _confdir = 'DNSTapLogNoQueries'
    _config_template = """
auth-zones=example=configs/%s/example.zone""" % _confdir
    _lua_config_file = """
dnstapFrameStreamServer({"127.0.0.1:%d"}, {logQueries=false})
""" % (DNSTapServerParameters.port)

    def testA(self):
        """Send an A query and check that no dnstap frame was queued."""
        query = dns.message.make_query('www.example.org.', 'A', want_dnssec=True)
        query.flags |= dns.flags.RD
        self.sendUDPQuery(query)

        # We don't expect anything
        nothing_logged = DNSTapServerParameters.queue.empty()
        self.assertTrue(nothing_logged)
308 |