+import clientsubnetoption
import dns
import os
import socket
launch=bind
any-to-tcp=no
proxy-protocol-from=127.0.0.1
+edns-subnet-processing=yes
"""
_zones = {
ns1.example.org. 3600 IN A {prefix}.10
ns2.example.org. 3600 IN A {prefix}.11
-myip.example.org. 3600 IN LUA A "who:toString()"
+myip.example.org. 3600 IN LUA TXT "who:toString()..'/'..bestwho:toString()"
"""
}
See if LUA who picks up the inner address from the PROXY protocol
"""
- # first test with an unproxied query - should get ignored
- query = dns.message.make_query('myip.example.org', 'A')
+ for testWithECS in True, False:
+ # first test with an unproxied query - should get ignored
- res = self.sendUDPQuery(query)
+ options = []
+ expectedText = '192.0.2.1/192.0.2.1'
- self.assertEqual(res, None) # query was ignored correctly
+ if testWithECS:
+ ecso = clientsubnetoption.ClientSubnetOption('192.0.2.5', 32)
+ options.append(ecso)
+ expectedText = '192.0.2.1/192.0.2.5'
+ query = dns.message.make_query('myip.example.org', 'TXT', 'IN', use_edns=testWithECS, options=options, payload=512)
- # now send a proxied query
- queryPayload = query.to_wire()
- ppPayload = ProxyProtocol.getPayload(False, False, False, "192.0.2.1", "10.1.2.3", 12345, 53, [])
- payload = ppPayload + queryPayload
+ res = self.sendUDPQuery(query)
- # UDP
- self._sock.settimeout(2.0)
+ self.assertEqual(res, None) # query was ignored correctly
- try:
- self._sock.send(payload)
- data = self._sock.recv(4096)
- except socket.timeout:
- data = None
- finally:
- self._sock.settimeout(None)
-
- res = None
- if data:
- res = dns.message.from_wire(data)
-
- expected = [dns.rrset.from_text('myip.example.org.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')]
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertEqual(res.answer, expected)
-
- # TCP
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(2.0)
- sock.connect(("127.0.0.1", self._authPort))
- try:
- sock.send(ppPayload)
- sock.send(struct.pack("!H", len(queryPayload)))
- sock.send(queryPayload)
- data = sock.recv(2)
+ # now send a proxied query
+ queryPayload = query.to_wire()
+ ppPayload = ProxyProtocol.getPayload(False, False, False, "192.0.2.1", "10.1.2.3", 12345, 53, [])
+ payload = ppPayload + queryPayload
+
+ # UDP
+ self._sock.settimeout(2.0)
+
+ try:
+ self._sock.send(payload)
+ data = self._sock.recv(4096)
+ except socket.timeout:
+ data = None
+ finally:
+ self._sock.settimeout(None)
+
+ res = None
if data:
- (datalen,) = struct.unpack("!H", data)
- data = sock.recv(datalen)
- except socket.timeout as e:
- print("Timeout: %s" % (str(e)))
- data = None
- except socket.error as e:
- print("Network error: %s" % (str(e)))
- data = None
- finally:
- sock.close()
-
- res = None
- if data:
- res = dns.message.from_wire(data)
-
- self.assertRcodeEqual(res, dns.rcode.NOERROR)
- self.assertEqual(res.answer, expected)
+ res = dns.message.from_wire(data)
+
+ expected = [dns.rrset.from_text('myip.example.org.', 0, dns.rdataclass.IN, 'TXT', expectedText)]
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertEqual(res.answer, expected)
+
+ # TCP
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.settimeout(2.0)
+ sock.connect(("127.0.0.1", self._authPort))
+
+ try:
+ sock.send(ppPayload)
+ sock.send(struct.pack("!H", len(queryPayload)))
+ sock.send(queryPayload)
+ data = sock.recv(2)
+ if data:
+ (datalen,) = struct.unpack("!H", data)
+ data = sock.recv(datalen)
+ except socket.timeout as e:
+ print("Timeout: %s" % (str(e)))
+ data = None
+ except socket.error as e:
+ print("Network error: %s" % (str(e)))
+ data = None
+ finally:
+ sock.close()
+
+ res = None
+ if data:
+ res = dns.message.from_wire(data)
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertEqual(res.answer, expected)
class TestProxyProtocolNOTIFY(AuthTest):
_config_template = """