import dns
import dns.message
+from proxyprotocol import ProxyProtocol
+
from eqdnsmessage import AssertEqualDNSMessageMixin
msg.flags += dns.flags.from_text('RD')
msg.use_edns(edns=0, ednsflags=dns.flags.edns_from_text(ednsflags))
return msg
+
+ @classmethod
+ def sendUDPQueryWithProxyProtocol(cls, query, v6, source, destination, sourcePort, destinationPort, values=[], timeout=2.0):
+ queryPayload = query.to_wire()
+ ppPayload = ProxyProtocol.getPayload(False, False, v6, source, destination, sourcePort, destinationPort, values)
+ payload = ppPayload + queryPayload
+
+ if timeout:
+ cls._sock.settimeout(timeout)
+
+ try:
+ cls._sock.send(payload)
+ data = cls._sock.recv(4096)
+ except socket.timeout:
+ data = None
+ finally:
+ if timeout:
+ cls._sock.settimeout(None)
+
+ message = None
+ if data:
+ message = dns.message.from_wire(data)
+ return message
+
+ @classmethod
+ def sendTCPQueryWithProxyProtocol(cls, query, v6, source, destination, sourcePort, destinationPort, values=[], timeout=2.0):
+ queryPayload = query.to_wire()
+ ppPayload = ProxyProtocol.getPayload(False, False, v6, source, destination, sourcePort, destinationPort, values)
+
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ if timeout:
+ sock.settimeout(timeout)
+
+ sock.connect(("127.0.0.1", cls._recursorPort))
+
+ try:
+ sock.send(ppPayload)
+ sock.send(struct.pack("!H", len(queryPayload)))
+ sock.send(queryPayload)
+ data = sock.recv(2)
+ if data:
+ (datalen,) = struct.unpack("!H", data)
+ data = sock.recv(datalen)
+ except socket.timeout as e:
+ print("Timeout: %s" % (str(e)))
+ data = None
+ except socket.error as e:
+ print("Network error: %s" % (str(e)))
+ data = None
+ finally:
+ sock.close()
+
+ message = None
+ if data:
+ message = dns.message.from_wire(data)
+ return message
query = dns.message.make_query(nameECS, 'TXT', 'IN', use_edns=True, options=[ecso], payload=512)
self.sendECSQuery(query, expected)
+class testECSWithProxyProtocoldRecursorTest(ECSTest):
+ _confdir = 'ECSWithProxyProtocol'
+ _config_template = """
+ ecs-add-for=2001:db8::1/128
+ edns-subnet-allow-list=ecs-echo.example.
+ forward-zones=ecs-echo.example=%s.21
+ proxy-protocol-from=127.0.0.1/32
+ allow-from=2001:db8::1/128
+""" % (os.environ['PREFIX'])
+
+ def testProxyProtocolPlusECS(self):
+ qname = nameECS
+ expected = dns.rrset.from_text(qname, 0, dns.rdataclass.IN, 'TXT', '2001:db8::/56')
+
+ query = dns.message.make_query(qname, 'TXT', use_edns=True)
+ for method in ("sendUDPQueryWithProxyProtocol", "sendTCPQueryWithProxyProtocol"):
+ sender = getattr(self, method)
+ res = sender(query, True, '2001:db8::1', '2001:db8::2', 0, 65535)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertRRsetInAnswer(res, expected)
+
class testTooLargeToAddZeroScope(RecursorTest):
_confdir = 'TooLargeToAddZeroScope'
pass
from recursortests import RecursorTest
-from proxyprotocol import ProxyProtocol
class ProxyProtocolRecursorTest(RecursorTest):
def tearDownClass(cls):
cls.tearDownRecursor()
- @classmethod
- def sendUDPQueryWithProxyProtocol(cls, query, v6, source, destination, sourcePort, destinationPort, values=[], timeout=2.0):
- queryPayload = query.to_wire()
- ppPayload = ProxyProtocol.getPayload(False, False, v6, source, destination, sourcePort, destinationPort, values)
- payload = ppPayload + queryPayload
-
- if timeout:
- cls._sock.settimeout(timeout)
-
- try:
- cls._sock.send(payload)
- data = cls._sock.recv(4096)
- except socket.timeout:
- data = None
- finally:
- if timeout:
- cls._sock.settimeout(None)
-
- message = None
- if data:
- message = dns.message.from_wire(data)
- return message
-
- @classmethod
- def sendTCPQueryWithProxyProtocol(cls, query, v6, source, destination, sourcePort, destinationPort, values=[], timeout=2.0):
- queryPayload = query.to_wire()
- ppPayload = ProxyProtocol.getPayload(False, False, v6, source, destination, sourcePort, destinationPort, values)
-
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- if timeout:
- sock.settimeout(timeout)
-
- sock.connect(("127.0.0.1", cls._recursorPort))
-
- try:
- sock.send(ppPayload)
- sock.send(struct.pack("!H", len(queryPayload)))
- sock.send(queryPayload)
- data = sock.recv(2)
- if data:
- (datalen,) = struct.unpack("!H", data)
- data = sock.recv(datalen)
- except socket.timeout as e:
- print("Timeout: %s" % (str(e)))
- data = None
- except socket.error as e:
- print("Network error: %s" % (str(e)))
- data = None
- finally:
- sock.close()
-
- message = None
- if data:
- message = dns.message.from_wire(data)
- return message
-
class ProxyProtocolAllowedRecursorTest(ProxyProtocolRecursorTest):
_confdir = 'ProxyProtocol'
_lua_dns_script_file = """