return f'dnsdistsettings::{rust_type}'
-def get_rust_struct_fields_from_definition(name, keys, default_functions, indent_spaces, special_serde_object=False):
+def get_rust_struct_fields_from_definition(name, keys, default_functions, indent, special_serde_object=False):
if not 'parameters' in keys:
return ''
output = ''
- indent = ' '*indent_spaces
for parameter in keys['parameters']:
parameter_name = get_rust_field_name(parameter['name']) if not 'rename' in parameter else parameter['rename']
rust_type = parameter['type']
'''
indent_spaces += 4
indent = ' '*indent_spaces
- output += get_rust_struct_fields_from_definition(name, keys, default_functions, indent_spaces, special_serde_object=special_serde_object)
+ output += get_rust_struct_fields_from_definition(name, keys, default_functions, indent, special_serde_object=special_serde_object)
output += ' }\n'
if special_serde_object or not 'skip-serde' in keys or not keys['skip-serde']:
default_functions.append(write_rust_default_trait_impl(f'{obj_name}Configuration{name_suffix}', special_serde_object))
action_buffer += f'{indent}#[serde(default, skip_serializing_if = "crate::is_default")]\n'
action_buffer += f'{indent}name: String,\n'
- action_buffer += get_rust_struct_fields_from_definition(struct_name, action, default_functions, 8)
+ action_buffer += get_rust_struct_fields_from_definition(struct_name, action, default_functions, indent)
action_buffer += ' }\n\n'
selector_buffer += f'{indent}#[serde(default, skip_serializing_if = "crate::is_default")]\n'
selector_buffer += f'{indent}name: String,\n'
- selector_buffer += get_rust_struct_fields_from_definition(struct_name, selector, default_functions, 8)
+ selector_buffer += get_rust_struct_fields_from_definition(struct_name, selector, default_functions, indent)
selector_buffer += ' }\n\n'
def generateNewCertificateAndKey(filePrefix):
# generate and sign a new cert
cmd = ['openssl', 'req', '-new', '-newkey', 'rsa:2048', '-nodes', '-keyout', filePrefix + '.key', '-out', filePrefix + '.csr', '-config', 'configServer.conf']
- output = None
try:
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
- output = process.communicate(input='')
+ process.communicate(input='')
except subprocess.CalledProcessError as exc:
raise AssertionError('openssl req failed (%d): %s' % (exc.returncode, exc.output))
cmd = ['openssl', 'x509', '-req', '-days', '1', '-CA', 'ca.pem', '-CAkey', 'ca.key', '-CAcreateserial', '-in', filePrefix + '.csr', '-out', filePrefix + '.pem', '-extfile', 'configServer.conf', '-extensions', 'v3_req']
- output = None
try:
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
- output = process.communicate(input='')
+ process.communicate(input='')
except subprocess.CalledProcessError as exc:
raise AssertionError('openssl x509 failed (%d): %s' % (exc.returncode, exc.output))
outFile.write(inFile.read())
cmd = ['openssl', 'pkcs12', '-export', '-passout', 'pass:passw0rd', '-clcerts', '-in', filePrefix + '.pem', '-CAfile', 'ca.pem', '-inkey', filePrefix + '.key', '-out', filePrefix + '.p12']
- output = None
try:
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
- output = process.communicate(input='')
+ process.communicate(input='')
except subprocess.CalledProcessError as exc:
raise AssertionError('openssl pkcs12 failed (%d): %s' % (exc.returncode, exc.output))
import async_timeout
from collections import deque
-from typing import BinaryIO, Callable, Deque, Dict, List, Optional, Union, cast
+from typing import BinaryIO, Callable, Deque, Dict, List, Optional, Tuple, Union, cast
from urllib.parse import urlparse
import aioquic
include: bool,
output_dir: Optional[str],
additional_headers: Optional[Dict] = None,
-) -> None:
+) -> Tuple[str, Dict[str, str]]:
# perform request
- start = time.time()
if data is not None:
headers = copy.deepcopy(additional_headers) if additional_headers else {}
headers["content-length"] = str(len(data))
data=data,
headers=headers,
)
- method = "POST"
else:
http_events = await client.get(url, headers=additional_headers)
- method = "GET"
- elapsed = time.time() - start
result = bytes()
headers = {}
post: bool,
create_protocol=HttpClient,
additional_headers: Optional[Dict] = None,
-) -> None:
+) -> Union[Tuple[str, Dict[str, str]], Tuple[asyncio.TimeoutError, Dict[str, str]]]:
url = baseurl
if not post:
"""
name = 'drop.doq.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
- dropped = False
try:
(_, receivedResponse) = self.sendQUICQuery(query, response=None, useQueue=False)
self.assertTrue(False)
"""
name = 'no-backend.doq.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
- dropped = False
try:
(_, receivedResponse) = self.sendQUICQuery(query, response=None, useQueue=False)
self.assertTrue(False)
configFile = self._APIWriteDir + '/' + 'acl.conf'
self.assertTrue(os.path.isfile(configFile))
- fileContent = None
with open(configFile, 'rt') as f:
header = f.readline()
body = f.readline()
"""
Advanced: Drop empty queries
"""
- name = 'drop-empty-queries.advanced.tests.powerdns.com.'
query = dns.message.Message()
for method in ("sendUDPQuery", "sendTCPQuery"):
"""
Basic: NotImp on empty queries
"""
- name = 'notimp-empty-queries.basic.tests.powerdns.com.'
query = dns.message.Message()
response = dns.message.make_response(query)
client.refreshResolverCertificates()
cert = client.getResolverCertificate()
self.assertTrue(cert)
- secondSerial = cert.serial
self.assertGreater(cert.serial, serials[client])
name = 'automatic-rotation.dnscrypt.tests.powerdns.com.'
"""
DOH: Empty GET query
"""
- name = 'empty-get.doh.tests.powerdns.com.'
url = self._dohBaseURL
conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
conn.setopt(pycurl.URL, url)
conn.setopt(pycurl.SSL_VERIFYPEER, 1)
conn.setopt(pycurl.SSL_VERIFYHOST, 2)
conn.setopt(pycurl.CAINFO, self._caCert)
- data = conn.perform_rb()
+ conn.perform_rb()
rcode = conn.getinfo(pycurl.RESPONSE_CODE)
self.assertEqual(rcode, 400)
"""
if self._dohLibrary == 'h2o':
raise unittest.SkipTest('h2o tries to parse the qname early, so this check will fail')
- name = 'zero-qdcount.doh.tests.powerdns.com.'
query = dns.message.Message()
query.id = 0
query.flags &= ~dns.flags.RD
"""
DOH: Short path in GET query
"""
- name = 'short-path-get.doh.tests.powerdns.com.'
url = self._dohBaseURL + '/AA'
conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
conn.setopt(pycurl.URL, url)
conn.setopt(pycurl.SSL_VERIFYPEER, 1)
conn.setopt(pycurl.SSL_VERIFYHOST, 2)
conn.setopt(pycurl.CAINFO, self._caCert)
- data = conn.perform_rb()
+ conn.perform_rb()
rcode = conn.getinfo(pycurl.RESPONSE_CODE)
self.assertEqual(rcode, 404)
conn.setopt(pycurl.SSL_VERIFYPEER, 1)
conn.setopt(pycurl.SSL_VERIFYHOST, 2)
conn.setopt(pycurl.CAINFO, self._caCert)
- data = conn.perform_rb()
+ conn.perform_rb()
rcode = conn.getinfo(pycurl.RESPONSE_CODE)
self.assertEqual(rcode, 400)
"""
name = 'invalid-b64-get.doh.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
- wire = query.to_wire()
url = self._dohBaseURL + '?dns=' + '_-~~~~-_'
conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
conn.setopt(pycurl.URL, url)
conn.setopt(pycurl.SSL_VERIFYPEER, 1)
conn.setopt(pycurl.SSL_VERIFYHOST, 2)
conn.setopt(pycurl.CAINFO, self._caCert)
- data = conn.perform_rb()
+ conn.perform_rb()
rcode = conn.getinfo(pycurl.RESPONSE_CODE)
self.assertEqual(rcode, 400)
conn.setopt(pycurl.SSL_VERIFYPEER, 1)
conn.setopt(pycurl.SSL_VERIFYHOST, 2)
conn.setopt(pycurl.CAINFO, self._caCert)
- data = conn.perform_rb()
+ conn.perform_rb()
rcode = conn.getinfo(pycurl.RESPONSE_CODE)
self.assertEqual(rcode, 400)
conn.setopt(pycurl.SSL_VERIFYHOST, 2)
conn.setopt(pycurl.CAINFO, self._caCert)
conn.setopt(pycurl.CUSTOMREQUEST, 'PATCH')
- data = conn.perform_rb()
+ conn.perform_rb()
rcode = conn.getinfo(pycurl.RESPONSE_CODE)
self.assertEqual(rcode, 400)
conn.setopt(pycurl.SSL_VERIFYPEER, 1)
conn.setopt(pycurl.SSL_VERIFYHOST, 2)
conn.setopt(pycurl.CAINFO, self._caCert)
- data = conn.perform_rb()
+ conn.perform_rb()
rcode = conn.getinfo(pycurl.RESPONSE_CODE)
self.assertEqual(rcode, 403)
"""
DoH Frontend Limits: Maximum number of conns per DoH frontend
"""
- name = 'maxconnsperfrontend.doh.tests.powerdns.com.'
query = b"GET / HTTP/1.0\r\n\r\n"
conns = []
failed = 0
for conn in conns:
try:
- data = conn.perform_rb()
- rcode = conn.getinfo(pycurl.RESPONSE_CODE)
+ conn.perform_rb()
+ conn.getinfo(pycurl.RESPONSE_CODE)
count = count + 1
except:
failed = failed + 1
name = 'xfr.doh.tests.powerdns.com.'
for xfrType in [dns.rdatatype.AXFR, dns.rdatatype.IXFR]:
query = dns.message.make_query(name, xfrType, 'IN')
- url = self.getDOHGetURL(self._dohBaseURL, query)
expectedResponse = dns.message.make_response(query)
expectedResponse.set_rcode(dns.rcode.NOTIMP)
expectedQuery.id = 0
try:
- message = quic_bogus_query(query, '127.0.0.1', 2.0, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
+ quic_bogus_query(query, '127.0.0.1', 2.0, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
self.assertFalse(True)
except doqclient.StreamResetError as e :
self.assertEqual(e.error, 2);
while not self._protobufQueue.empty():
msg = self.getFirstProtobufMessage()
count = count + 1
- pbMessageType = dnsmessage_pb2.PBDNSMessage.TCP
self.checkProtobufResponse(msg, dnsmessage_pb2.PBDNSMessage.TCP, responses[count-1])
expected = responses[count-1].answer[0]
toProxyQueue.put(response, True, 2.0)
- wire = query.to_wire()
-
reverseProxyPort = pickAvailablePort()
reverseProxy = threading.Thread(name='Mock Proxy Protocol Reverse Proxy', target=MockTCPReverseProxyAddingProxyProtocol, args=[reverseProxyPort, self._dohServerPPOutsidePort])
reverseProxy.start()
toProxyQueue.put(response, True, 2.0)
- wire = query.to_wire()
-
reverseProxyPort = pickAvailablePort()
tlsContext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
tlsContext.load_cert_chain(self._serverCert, self._serverKey)
toProxyQueue.put(response, True, 2.0)
- wire = query.to_wire()
-
reverseProxyPort = pickAvailablePort()
reverseProxy = threading.Thread(name='Mock Proxy Protocol Reverse Proxy', target=MockTCPReverseProxyAddingProxyProtocol, args=[reverseProxyPort, self._dotServerPPOutsidePort])
reverseProxy.start()
toProxyQueue.put(response, True, 2.0)
- wire = query.to_wire()
-
reverseProxyPort = pickAvailablePort()
tlsContext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
tlsContext.load_cert_chain(self._serverCert, self._serverKey)
query = dns.message.make_query(name, 'A', 'IN')
# dnsdist set RA = RD for spoofed responses
query.flags &= ~dns.flags.RD
- response = dns.message.make_response(query)
expectedResponse = dns.message.make_response(query)
rrset = dns.rrset.from_text(name,
"""
Routing: Round Robin with all servers down
"""
- numberOfQueries = 10
name = 'alldown.rr.routing.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
response = dns.message.make_response(query)
self.assertEqual(query, receivedQuery)
self.assertEqual(response, receivedResponse)
- total = 0
if 'UDP Responder' in self._responsesCounter:
self.assertEqual(self._responsesCounter['UDP Responder'], 0)
self.assertEqual(self._responsesCounter['UDP Responder 2'], numberOfQueries)
numQueriesPerConn = 4
conns = []
- start = time.time()
for idx in range(numConns):
conns.append(self.openTCPConnection())
"""
TCP Limits: Maximum duration
"""
- name = 'duration.tcp.tests.powerdns.com.'
start = time.time()
conn = self.openTCPConnection()