]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.recursor-dnssec/test_KeepOpenTCP.py
rec: dnspython's API changed wrt NSID, apply (version dependent) fix in regression...
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_KeepOpenTCP.py
CommitLineData
9487e89b
O
1import dns
2import os
3import socket
4import struct
5
6from recursortests import RecursorTest
7
class testKeepOpenTCP(RecursorTest):
    """Check that answers arrive cleanly on TCP connections that are kept open.

    Several TCP connections are opened at once, the same query is sent over
    each of them twice, and after every answer the test verifies that no
    additional bytes show up on the connection.
    """

    # Name of the per-test configuration directory.
    _confdir = 'KeepOpenTCP'

    # Recursor settings for this test; the auth zone file referenced here is
    # written by generateRecursorConfig().
    _config_template = """dnssec=validate
packetcache-ttl=10
packetcache-servfail-ttl=10
auth-zones=authzone.example=configs/%s/authzone.zone""" % _confdir

    @classmethod
    def generateRecursorConfig(cls, confdir):
        """Write the authzone.example zone file, then defer to the parent class.

        Args:
            confdir: Directory in which 'authzone.zone' is created; it is also
                passed on unchanged to the parent implementation.
        """
        authzonepath = os.path.join(confdir, 'authzone.zone')
        with open(authzonepath, 'w') as authzone:
            # cls._SOA is not defined in this class; presumably it is
            # inherited from RecursorTest.
            authzone.write("""$ORIGIN authzone.example.
@ 3600 IN SOA {soa}
@ 3600 IN A 192.0.2.88
""".format(soa=cls._SOA))
        super(testKeepOpenTCP, cls).generateRecursorConfig(confdir)

    @staticmethod
    def _recvExactly(sock, length):
        """Read up to `length` bytes from `sock`, looping over short reads.

        A single recv() on a stream socket may return fewer bytes than asked
        for, so keep reading until the requested amount has arrived.

        Args:
            sock: Connected stream socket to read from.
            length: Number of bytes wanted.

        Returns:
            The bytes read. The result is shorter than `length` only when the
            peer closed the connection before everything arrived.

        Raises:
            socket.timeout, socket.error: Propagated from sock.recv().
        """
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                # Empty read: the peer closed the connection.
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def sendTCPQueryKeepOpen(cls, sock, query, timeout=2.0):
        """Send `query` over an already connected socket and read one answer.

        The socket is left open so that it can be reused for further queries.
        This method is invoked on test instances, so `cls` is bound to the
        instance despite its name.

        Args:
            sock: Connected TCP socket; its timeout is whatever the caller
                configured.
            query: DNS message to send (must provide to_wire()).
            timeout: Accepted for interface compatibility but not applied;
                the timeout set on `sock` by the caller is what governs.

        Returns:
            The parsed DNS response, or None when a timeout or network error
            occurred, the peer closed the connection, or the response was
            empty or incomplete.
        """
        data = None
        try:
            wire = query.to_wire()
            # DNS over TCP: each message is preceded by its length as a
            # 16-bit big-endian integer. sendall() retries partial writes.
            sock.sendall(struct.pack("!H", len(wire)))
            sock.sendall(wire)
            header = cls._recvExactly(sock, 2)
            if len(header) == 2:
                (datalen,) = struct.unpack("!H", header)
                payload = cls._recvExactly(sock, datalen)
                # Only hand a complete message to the parser.
                if len(payload) == datalen:
                    data = payload
        except socket.timeout as e:
            print("Timeout: %s" % (str(e)))
            data = None
        except socket.error as e:
            print("Network error: %s" % (str(e)))
            data = None

        message = None
        if data:
            message = dns.message.from_wire(data)
        return message

    def _checkAnswerWithoutTrailingData(self, sock, query, expected):
        """Query over `sock`, validate the answer, and require silence after it.

        Args:
            sock: Connected TCP socket with a suitable timeout already set.
            query: DNS query message to send.
            expected: RRset that must appear, with a matching RRSIG, in the
                answer.
        """
        res = self.sendTCPQueryKeepOpen(sock, query)
        self.assertMessageIsAuthenticated(res)
        self.assertRRsetInAnswer(res, expected)
        self.assertMatchingRRSIGInAnswer(res, expected)

        # With a short timeout, the only acceptable outcome of another read
        # is a timeout: anything else means extra bytes were sent or the
        # connection was closed.
        sock.settimeout(0.1)
        try:
            trailing = sock.recv(1)
        except socket.timeout:
            print("ok")
        else:
            self.fail("expected no data after the response, but recv() returned %r" % (trailing,))

    def testNoTrailingData(self):
        """Query twice over each of several open connections, expecting no stray bytes."""
        count = 10
        expected = dns.rrset.from_text('ns.secure.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.9'.format(prefix=self._PREFIX))
        query = dns.message.make_query('ns.secure.example', 'A', want_dnssec=True)
        query.flags |= dns.flags.AD

        socks = []
        try:
            # First pass: open every connection and query it once, keeping
            # all of them open.
            for _ in range(count):
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                socks.append(sock)
                sock.settimeout(2.0)
                sock.connect(("127.0.0.1", self._recursorPort))
                self._checkAnswerWithoutTrailingData(sock, query, expected)

            # Second pass: reuse the connections that are still open.
            for sock in socks:
                sock.settimeout(2.0)
                self._checkAnswerWithoutTrailingData(sock, query, expected)
        finally:
            # Close every socket even when an assertion above failed.
            for sock in socks:
                sock.close()
83