import dns

from dnsdisttests import DNSDistTest
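class TestTLS(DNSDistTest):
    """Regression tests for dnsdist's DNS-over-TLS listener.

    The configuration starts one backend, one TLS listener on 127.0.0.1 and
    an SNI rule that spoofs 1.2.3.4 for clients presenting "powerdns.com".

    NOTE(review): this listing lost several source lines. Everything marked
    "constructed" below was filled in only so the code parses; restore those
    values from the upstream file before trusting the results.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    # NOTE(review): source lines 10-11 are missing from the listing.
    # _caCert and _tlsServerPort are read by every test below but are not
    # defined in the visible text; presumably they were set on those lines
    # or are inherited from DNSDistTest - confirm.
    _config_template = """
    newServer{address="127.0.0.1:%s"}
    addTLSLocal("127.0.0.1:%s", "%s", "%s")
    addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
    """
    # Attribute names, presumably substituted into the four %s slots above
    # in this order by the base class - confirm in dnsdisttests.
    _config_params = ['_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']

    def testTLSSimple(self):
        """
        TLS: Single query
        """
        name = 'single.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = dns.message.make_response(query)
        # constructed: arguments after `name` were lost (see class docstring)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # The helper receives the expected server name and a CA file;
        # presumably it verifies the listener's chain against both - confirm
        # in dnsdisttests, since these tests are only meaningful if it does.
        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)

        self.sendTCPQueryOverConnection(conn, query, response=response)
        (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # Align the IDs before comparing; presumably the proxy picks its own
        # ID towards the backend.
        receivedQuery.id = query.id
        # assertEqual, not assertEquals: the alias was removed in Python 3.12.
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    # NOTE(review): the `def` line of this test (source line 43) is missing
    # from the listing; the name is reconstructed - confirm against upstream.
    def testTLKA(self):
        """
        TLS: Several queries over the same connection
        """
        name = 'ka.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = dns.message.make_response(query)
        # constructed: arguments after `name` were lost (see class docstring)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)

        # NOTE(review): source line 59 is missing. Given the docstring it
        # presumably held a loop header repeating the exchange below; only
        # the single exchange visible in the listing is reproduced here.
        self.sendTCPQueryOverConnection(conn, query, response=response)
        (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)

    def testTLSPipelining(self):
        """
        TLS: Several queries over the same connection without waiting for the responses
        """
        name = 'pipelining.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = dns.message.make_response(query)
        # constructed: arguments after `name` were lost (see class docstring)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)

        # Send all 100 queries first, then drain the 100 answers.
        for _ in range(100):
            self.sendTCPQueryOverConnection(conn, query, response=response)

        for _ in range(100):
            (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
            self.assertTrue(receivedQuery)
            self.assertTrue(receivedResponse)
            receivedQuery.id = query.id
            self.assertEqual(query, receivedQuery)
            self.assertEqual(response, receivedResponse)

    def testTLSSNIRouting(self):
        """
        TLS: SNI-based rule selection

        A connection presenting "powerdns.com" must get the spoofed answer;
        one presenting the regular server name must reach the backend.
        SNI is chosen by the client, so the rule under test is a routing
        decision and not an access control.
        """
        name = 'sni.tls.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        # Clear the Recursion Desired bit on the outgoing query.
        query.flags &= ~dns.flags.RD
        response = dns.message.make_response(query)
        # constructed: arguments after `name` were lost (see class docstring)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)
        expectedResponse = dns.message.make_response(query)
        # The address comes from SpoofAction("1.2.3.4") in the config; the
        # TTL is constructed because that line was lost from the listing.
        rrset = dns.rrset.from_text(name,
                                    60,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '1.2.3.4')
        expectedResponse.answer.append(rrset)

        # this SNI should match so we should get a spoofed answer
        conn = self.openTLSConnection(self._tlsServerPort, 'powerdns.com', self._caCert)

        # response=None and useQueue=False: no backend answer is supplied
        # and only the client-side response is read back.
        self.sendTCPQueryOverConnection(conn, query, response=None)
        receivedResponse = self.recvTCPResponseOverConnection(conn, useQueue=False)
        self.assertTrue(receivedResponse)
        self.assertEqual(expectedResponse, receivedResponse)

        # this one should not
        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)

        self.sendTCPQueryOverConnection(conn, query, response=response)
        (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)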