return (receivedQuery, message)
@classmethod
- def openTCPConnection(cls, timeout=None, port=None):
+ def openTCPConnection(cls, timeout=2.0, port=None):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
if timeout:
return sock
@classmethod
- def openTLSConnection(cls, port, serverName, caCert=None, timeout=None, alpn=[]):
+ def openTLSConnection(cls, port, serverName, caCert=None, timeout=2.0, alpn=[]):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
if timeout:
return message
@classmethod
- def sendDOTQuery(cls, port, serverName, query, response, caFile, useQueue=True):
- conn = cls.openTLSConnection(port, serverName, caFile)
- cls.sendTCPQueryOverConnection(conn, query, response=response)
+ def sendDOTQuery(cls, port, serverName, query, response, caFile, useQueue=True, timeout=None):
+ conn = cls.openTLSConnection(port, serverName, caFile, timeout=timeout)
+ cls.sendTCPQueryOverConnection(conn, query, response=response, timeout=timeout)
if useQueue:
- return cls.recvTCPResponseOverConnection(conn, useQueue=useQueue)
- return None, cls.recvTCPResponseOverConnection(conn, useQueue=useQueue)
+ return cls.recvTCPResponseOverConnection(conn, useQueue=useQueue, timeout=timeout)
+ return None, cls.recvTCPResponseOverConnection(conn, useQueue=useQueue, timeout=timeout)
@classmethod
def sendTCPQuery(cls, query, response, useQueue=True, timeout=2.0, rawQuery=False):
conn.setopt(pycurl.HTTPHEADER, ["Content-type: application/dns-message",
"Accept: application/dns-message"])
+ if timeout:
+ conn.setopt(pycurl.TIMEOUT_MS, int(timeout*1000))
+
return conn
@classmethod
cls._response_headers = response_headers.getvalue()
return (receivedQuery, message)
- def sendDOHQueryWrapper(self, query, response, useQueue=True):
- return self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, useQueue=useQueue)
+ def sendDOHQueryWrapper(self, query, response, useQueue=True, timeout=2):
+ return self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, useQueue=useQueue, timeout=timeout)
- def sendDOHWithNGHTTP2QueryWrapper(self, query, response, useQueue=True):
- return self.sendDOHQuery(self._dohWithNGHTTP2ServerPort, self._serverName, self._dohWithNGHTTP2BaseURL, query, response=response, caFile=self._caCert, useQueue=useQueue)
+ def sendDOHWithNGHTTP2QueryWrapper(self, query, response, useQueue=True, timeout=2):
+ return self.sendDOHQuery(self._dohWithNGHTTP2ServerPort, self._serverName, self._dohWithNGHTTP2BaseURL, query, response=response, caFile=self._caCert, useQueue=useQueue, timeout=timeout)
- def sendDOHWithH2OQueryWrapper(self, query, response, useQueue=True):
- return self.sendDOHQuery(self._dohWithH2OServerPort, self._serverName, self._dohWithH2OBaseURL, query, response=response, caFile=self._caCert, useQueue=useQueue)
+ def sendDOHWithH2OQueryWrapper(self, query, response, useQueue=True, timeout=2):
+ return self.sendDOHQuery(self._dohWithH2OServerPort, self._serverName, self._dohWithH2OBaseURL, query, response=response, caFile=self._caCert, useQueue=useQueue, timeout=timeout)
- def sendDOTQueryWrapper(self, query, response, useQueue=True):
- return self.sendDOTQuery(self._tlsServerPort, self._serverName, query, response, self._caCert, useQueue=useQueue)
+ def sendDOTQueryWrapper(self, query, response, useQueue=True, timeout=2):
+ return self.sendDOTQuery(self._tlsServerPort, self._serverName, query, response, self._caCert, useQueue=useQueue, timeout=timeout)
- def sendDOQQueryWrapper(self, query, response, useQueue=True):
- return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName)
+ def sendDOQQueryWrapper(self, query, response, useQueue=True, timeout=2):
+ return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, timeout=timeout)
- def sendDOH3QueryWrapper(self, query, response, useQueue=True):
- return self.sendDOH3Query(self._doh3ServerPort, self._dohBaseURL, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName)
+ def sendDOH3QueryWrapper(self, query, response, useQueue=True, timeout=2):
+ return self.sendDOH3Query(self._doh3ServerPort, self._dohBaseURL, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, timeout=timeout)
@classmethod
def getDOQConnection(cls, port, caFile=None, source=None, source_port=0):
--- /dev/null
+#!/usr/bin/env python
+import base64
+import dns
+import os
+import unittest
+import pycurl
+
+from dnsdisttests import DNSDistTest, pickAvailablePort
+
+@unittest.skipUnless('ENABLE_SUDO_TESTS' in os.environ, "sudo is not available")
+class TestSimpleEBPF(DNSDistTest):
+
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+
+ _serverKey = 'server.key'
+ _serverCert = 'server.chain'
+ _serverName = 'tls.tests.dnsdist.org'
+ _caCert = 'ca.pem'
+ _tlsServerPort = pickAvailablePort()
+ _dohWithNGHTTP2ServerPort = pickAvailablePort()
+ _doqServerPort = pickAvailablePort()
+ _doh3ServerPort = pickAvailablePort()
+ _dohWithNGHTTP2BaseURL = ("https://%s:%d/" % (_serverName, _dohWithNGHTTP2ServerPort))
+ _dohBaseURL = ("https://%s:%d/" % (_serverName, _doh3ServerPort))
+
+ _config_template = """
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
+ newServer{address="127.0.0.1:%d"}
+
+ bpf = newBPFFilter({ipv4MaxItems=10, ipv6MaxItems=10, qnamesMaxItems=10})
+ setDefaultBPFFilter(bpf)
+ bpf:blockQName(newDNSName("blocked.ebpf.tests.powerdns.com."), 255)
+
+ addTLSLocal("127.0.0.1:%d", "%s", "%s", { provider="openssl" })
+ addDOHLocal("127.0.0.1:%d", "%s", "%s", {"/"}, {library="nghttp2"})
+ addDOQLocal("127.0.0.1:%d", "%s", "%s")
+ addDOH3Local("127.0.0.1:%d", "%s", "%s")
+
+ """
+ _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey', '_dohWithNGHTTP2ServerPort', '_serverCert', '_serverKey', '_doqServerPort', '_serverCert', '_serverKey', '_doh3ServerPort', '_serverCert', '_serverKey']
+ _sudoMode = True
+
+ def testNotBlocked(self):
+ # unblock 127.0.0.1, just in case
+ self.sendConsoleCommand('bpf:unblock(newCA("127.0.0.1"))')
+
+ name = 'simplea.ebpf.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+ for method in ["sendUDPQuery", "sendTCPQuery", "sendDOTQueryWrapper", "sendDOHWithNGHTTP2QueryWrapper", "sendDOQQueryWrapper", "sendDOH3QueryWrapper"]:
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response, timeout=1)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ if method == 'sendDOQQueryWrapper':
+ # dnspython sets the ID to 0
+ receivedResponse.id = response.id
+ self.assertEqual(response, receivedResponse)
+
+ def testQNameBlocked(self):
+ # unblock 127.0.0.1, just in case
+ self.sendConsoleCommand('bpf:unblock(newCA("127.0.0.1"))')
+
+ name = 'blocked.ebpf.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+ # should be blocked over Do53 UDP
+ for method in ["sendUDPQuery"]:
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(query, response=None, useQueue=False, timeout=0.5)
+ self.assertEqual(receivedResponse, None)
+
+ # not over over protocols
+ for method in ["sendTCPQuery", "sendDOTQueryWrapper", "sendDOHWithNGHTTP2QueryWrapper", "sendDOQQueryWrapper", "sendDOH3QueryWrapper"]:
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response, timeout=1)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ if method == 'sendDOQQueryWrapper':
+ # dnspython sets the ID to 0
+ receivedResponse.id = response.id
+ self.assertEqual(response, receivedResponse)
+
+ def testClientIPBlocked(self):
+ # block 127.0.0.1
+ self.sendConsoleCommand('bpf:block(newCA("127.0.0.1"))')
+
+ name = 'ip-blocked.ebpf.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+ # should be blocked over Do53 UDP, Do53 TCP, DoH
+ for method in ["sendUDPQuery", "sendTCPQuery", "sendDOTQueryWrapper", "sendDOHWithNGHTTP2QueryWrapper"]:
+ sender = getattr(self, method)
+ try:
+ (_, receivedResponse) = sender(query, response=None, useQueue=False, timeout=0.5)
+ self.assertEqual(receivedResponse, None)
+ except TimeoutError:
+ pass
+ except pycurl.error:
+ pass
+
+ # not over over QUIC-based protocols
+ for method in ["sendDOQQueryWrapper", "sendDOH3QueryWrapper"]:
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(query, response, timeout=1)
+ receivedQuery.id = query.id
+ self.assertEqual(query, receivedQuery)
+ if method == 'sendDOQQueryWrapper':
+ # dnspython sets the ID to 0
+ receivedResponse.id = response.id
+ self.assertEqual(response, receivedResponse)
+
+ # unblock 127.0.0.1
+ self.sendConsoleCommand('bpf:unblock(newCA("127.0.0.1"))')