-from contextlib import contextmanager
-import random
import socket
import pytest
-from kresd import Kresd
-
-
-def is_port_free(port, ip=None, ip6=None):
- def check(family, type_, dest):
- sock = socket.socket(family, type_)
- sock.bind(dest)
- sock.close()
-
- try:
- if ip is not None:
- check(socket.AF_INET, socket.SOCK_STREAM, (ip, port))
- check(socket.AF_INET, socket.SOCK_DGRAM, (ip, port))
- if ip6 is not None:
- check(socket.AF_INET6, socket.SOCK_STREAM, (ip6, port, 0, 0))
- check(socket.AF_INET6, socket.SOCK_DGRAM, (ip6, port, 0, 0))
- except OSError as exc:
- if exc.errno == 98: # address alrady in use
- return False
- else:
- raise
- return True
-
-
-@contextmanager
-def make_kresd(workdir, certname=None):
- ip = '127.0.0.1'
- ip6 = '::1'
-
- def make_port():
- for _ in range(10): # max attempts
- port = random.randint(1024, 65535)
- if is_port_free(port, ip, ip6):
- return port
- raise RuntimeError("No available port found!")
-
- port = make_port()
- tls_port = make_port()
- with Kresd(workdir, port, tls_port, ip, ip6, certname) as kresd:
- yield kresd
- # TODO: add verbose option?
- # with open(kresd.logfile_path) as log:
- # print(log.read()) # display log for debugging purposes
+from kresd import make_kresd
@pytest.fixture
-from contextlib import ContextDecorator
+from contextlib import ContextDecorator, contextmanager
import os
+import random
import re
import socket
import subprocess
def ip6_tls_socket(self):
return self._tls_socket_with_retry(socket.AF_INET6)
+
+
+def is_port_free(port, ip=None, ip6=None):
+ def check(family, type_, dest):
+ sock = socket.socket(family, type_)
+ sock.bind(dest)
+ sock.close()
+
+ try:
+ if ip is not None:
+ check(socket.AF_INET, socket.SOCK_STREAM, (ip, port))
+ check(socket.AF_INET, socket.SOCK_DGRAM, (ip, port))
+ if ip6 is not None:
+ check(socket.AF_INET6, socket.SOCK_STREAM, (ip6, port, 0, 0))
+ check(socket.AF_INET6, socket.SOCK_DGRAM, (ip6, port, 0, 0))
+ except OSError as exc:
+ if exc.errno == 98: # address alrady in use
+ return False
+ else:
+ raise
+ return True
+
+
+def make_port(ip=None, ip6=None):
+ for _ in range(10): # max attempts
+ port = random.randint(1024, 65535)
+ if is_port_free(port, ip, ip6):
+ return port
+ raise RuntimeError("No available port found!")
+
+
+@contextmanager
+def make_kresd(workdir, certname=None, ip='127.0.0.1', ip6='::1'):
+ port = make_port(ip, ip6)
+ tls_port = make_port(ip, ip6)
+ with Kresd(workdir, port, tls_port, ip, ip6, certname) as kresd:
+ yield kresd
+ # TODO: add verbose option?
+ # with open(kresd.logfile_path) as log:
+ # print(log.read()) # display log for debugging purposes
"""TLS-specific tests"""
+import itertools
+import os
+from socket import AF_INET, AF_INET6
import ssl
+import sys
import pytest
+from kresd import make_kresd
import utils
with pytest.raises(ssl.SSLError):
ssock.connect(dest)
+
+
+@pytest.mark.skipif(sys.version_info < (3, 6),
+ reason="requires python3.6 or higher")
+@pytest.mark.parametrize('sf1, sf2, sf3', itertools.product(
+ [AF_INET, AF_INET6], [AF_INET, AF_INET6], [AF_INET, AF_INET6]))
+def test_tls_session_resumption(tmpdir, sf1, sf2, sf3):
+ """Attempt TLS session resumption against the same kresd instance and a different one."""
+ # TODO ensure that session can't be resumed after session ticket key regeneration
+ # at the first kresd instance
+
+ def connect(kresd, ctx, sf, session=None):
+ sock, dest = kresd.stream_socket(sf, tls=True)
+ ssock = ctx.wrap_socket(
+ sock, server_hostname='transport-test-server.com', session=session)
+ ssock.connect(dest)
+ new_session = ssock.session
+ assert new_session.has_ticket
+ assert ssock.session_reused == (session is not None)
+ utils.ping_alive(ssock)
+ ssock.close()
+ return new_session
+
+ workdir = os.path.join(str(tmpdir), 'kresd')
+ os.makedirs(workdir)
+
+ with make_kresd(workdir, 'tt') as kresd:
+ ctx = utils.make_ssl_context(verify_location=kresd.tls_cert_path)
+ session = connect(kresd, ctx, sf1) # initial conn
+ connect(kresd, ctx, sf2, session) # resume session on the same instance
+
+ workdir2 = os.path.join(str(tmpdir), 'kresd2')
+ os.makedirs(workdir2)
+ with make_kresd(workdir2, 'tt') as kresd2:
+ connect(kresd2, ctx, sf3, session) # resume session on a different instance