From: Tomas Krizek Date: Fri, 16 Nov 2018 11:35:21 +0000 (+0100) Subject: pytests: import test_tls_session_resumption (test18) X-Git-Tag: v3.2.0~18^2~27 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4c42eea6db86116eb1eda7adae26944746cd018a;p=thirdparty%2Fknot-resolver.git pytests: import test_tls_session_resumption (test18) --- diff --git a/tests/pytests/conftest.py b/tests/pytests/conftest.py index 2681a42f7..7beed5050 100644 --- a/tests/pytests/conftest.py +++ b/tests/pytests/conftest.py @@ -1,52 +1,8 @@ -from contextlib import contextmanager -import random import socket import pytest -from kresd import Kresd - - -def is_port_free(port, ip=None, ip6=None): - def check(family, type_, dest): - sock = socket.socket(family, type_) - sock.bind(dest) - sock.close() - - try: - if ip is not None: - check(socket.AF_INET, socket.SOCK_STREAM, (ip, port)) - check(socket.AF_INET, socket.SOCK_DGRAM, (ip, port)) - if ip6 is not None: - check(socket.AF_INET6, socket.SOCK_STREAM, (ip6, port, 0, 0)) - check(socket.AF_INET6, socket.SOCK_DGRAM, (ip6, port, 0, 0)) - except OSError as exc: - if exc.errno == 98: # address alrady in use - return False - else: - raise - return True - - -@contextmanager -def make_kresd(workdir, certname=None): - ip = '127.0.0.1' - ip6 = '::1' - - def make_port(): - for _ in range(10): # max attempts - port = random.randint(1024, 65535) - if is_port_free(port, ip, ip6): - return port - raise RuntimeError("No available port found!") - - port = make_port() - tls_port = make_port() - with Kresd(workdir, port, tls_port, ip, ip6, certname) as kresd: - yield kresd - # TODO: add verbose option? - # with open(kresd.logfile_path) as log: - # print(log.read()) # display log for debugging purposes +from kresd import make_kresd @pytest.fixture diff --git a/tests/pytests/kresd.py b/tests/pytests/kresd.py index 96c854eb4..1b737ef73 100644 --- a/tests/pytests/kresd.py +++ b/tests/pytests/kresd.py @@ -1,5 +1,6 @@ -from contextlib import ContextDecorator +from contextlib import ContextDecorator, contextmanager import os +import random import re import socket import subprocess @@ -170,3 +171,43 @@ class Kresd(ContextDecorator): def ip6_tls_socket(self): return self._tls_socket_with_retry(socket.AF_INET6) + + +def is_port_free(port, ip=None, ip6=None): + def check(family, type_, dest): + sock = socket.socket(family, type_) + sock.bind(dest) + sock.close() + + try: + if ip is not None: + check(socket.AF_INET, socket.SOCK_STREAM, (ip, port)) + check(socket.AF_INET, socket.SOCK_DGRAM, (ip, port)) + if ip6 is not None: + check(socket.AF_INET6, socket.SOCK_STREAM, (ip6, port, 0, 0)) + check(socket.AF_INET6, socket.SOCK_DGRAM, (ip6, port, 0, 0)) + except OSError as exc: + if exc.errno == 98: # address alrady in use + return False + else: + raise + return True + + +def make_port(ip=None, ip6=None): + for _ in range(10): # max attempts + port = random.randint(1024, 65535) + if is_port_free(port, ip, ip6): + return port + raise RuntimeError("No available port found!") + + +@contextmanager +def make_kresd(workdir, certname=None, ip='127.0.0.1', ip6='::1'): + port = make_port(ip, ip6) + tls_port = make_port(ip, ip6) + with Kresd(workdir, port, tls_port, ip, ip6, certname) as kresd: + yield kresd + # TODO: add verbose option? + # with open(kresd.logfile_path) as log: + # print(log.read()) # display log for debugging purposes diff --git a/tests/pytests/test_tls.py b/tests/pytests/test_tls.py index fd73cc0d1..a32156b2d 100644 --- a/tests/pytests/test_tls.py +++ b/tests/pytests/test_tls.py @@ -1,9 +1,14 @@ """TLS-specific tests""" +import itertools +import os +from socket import AF_INET, AF_INET6 import ssl +import sys import pytest +from kresd import make_kresd import utils @@ -45,3 +50,38 @@ def test_tls_cert_expired(kresd_tt_expired, sock_family): with pytest.raises(ssl.SSLError): ssock.connect(dest) + + +@pytest.mark.skipif(sys.version_info < (3, 6), + reason="requires python3.6 or higher") +@pytest.mark.parametrize('sf1, sf2, sf3', itertools.product( + [AF_INET, AF_INET6], [AF_INET, AF_INET6], [AF_INET, AF_INET6])) +def test_tls_session_resumption(tmpdir, sf1, sf2, sf3): + """Attempt TLS session resumption against the same kresd instance and a different one.""" + # TODO ensure that session can't be resumed after session ticket key regeneration + # at the first kresd instance + + def connect(kresd, ctx, sf, session=None): + sock, dest = kresd.stream_socket(sf, tls=True) + ssock = ctx.wrap_socket( + sock, server_hostname='transport-test-server.com', session=session) + ssock.connect(dest) + new_session = ssock.session + assert new_session.has_ticket + assert ssock.session_reused == (session is not None) + utils.ping_alive(ssock) + ssock.close() + return new_session + + workdir = os.path.join(str(tmpdir), 'kresd') + os.makedirs(workdir) + + with make_kresd(workdir, 'tt') as kresd: + ctx = utils.make_ssl_context(verify_location=kresd.tls_cert_path) + session = connect(kresd, ctx, sf1) # initial conn + connect(kresd, ctx, sf2, session) # resume session on the same instance + + workdir2 = os.path.join(str(tmpdir), 'kresd2') + os.makedirs(workdir2) + with make_kresd(workdir2, 'tt') as kresd2: + connect(kresd2, ctx, sf3, session) # resume session on a different instance