]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
pytests: import test_tls_session_resumption (test18)
authorTomas Krizek <tomas.krizek@nic.cz>
Fri, 16 Nov 2018 11:35:21 +0000 (12:35 +0100)
committerTomas Krizek <tomas.krizek@nic.cz>
Tue, 4 Dec 2018 16:13:42 +0000 (17:13 +0100)
tests/pytests/conftest.py
tests/pytests/kresd.py
tests/pytests/test_tls.py

index 2681a42f731381f86a77d96ec81bf9d2d71b2fce..7beed5050885d2aa3b036902d34b4488fe855f43 100644 (file)
@@ -1,52 +1,8 @@
-from contextlib import contextmanager
-import random
 import socket
 
 import pytest
 
-from kresd import Kresd
-
-
-def is_port_free(port, ip=None, ip6=None):
-    def check(family, type_, dest):
-        sock = socket.socket(family, type_)
-        sock.bind(dest)
-        sock.close()
-
-    try:
-        if ip is not None:
-            check(socket.AF_INET, socket.SOCK_STREAM, (ip, port))
-            check(socket.AF_INET, socket.SOCK_DGRAM, (ip, port))
-        if ip6 is not None:
-            check(socket.AF_INET6, socket.SOCK_STREAM, (ip6, port, 0, 0))
-            check(socket.AF_INET6, socket.SOCK_DGRAM, (ip6, port, 0, 0))
-    except OSError as exc:
-        if exc.errno == 98:  # address alrady in use
-            return False
-        else:
-            raise
-    return True
-
-
-@contextmanager
-def make_kresd(workdir, certname=None):
-    ip = '127.0.0.1'
-    ip6 = '::1'
-
-    def make_port():
-        for _ in range(10):  # max attempts
-            port = random.randint(1024, 65535)
-            if is_port_free(port, ip, ip6):
-                return port
-        raise RuntimeError("No available port found!")
-
-    port = make_port()
-    tls_port = make_port()
-    with Kresd(workdir, port, tls_port, ip, ip6, certname) as kresd:
-        yield kresd
-        # TODO: add verbose option?
-        # with open(kresd.logfile_path) as log:
-        #     print(log.read())  # display log for debugging purposes
+from kresd import make_kresd
 
 
 @pytest.fixture
index 96c854eb49400183ef4ef9e4a63aec66df2a4cf7..1b737ef73ab8adf48cafca2e6185db64f219a62d 100644 (file)
@@ -1,5 +1,6 @@
-from contextlib import ContextDecorator
+from contextlib import ContextDecorator, contextmanager
 import os
+import random
 import re
 import socket
 import subprocess
@@ -170,3 +171,43 @@ class Kresd(ContextDecorator):
 
     def ip6_tls_socket(self):
         return self._tls_socket_with_retry(socket.AF_INET6)
+
+
+def is_port_free(port, ip=None, ip6=None):
+    def check(family, type_, dest):
+        sock = socket.socket(family, type_)
+        sock.bind(dest)
+        sock.close()
+
+    try:
+        if ip is not None:
+            check(socket.AF_INET, socket.SOCK_STREAM, (ip, port))
+            check(socket.AF_INET, socket.SOCK_DGRAM, (ip, port))
+        if ip6 is not None:
+            check(socket.AF_INET6, socket.SOCK_STREAM, (ip6, port, 0, 0))
+            check(socket.AF_INET6, socket.SOCK_DGRAM, (ip6, port, 0, 0))
+    except OSError as exc:
+        if exc.errno == 98:  # address alrady in use
+            return False
+        else:
+            raise
+    return True
+
+
+def make_port(ip=None, ip6=None):
+    for _ in range(10):  # max attempts
+        port = random.randint(1024, 65535)
+        if is_port_free(port, ip, ip6):
+            return port
+    raise RuntimeError("No available port found!")
+
+
+@contextmanager
+def make_kresd(workdir, certname=None, ip='127.0.0.1', ip6='::1'):
+    port = make_port(ip, ip6)
+    tls_port = make_port(ip, ip6)
+    with Kresd(workdir, port, tls_port, ip, ip6, certname) as kresd:
+        yield kresd
+        # TODO: add verbose option?
+        # with open(kresd.logfile_path) as log:
+        #     print(log.read())  # display log for debugging purposes
index fd73cc0d1062b9d2cc46e6fe29ebc6cda70337d9..a32156b2de2baa3d6f9543d9acd9839686011c19 100644 (file)
@@ -1,9 +1,14 @@
 """TLS-specific tests"""
 
+import itertools
+import os
+from socket import AF_INET, AF_INET6
 import ssl
+import sys
 
 import pytest
 
+from kresd import make_kresd
 import utils
 
 
@@ -45,3 +50,38 @@ def test_tls_cert_expired(kresd_tt_expired, sock_family):
 
     with pytest.raises(ssl.SSLError):
         ssock.connect(dest)
+
+
+@pytest.mark.skipif(sys.version_info < (3, 6),
+                    reason="requires python3.6 or higher")
+@pytest.mark.parametrize('sf1, sf2, sf3', itertools.product(
+    [AF_INET, AF_INET6], [AF_INET, AF_INET6], [AF_INET, AF_INET6]))
+def test_tls_session_resumption(tmpdir, sf1, sf2, sf3):
+    """Attempt TLS session resumption against the same kresd instance and a different one."""
+    # TODO ensure that session can't be resumed after session ticket key regeneration
+    # at the first kresd instance
+
+    def connect(kresd, ctx, sf, session=None):
+        sock, dest = kresd.stream_socket(sf, tls=True)
+        ssock = ctx.wrap_socket(
+            sock, server_hostname='transport-test-server.com', session=session)
+        ssock.connect(dest)
+        new_session = ssock.session
+        assert new_session.has_ticket
+        assert ssock.session_reused == (session is not None)
+        utils.ping_alive(ssock)
+        ssock.close()
+        return new_session
+
+    workdir = os.path.join(str(tmpdir), 'kresd')
+    os.makedirs(workdir)
+
+    with make_kresd(workdir, 'tt') as kresd:
+        ctx = utils.make_ssl_context(verify_location=kresd.tls_cert_path)
+        session = connect(kresd, ctx, sf1)  # initial conn
+        connect(kresd, ctx, sf2, session)  # resume session on the same instance
+
+    workdir2 = os.path.join(str(tmpdir), 'kresd2')
+    os.makedirs(workdir2)
+    with make_kresd(workdir2, 'tt') as kresd2:
+        connect(kresd2, ctx, sf3, session)  # resume session on a different instance