def handshakes(self, proto: str) -> Dict[str, Any]:
props = {}
sample_size = 5
- self.info(f'TLS Handshake\n')
+ self.info('TLS Handshake\n')
for authority in [
'curl.se', 'google.com', 'cloudflare.com', 'nghttp2.org'
]:
samples = []
errors = []
profiles = []
- self.info(f'single...')
+ self.info('single...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
errors = []
profiles = []
url = f'{url}?[0-{count - 1}]'
- self.info(f'serial...')
+ self.info('serial...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
profiles = []
max_parallel = self._download_parallel if self._download_parallel > 0 else count
url = f'{url}?[0-{count - 1}]'
- self.info(f'parallel...')
+ self.info('parallel...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
count=count)
props['parallel'] = self.transfer_parallel(url=url, proto=proto,
count=count)
- self.info(f'ok.\n')
+ self.info('ok.\n')
return props
def downloads(self, proto: str, count: int,
samples = []
errors = []
profiles = []
- self.info(f'single...')
+ self.info('single...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=proto,
errors = []
profiles = []
url = f'{url}?id=[0-{count - 1}]'
- self.info(f'serial...')
+ self.info('serial...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=proto,
profiles = []
max_parallel = count
url = f'{url}?id=[0-{count - 1}]'
- self.info(f'parallel...')
+ self.info('parallel...')
for i in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=proto,
fpath=fpath, count=count)
props['parallel'] = self.upload_parallel(url=url, proto=proto,
fpath=fpath, count=count)
- self.info(f'ok.\n')
+ self.info('ok.\n')
return props
def uploads(self, proto: str, count: int,
else:
samples.append(count / r.duration.total_seconds())
non_200s = 0
- for l in r.stdout.splitlines():
- if not l.startswith('200,'):
+ for line in r.stdout.splitlines():
+ if not line.startswith('200,'):
non_200s += 1
if non_200s > 0:
errors.append(f'responses != 200: {non_200s}')
for m in [1, 6, 25, 50, 100, 300]:
props[str(m)] = self.do_requests(url=url, proto=proto, count=count,
max_parallel=m)
- self.info(f'ok.\n')
+ self.info('ok.\n')
return props
def requests(self, proto: str, req_count) -> Dict[str, Any]:
print('Downloads')
print(f' {"Server":<8} {"Size":>8}', end='')
- for m in measures: print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
+ for m in measures:
+ print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
print(f' {"Errors":^20}')
for server in score['downloads']:
print('Uploads')
print(f' {"Server":<8} {"Size":>8}', end='')
- for m in measures: print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
+ for m in measures:
+ print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
print(f' {"Errors":^20}')
for server in score['uploads']:
print('Requests, max in parallel')
print(f' {"Server":<8} {"Size":>6} {"Reqs":>6}', end='')
- for m in measures: print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
+ for m in measures:
+ print(f' {m_names[m]:>{mcol_width}} {"[cpu/rss]":<{mcol_sw}}', end='')
print(f' {"Errors":^10}')
for server in score['requests']:
assert r.json['server'] == env.domain1
# simple https: GET, any http version
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
def test_01_02_https_get(self, env: Env, httpd):
curl = CurlClient(env=env)
url = f'https://{env.domain1}:{env.https_port}/data.json'
assert r.json['server'] == env.domain1
# simple https: GET, h2 wanted and got
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
def test_01_03_h2_get(self, env: Env, httpd):
curl = CurlClient(env=env)
url = f'https://{env.domain1}:{env.https_port}/data.json'
assert r.json['server'] == env.domain1
# simple https: GET, h2 unsupported, fallback to h1
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
def test_01_04_h2_unsupported(self, env: Env, httpd):
curl = CurlClient(env=env)
url = f'https://{env.domain2}:{env.https_port}/data.json'
assert r.json['server'] == env.domain1
# simple download, check connect/handshake timings
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_01_06_timings(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
# simple https: HEAD
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
def test_01_07_head(self, env: Env, httpd, nghttpx, repeat, proto):
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
# setting smaller frame sizes. This is not released yet, we
# test if it works and back out if not.
httpd.set_extra_config(env.domain1, lines=[
- f'H2MaxDataFrameLen 1024',
+ 'H2MaxDataFrameLen 1024',
])
assert httpd.stop()
if not httpd.start():
# no, not supported, bail out
httpd.set_extra_config(env.domain1, lines=None)
assert httpd.start()
- pytest.skip(f'H2MaxDataFrameLen not supported')
+ pytest.skip('H2MaxDataFrameLen not supported')
# ok, make 100 downloads with 2 parallel running and they
# are expected to stumble into the issue when using `lib/http2.c`
# from curl 7.88.0
###########################################################################
#
import logging
-import os
from typing import Tuple, List, Dict
import pytest
#
###########################################################################
#
-import json
import logging
-from typing import Optional, Tuple, List, Dict
import pytest
-from testenv import Env, CurlClient, ExecResult
+from testenv import Env, CurlClient
log = logging.getLogger(__name__)
#
###########################################################################
#
-import json
import logging
-from typing import Optional, Tuple, List, Dict
import pytest
-from testenv import Env, CurlClient, ExecResult
+from testenv import Env, CurlClient
log = logging.getLogger(__name__)
httpd.reload()
# download using only HTTP/3 on working server
- @pytest.mark.skipif(condition=not Env.have_h3(), reason=f"missing HTTP/3 support")
+ @pytest.mark.skipif(condition=not Env.have_h3(), reason="missing HTTP/3 support")
def test_06_01_h3_only(self, env: Env, httpd, nghttpx, repeat):
curl = CurlClient(env=env)
urln = f'https://{env.authority_for(env.domain1, "h3")}/data.json'
assert r.stats[0]['http_version'] == '3'
# download using only HTTP/3 on missing server
- @pytest.mark.skipif(condition=not Env.have_h3(), reason=f"missing HTTP/3 support")
+ @pytest.mark.skipif(condition=not Env.have_h3(), reason="missing HTTP/3 support")
def test_06_02_h3_only(self, env: Env, httpd, nghttpx, repeat):
nghttpx.stop_if_running()
curl = CurlClient(env=env)
r.check_response(exitcode=7, http_status=None)
# download using HTTP/3 on missing server with fallback on h2
- @pytest.mark.skipif(condition=not Env.have_h3(), reason=f"missing HTTP/3 support")
+ @pytest.mark.skipif(condition=not Env.have_h3(), reason="missing HTTP/3 support")
def test_06_03_h3_fallback_h2(self, env: Env, httpd, nghttpx, repeat):
nghttpx.stop_if_running()
curl = CurlClient(env=env)
assert r.stats[0]['http_version'] == '2'
# download using HTTP/3 on missing server with fallback on http/1.1
- @pytest.mark.skipif(condition=not Env.have_h3(), reason=f"missing HTTP/3 support")
+ @pytest.mark.skipif(condition=not Env.have_h3(), reason="missing HTTP/3 support")
def test_06_04_h3_fallback_h1(self, env: Env, httpd, nghttpx, repeat):
nghttpx.stop_if_running()
curl = CurlClient(env=env)
# make https: to an invalid address
def test_06_12_stats_fail_tcp(self, env: Env, httpd, nghttpx, repeat):
curl = CurlClient(env=env)
- urln = f'https://not-valid.com:1/data.json'
+ urln = 'https://not-valid.com:1/data.json'
r = curl.http_download(urls=[urln], extra_args=[
'--resolve', f'not-valid.com:{1}:127.0.0.1'
])
import filecmp
import logging
import os
-import time
import pytest
from typing import List
])
r.check_stats(count=1, http_status=200, exitcode=0)
- # speed limited on echo handler
- @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
- def test_07_51_echo_speed_limit(self, env: Env, httpd, nghttpx, proto, repeat):
- if proto == 'h3' and not env.have_h3():
- pytest.skip("h3 not supported")
- count = 1
- fdata = os.path.join(env.gen_dir, 'data-100k')
- speed_limit = 50 * 1024
- curl = CurlClient(env=env)
- url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-0]'
- r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto,
- with_headers=True, extra_args=[
- '--limit-rate', f'{speed_limit}'
- ])
- r.check_response(count=count, http_status=200)
- up_speed = r.stats[0]['speed_upload']
- assert (speed_limit * 0.5) <= up_speed <= (speed_limit * 1.5), f'{r.stats[0]}'
-
def check_downloads(self, client, source: List[str], count: int,
complete: bool = True):
for i in range(count):
log = logging.getLogger(__name__)
-@pytest.mark.skipif(condition=not Env.has_caddy(), reason=f"missing caddy")
-@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+@pytest.mark.skipif(condition=not Env.has_caddy(), reason="missing caddy")
+@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
class TestCaddy:
@pytest.fixture(autouse=True, scope='class')
env.make_data_file(indir=push_dir, fname="data2", fsize=1*1024)
env.make_data_file(indir=push_dir, fname="data3", fsize=1*1024)
httpd.set_extra_config(env.domain1, [
- f'H2EarlyHints on',
- f'<Location /push/data1>',
- f' H2PushResource /push/data2',
- f'</Location>',
- f'<Location /push/data2>',
- f' H2PushResource /push/data1',
- f' H2PushResource /push/data3',
- f'</Location>',
+ 'H2EarlyHints on',
+ '<Location /push/data1>',
+ ' H2PushResource /push/data2',
+ '</Location>',
+ '<Location /push/data2>',
+ ' H2PushResource /push/data1',
+ ' H2PushResource /push/data3',
+ '</Location>',
])
# activate the new config
httpd.reload()
httpd.reload()
def get_tunnel_proto_used(self, r: ExecResult):
- for l in r.trace_lines:
- m = re.match(r'.* CONNECT tunnel: (\S+) negotiated$', l)
+ for line in r.trace_lines:
+ m = re.match(r'.* CONNECT tunnel: (\S+) negotiated$', line)
if m:
return m.group(1)
assert False, f'tunnel protocol not found in:\n{"".join(r.trace_lines)}'
protocol='HTTP/2' if proto == 'h2' else 'HTTP/1.1')
# upload via https: with proto (no tunnel)
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("proto", ['http/1.1', 'h2'])
@pytest.mark.parametrize("fname, fcount", [
['data.json', 5],
# download https: with proto via http: proxytunnel
@pytest.mark.parametrize("proto", ['http/1.1', 'h2'])
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
def test_10_05_proxytunnel_http(self, env: Env, httpd, proto, repeat):
curl = CurlClient(env=env)
url = f'https://localhost:{env.https_port}/data.json'
assert filecmp.cmp(srcfile, dfile, shallow=False)
# download many https: with proto via https: proxytunnel
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("proto", ['http/1.1', 'h2'])
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.parametrize("fname, fcount", [
assert r.total_connects == 1, r.dump_logs()
# upload many https: with proto via https: proxytunnel
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("proto", ['http/1.1', 'h2'])
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.parametrize("fname, fcount", [
assert respdata == indata, f'response {i} differs'
assert r.total_connects == 1, r.dump_logs()
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.skipif(condition=not Env.have_nghttpx(), reason="no nghttpx available")
def test_10_09_reuse_ser(self, env: Env, httpd, nghttpx_fwd, tunnel, repeat):
else:
assert r.total_connects == 2
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.skipif(condition=not Env.have_nghttpx(), reason="no nghttpx available")
def test_10_10_reuse_proxy(self, env: Env, httpd, nghttpx_fwd, tunnel, repeat):
r2.check_response(count=2, http_status=200)
assert r2.total_connects == 1
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.skipif(condition=not Env.have_nghttpx(), reason="no nghttpx available")
@pytest.mark.skipif(condition=not Env.curl_uses_lib('openssl'), reason="tls13-ciphers not supported")
r2.check_response(count=2, http_status=200)
assert r2.total_connects == 2
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.skipif(condition=not Env.have_nghttpx(), reason="no nghttpx available")
@pytest.mark.skipif(condition=not Env.curl_uses_lib('openssl'), reason="tls13-ciphers not supported")
r2.check_response(count=2, http_status=200)
assert r2.total_connects == 2
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
@pytest.mark.parametrize("tunnel", ['http/1.1', 'h2'])
@pytest.mark.skipif(condition=not Env.have_nghttpx(), reason="no nghttpx available")
@pytest.mark.skipif(condition=not Env.curl_uses_lib('openssl'), reason="tls13-ciphers not supported")
try:
c, client_address = self._socket.accept()
try:
- data = c.recv(16)
+ c.recv(16)
c.sendall("""HTTP/1.1 200 Ok
Server: UdsFaker
Content-Type: application/json
r.check_response(count=1, http_status=200)
# download https: via Unix socket
- @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+ @pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
def test_11_02_unix_connect_http(self, env: Env, httpd, uds_faker, repeat):
curl = CurlClient(env=env)
url = f'https://{env.domain1}:{env.https_port}/data.json'
#
###########################################################################
#
-import difflib
-import filecmp
import logging
import os
from datetime import datetime, timedelta
@pytest.mark.skipif(condition=Env.curl_uses_lib('bearssl'), reason='BearSSL too slow')
-@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason=f"curl without SSL")
+@pytest.mark.skipif(condition=not Env.have_ssl_curl(), reason="curl without SSL")
class TestReuse:
# check if HTTP/1.1 handles 'Connection: close' correctly
httpd, nghttpx, repeat, proto):
httpd.clear_extra_configs()
httpd.set_extra_config('base', [
- f'MaxKeepAliveRequests 1',
+ 'MaxKeepAliveRequests 1',
])
httpd.reload()
count = 100
assert (count/2 - delta) < r.total_connects < (count/2 + delta)
@pytest.mark.skipif(condition=Env.httpd_is_at_least('2.5.0'),
- reason=f"httpd 2.5+ handles KeepAlives different")
+ reason="httpd 2.5+ handles KeepAlives differently")
@pytest.mark.parametrize("proto", ['http/1.1'])
def test_12_02_h1_conn_timeout(self, env: Env,
httpd, nghttpx, repeat, proto):
httpd.clear_extra_configs()
httpd.set_extra_config('base', [
- f'KeepAliveTimeout 1',
+ 'KeepAliveTimeout 1',
])
httpd.reload()
count = 5
#
###########################################################################
#
-import filecmp
import logging
-import os
import re
-import time
import pytest
from testenv import Env, CurlClient, ExecResult
#
###########################################################################
#
-import difflib
-import filecmp
import logging
import os
import pytest
-from testenv import Env, CurlClient, LocalClient
+from testenv import Env, CurlClient
log = logging.getLogger(__name__)
#
###########################################################################
#
-import difflib
-import filecmp
import logging
import os
-from datetime import timedelta
import pytest
-from testenv import Env, CurlClient, LocalClient, ExecResult
+from testenv import Env, CurlClient
log = logging.getLogger(__name__)
#
###########################################################################
#
-import difflib
-import filecmp
import json
import logging
import os
-from datetime import timedelta
import pytest
-from testenv import Env, CurlClient, LocalClient, ExecResult
+from testenv import Env, CurlClient
log = logging.getLogger(__name__)
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
curl = CurlClient(env=env)
- domain = f'127.0.0.1'
+ domain = '127.0.0.1'
url = f'https://{env.authority_for(domain, proto)}/curltest/sslinfo'
r = curl.http_get(url=url, alpn_proto=proto)
assert r.exit_code == 0, f'{r}'
if proto == 'h3' and not env.have_h3():
pytest.skip("h3 not supported")
curl = CurlClient(env=env)
- domain = f'localhost'
+ domain = 'localhost'
url = f'https://{env.authority_for(domain, proto)}/curltest/sslinfo'
r = curl.http_get(url=url, alpn_proto=proto)
assert r.exit_code == 0, f'{r}'
not env.curl_uses_lib('quictls'):
pytest.skip("TLS library does not support --cert-status")
curl = CurlClient(env=env)
- domain = f'localhost'
+ domain = 'localhost'
url = f'https://{env.authority_for(domain, proto)}/'
r = curl.http_get(url=url, alpn_proto=proto, extra_args=[
'--cert-status'
#
###########################################################################
#
-import difflib
-import filecmp
import logging
-import os
-from datetime import timedelta
import pytest
-from testenv import Env, CurlClient, LocalClient
+from testenv import Env, CurlClient
log = logging.getLogger(__name__)
#
###########################################################################
#
-import difflib
-import filecmp
import logging
-import os
import re
-from datetime import timedelta
import pytest
from testenv import Env, CurlClient, LocalClient
])
r.check_response(http_status=200, count=2)
assert r.tcpdump
- assert len(r.tcpdump.stats) == 0, f'Unexpected TCP RSTs packets'
+ assert len(r.tcpdump.stats) == 0, 'Unexpected TCP RST packets'
# run downloads where the server closes the connection after each request
@pytest.mark.parametrize("proto", ['http/1.1'])
f'id=[0-{count-1}]&with_cl&close'
r = curl.http_download(urls=[url], alpn_proto=proto)
r.check_response(http_status=200, count=count)
- shutdowns = [l for l in r.trace_lines if re.match(r'.*CCACHE\] shutdown #\d+, done=1', l)]
+ shutdowns = [line for line in r.trace_lines
+ if re.match(r'.*CCACHE\] shutdown #\d+, done=1', line)]
assert len(shutdowns) == count, f'{shutdowns}'
# run downloads with CURLOPT_FORBID_REUSE set, meaning *we* close
'-n', f'{count}', '-f', '-V', proto, url
])
r.check_exit_code(0)
- shutdowns = [l for l in r.trace_lines if re.match(r'.*CCACHE\] shutdown #\d+, done=1', l)]
+ shutdowns = [line for line in r.trace_lines
+ if re.match(r'.*CCACHE\] shutdown #\d+, done=1', line)]
assert len(shutdowns) == count, f'{shutdowns}'
# run event-based downloads with CURLOPT_FORBID_REUSE set, meaning *we* close
])
r.check_response(http_status=200, count=count)
# check that we closed all connections
- closings = [l for l in r.trace_lines if re.match(r'.*CCACHE\] closing #\d+', l)]
+ closings = [line for line in r.trace_lines
+ if re.match(r'.*CCACHE\] closing #\d+', line)]
assert len(closings) == count, f'{closings}'
# check that all connection sockets were removed from event
- removes = [l for l in r.trace_lines if re.match(r'.*socket cb: socket \d+ REMOVED', l)]
+ removes = [line for line in r.trace_lines
+ if re.match(r'.*socket cb: socket \d+ REMOVED', line)]
assert len(removes) == count, f'{removes}'
# check graceful shutdown on multiplexed http
])
r.check_response(http_status=200, count=2)
# check connection cache closings
- shutdowns = [l for l in r.trace_lines if re.match(r'.*CCACHE\] shutdown #\d+, done=1', l)]
+ shutdowns = [line for line in r.trace_lines
+ if re.match(r'.*CCACHE\] shutdown #\d+, done=1', line)]
assert len(shutdowns) == 1, f'{shutdowns}'
log = logging.getLogger(__name__)
-@pytest.mark.skipif(condition=not Env.has_vsftpd(), reason=f"missing vsftpd")
+@pytest.mark.skipif(condition=not Env.has_vsftpd(), reason="missing vsftpd")
class TestVsFTPD:
@pytest.fixture(autouse=True, scope='class')
r = curl.ftp_get(urls=[url], with_stats=True, with_tcpdump=True)
r.check_stats(count=count, http_status=226)
assert r.tcpdump
- assert len(r.tcpdump.stats) == 0, f'Unexpected TCP RSTs packets'
+ assert len(r.tcpdump.stats) == 0, 'Unexpected TCP RST packets'
# check with `tcpdump` if curl causes any TCP RST packets
@pytest.mark.skipif(condition=not Env.tcpdump(), reason="tcpdump not available")
r = curl.ftp_upload(urls=[url], fupload=f'{srcfile}', with_stats=True, with_tcpdump=True)
r.check_stats(count=count, http_status=226)
assert r.tcpdump
- assert len(r.tcpdump.stats) == 0, f'Unexpected TCP RSTs packets'
+ assert len(r.tcpdump.stats) == 0, 'Unexpected TCP RST packets'
def test_30_08_active_download(self, env: Env, vsftpd: VsFTPD):
docname = 'data-10k'
log = logging.getLogger(__name__)
-@pytest.mark.skipif(condition=not Env.has_vsftpd(), reason=f"missing vsftpd")
+@pytest.mark.skipif(condition=not Env.has_vsftpd(), reason="missing vsftpd")
class TestVsFTPD:
SUPPORTS_SSL = True
r.check_stats(count=count, http_status=226)
# vsftp closes control connection without niceties,
# disregard RST packets it sent from its port to curl
- assert len(r.tcpdump.stats_excluding(src_port=env.ftps_port)) == 0, f'Unexpected TCP RSTs packets'
+ assert len(r.tcpdump.stats_excluding(src_port=env.ftps_port)) == 0, 'Unexpected TCP RST packets'
# check with `tcpdump` if curl causes any TCP RST packets
@pytest.mark.skipif(condition=not Env.tcpdump(), reason="tcpdump not available")
r.check_stats(count=count, http_status=226)
# vsftp closes control connection without niceties,
# disregard RST packets it sent from its port to curl
- assert len(r.tcpdump.stats_excluding(src_port=env.ftps_port)) == 0, f'Unexpected TCP RSTs packets'
+ assert len(r.tcpdump.stats_excluding(src_port=env.ftps_port)) == 0, 'Unexpected TCP RST packets'
def test_31_08_upload_ascii(self, env: Env, vsftpds: VsFTPD):
docname = 'upload-ascii'
# SPDX-License-Identifier: curl
#
###########################################################################
-#
+# ruff: noqa: F401, E402
import pytest
pytest.register_assert_rewrite("testenv.env", "testenv.curl", "testenv.caddy",
"testenv.httpd", "testenv.nghttpx")
from .httpd import Httpd
from .curl import CurlClient, ExecResult, RunProfile
from .client import LocalClient
-from .nghttpx import Nghttpx
from .nghttpx import Nghttpx, NghttpxQuic, NghttpxFwd
from .vsftpd import VsFTPD
fd.write(JSONEncoder().encode(data))
with open(self._conf_file, 'w') as fd:
conf = [ # base server config
- f'{{',
+ '{',
f' http_port {self.env.caddy_http_port}',
f' https_port {self.env.caddy_https_port}',
f' servers :{self.env.caddy_https_port} {{',
- f' protocols h3 h2 h1',
- f' }}',
- f'}}',
+ ' protocols h3 h2 h1',
+ ' }',
+ '}',
f'{domain1}:{self.env.caddy_https_port} {{',
- f' file_server * {{',
+ ' file_server * {',
f' root {self._docs_dir}',
- f' }}',
+ ' }',
f' tls {creds1.cert_file} {creds1.pkey_file}',
- f'}}',
+ '}',
f'{domain2} {{',
f' reverse_proxy /* http://localhost:{self.env.http_port} {{',
- f' }}',
+ ' }',
f' tls {creds2.cert_file} {creds2.pkey_file}',
- f'}}',
+ '}',
]
fd.write("\n".join(conf))
for name in domains:
try:
names.append(x509.IPAddress(ipaddress.ip_address(name)))
- except:
+ # TODO: specify specific exceptions here
+ except: # noqa: E722
names.append(x509.DNSName(name))
return csr.add_extension(
#
###########################################################################
#
-import pytest
-import json
import logging
import os
-import re
import shutil
import subprocess
-from datetime import timedelta, datetime
-from typing import List, Optional, Dict, Union
-from urllib.parse import urlparse
+from datetime import datetime
+from typing import Optional, Dict
from . import ExecResult
from .env import Env
if self._proc:
raise Exception('tcpdump still running')
lines = []
- for l in open(self._stdoutfile).readlines():
- if re.match(r'.* IP 127\.0\.0\.1\.\d+ [<>] 127\.0\.0\.1\.\d+:.*', l):
- lines.append(l)
+ for line in open(self._stdoutfile).readlines():
+ if re.match(r'.* IP 127\.0\.0\.1\.\d+ [<>] 127\.0\.0\.1\.\d+:.*', line):
+ lines.append(line)
return lines
def stats_excluding(self, src_port) -> Optional[List[str]]:
if self._proc:
raise Exception('tcpdump still running')
lines = []
- for l in self.stats:
- if not re.match(r'.* IP 127\.0\.0\.1\.' + str(src_port) + ' >.*', l):
- lines.append(l)
+ for line in self.stats:
+ if not re.match(r'.* IP 127\.0\.0\.1\.' + str(src_port) + ' >.*', line):
+ lines.append(line)
return lines
@property
def stderr(self) -> List[str]:
if self._proc:
raise Exception('tcpdump still running')
- lines = []
return open(self._stderrfile).readlines()
def sample(self):
try:
out = ''.join(self._stdout)
self._json_out = json.loads(out)
- except:
+ except: # noqa: E722
pass
def __repr__(self):
def _parse_stats(self):
self._stats = []
- for l in self._stdout:
+ for line in self._stdout:
try:
- self._stats.append(json.loads(l))
- except:
- log.error(f'not a JSON stat: {l}')
+ self._stats.append(json.loads(line))
+ # TODO: specify specific exceptions here
+ except: # noqa: E722
+ log.error(f'not a JSON stat: {line}')
break
@property
import shutil
import socket
import subprocess
-import sys
from configparser import ConfigParser, ExtendedInterpolation
-from datetime import timedelta
from typing import Optional
-import pytest
-
from .certs import CertificateSpec, TestCA, Credentials
from .ports import alloc_ports
assert False, f'{self.curl} -V failed with exit code: {p.returncode}'
if p.stderr.startswith('WARNING:'):
self.curl_is_debug = True
- for l in p.stdout.splitlines(keepends=False):
- if l.startswith('curl '):
- m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', l)
+ for line in p.stdout.splitlines(keepends=False):
+ if line.startswith('curl '):
+ m = re.match(r'^curl (?P<version>\S+) (?P<os>\S+) (?P<libs>.*)$', line)
if m:
self.curl_props['fullname'] = m.group(0)
self.curl_props['version'] = m.group('version')
self.curl_props['libs'] = [
re.sub(r'/.*', '', lib) for lib in self.curl_props['lib_versions']
]
- if l.startswith('Features: '):
+ if line.startswith('Features: '):
self.curl_props['features'] = [
- feat.lower() for feat in l[10:].split(' ')
+ feat.lower() for feat in line[10:].split(' ')
]
- if l.startswith('Protocols: '):
+ if line.startswith('Protocols: '):
self.curl_props['protocols'] = [
- prot.lower() for prot in l[11:].split(' ')
+ prot.lower() for prot in line[11:].split(' ')
]
self.ports = alloc_ports(port_specs={
self._caddy_version = m.group(1)
else:
raise Exception(f'Unable to determine caddy version from: {p.stdout}')
- except:
+ # TODO: specify specific exceptions here
+ except: # noqa: E722
self.caddy = None
self.vsftpd = self.config['vsftpd']['vsftpd']
self._vsftpd_version = 'unknown'
else:
raise Exception(f'Unable to determine VsFTPD version from: {p.stderr}')
- except Exception as e:
+ except Exception:
self.vsftpd = None
self._tcpdump = shutil.which('tcpdump')
def get_incomplete_reason(self) -> Optional[str]:
if self.httpd is None or len(self.httpd.strip()) == 0:
- return f'httpd not configured, see `--with-test-httpd=<path>`'
+ return 'httpd not configured, see `--with-test-httpd=<path>`'
if not os.path.isfile(self.httpd):
return f'httpd ({self.httpd}) not found'
if not os.path.isfile(self.apachectl):
return f'apachectl ({self.apachectl}) not found'
if self.apxs is None:
- return f"command apxs not found (commonly provided in apache2-dev)"
+ return "command apxs not found (commonly provided in apache2-dev)"
if not os.path.isfile(self.apxs):
return f"apxs ({self.apxs}) not found"
return None
raise Exception(f'{env.apxs} failed to query libexecdir: {p}')
self._mods_dir = p.stdout.strip()
if self._mods_dir is None:
- raise Exception(f'apache modules dir cannot be found')
+ raise Exception('apache modules dir cannot be found')
if not os.path.exists(self._mods_dir):
raise Exception(f'apache modules dir does not exist: {self._mods_dir}')
self._process = None
fd.write(f'LoadModule curltest_module \"{Httpd.MOD_CURLTEST}\"\n')
conf = [ # base server config
f'ServerRoot "{self._apache_dir}"',
- f'DefaultRuntimeDir logs',
- f'PidFile httpd.pid',
+ 'DefaultRuntimeDir logs',
+ 'PidFile httpd.pid',
f'ErrorLog {self._error_log}',
f'LogLevel {self._get_log_level()}',
- f'StartServers 4',
- f'ReadBufferSize 16000',
- f'H2MinWorkers 16',
- f'H2MaxWorkers 256',
+ 'StartServers 4',
+ 'ReadBufferSize 16000',
+ 'H2MinWorkers 16',
+ 'H2MaxWorkers 256',
f'Listen {self.env.http_port}',
f'Listen {self.env.https_port}',
f'Listen {self.env.proxy_port}',
f'Listen {self.env.proxys_port}',
f'TypesConfig "{self._conf_dir}/mime.types"',
- f'SSLSessionCache "shmcb:ssl_gcache_data(32000)"',
+ 'SSLSessionCache "shmcb:ssl_gcache_data(32000)"',
]
if 'base' in self._extra_configs:
conf.extend(self._extra_configs['base'])
conf.extend([ # plain http host for domain1
f'<VirtualHost *:{self.env.http_port}>',
f' ServerName {domain1}',
- f' ServerAlias localhost',
+ ' ServerAlias localhost',
f' DocumentRoot "{self._docs_dir}"',
- f' Protocols h2c http/1.1',
- f' H2Direct on',
+ ' Protocols h2c http/1.1',
+ ' H2Direct on',
])
conf.extend(self._curltest_conf(domain1))
conf.extend([
- f'</VirtualHost>',
- f'',
+ '</VirtualHost>',
+ '',
])
conf.extend([ # https host for domain1, h1 + h2
f'<VirtualHost *:{self.env.https_port}>',
f' ServerName {domain1}',
- f' ServerAlias localhost',
- f' Protocols h2 http/1.1',
- f' SSLEngine on',
+ ' ServerAlias localhost',
+ ' Protocols h2 http/1.1',
+ ' SSLEngine on',
f' SSLCertificateFile {creds1.cert_file}',
f' SSLCertificateKeyFile {creds1.pkey_file}',
f' DocumentRoot "{self._docs_dir}"',
if domain1 in self._extra_configs:
conf.extend(self._extra_configs[domain1])
conf.extend([
- f'</VirtualHost>',
- f'',
+ '</VirtualHost>',
+ '',
])
# Alternate to domain1 with BROTLI compression
conf.extend([ # https host for domain1, h1 + h2
f'<VirtualHost *:{self.env.https_port}>',
f' ServerName {domain1brotli}',
- f' Protocols h2 http/1.1',
- f' SSLEngine on',
+ ' Protocols h2 http/1.1',
+ ' SSLEngine on',
f' SSLCertificateFile {creds1.cert_file}',
f' SSLCertificateKeyFile {creds1.pkey_file}',
f' DocumentRoot "{self._docs_dir}"',
- f' SetOutputFilter BROTLI_COMPRESS',
+ ' SetOutputFilter BROTLI_COMPRESS',
])
conf.extend(self._curltest_conf(domain1))
if domain1 in self._extra_configs:
conf.extend(self._extra_configs[domain1])
conf.extend([
- f'</VirtualHost>',
- f'',
+ '</VirtualHost>',
+ '',
])
conf.extend([ # plain http host for domain2
f'<VirtualHost *:{self.env.http_port}>',
f' ServerName {domain2}',
- f' ServerAlias localhost',
+ ' ServerAlias localhost',
f' DocumentRoot "{self._docs_dir}"',
- f' Protocols h2c http/1.1',
+ ' Protocols h2c http/1.1',
])
conf.extend(self._curltest_conf(domain2))
conf.extend([
- f'</VirtualHost>',
- f'',
+ '</VirtualHost>',
+ '',
])
conf.extend([ # https host for domain2, no h2
f'<VirtualHost *:{self.env.https_port}>',
f' ServerName {domain2}',
- f' Protocols http/1.1',
- f' SSLEngine on',
+ ' Protocols http/1.1',
+ ' SSLEngine on',
f' SSLCertificateFile {creds2.cert_file}',
f' SSLCertificateKeyFile {creds2.pkey_file}',
f' DocumentRoot "{self._docs_dir}/two"',
if domain2 in self._extra_configs:
conf.extend(self._extra_configs[domain2])
conf.extend([
- f'</VirtualHost>',
- f'',
+ '</VirtualHost>',
+ '',
])
conf.extend([ # http forward proxy
f'<VirtualHost *:{self.env.proxy_port}>',
f' ServerName {proxy_domain}',
- f' Protocols h2c http/1.1',
- f' ProxyRequests On',
- f' H2ProxyRequests On',
- f' ProxyVia On',
+ ' Protocols h2c http/1.1',
+ ' ProxyRequests On',
+ ' H2ProxyRequests On',
+ ' ProxyVia On',
f' AllowCONNECT {self.env.http_port} {self.env.https_port}',
])
conf.extend(self._get_proxy_conf())
conf.extend([
- f'</VirtualHost>',
- f'',
+ '</VirtualHost>',
+ '',
])
conf.extend([ # https forward proxy
f'<VirtualHost *:{self.env.proxys_port}>',
f' ServerName {proxy_domain}',
- f' Protocols h2 http/1.1',
- f' SSLEngine on',
+ ' Protocols h2 http/1.1',
+ ' SSLEngine on',
f' SSLCertificateFile {proxy_creds.cert_file}',
f' SSLCertificateKeyFile {proxy_creds.pkey_file}',
- f' ProxyRequests On',
- f' H2ProxyRequests On',
- f' ProxyVia On',
+ ' ProxyRequests On',
+ ' H2ProxyRequests On',
+ ' ProxyVia On',
f' AllowCONNECT {self.env.http_port} {self.env.https_port}',
])
conf.extend(self._get_proxy_conf())
conf.extend([
- f'</VirtualHost>',
- f'',
+ '</VirtualHost>',
+ '',
])
fd.write("\n".join(conf))
def _get_proxy_conf(self):
if self._proxy_auth_basic:
return [
- f' <Proxy "*">',
- f' AuthType Basic',
- f' AuthName "Restricted Proxy"',
- f' AuthBasicProvider file',
+ ' <Proxy "*">',
+ ' AuthType Basic',
+ ' AuthName "Restricted Proxy"',
+ ' AuthBasicProvider file',
f' AuthUserFile "{self._basic_passwords}"',
- f' Require user proxy',
- f' </Proxy>',
+ ' Require user proxy',
+ ' </Proxy>',
]
else:
return [
- f' <Proxy "*">',
- f' Require ip 127.0.0.1',
- f' </Proxy>',
+ ' <Proxy "*">',
+ ' Require ip 127.0.0.1',
+ ' </Proxy>',
]
def _get_log_level(self):
lines = []
if Httpd.MOD_CURLTEST is not None:
lines.extend([
- f' Redirect 302 /data.json.302 /data.json',
- f' Redirect 301 /curltest/echo301 /curltest/echo',
- f' Redirect 302 /curltest/echo302 /curltest/echo',
- f' Redirect 303 /curltest/echo303 /curltest/echo',
- f' Redirect 307 /curltest/echo307 /curltest/echo',
- f' <Location /curltest/sslinfo>',
- f' SSLOptions StdEnvVars',
- f' SetHandler curltest-sslinfo',
- f' </Location>',
- f' <Location /curltest/echo>',
- f' SetHandler curltest-echo',
- f' </Location>',
- f' <Location /curltest/put>',
- f' SetHandler curltest-put',
- f' </Location>',
- f' <Location /curltest/tweak>',
- f' SetHandler curltest-tweak',
- f' </Location>',
- f' Redirect 302 /tweak /curltest/tweak',
- f' <Location /curltest/1_1>',
- f' SetHandler curltest-1_1-required',
- f' </Location>',
- f' <Location /curltest/shutdown_unclean>',
- f' SetHandler curltest-tweak',
- f' SetEnv force-response-1.0 1',
- f' </Location>',
- f' SetEnvIf Request_URI "/shutdown_unclean" ssl-unclean=1',
+ ' Redirect 302 /data.json.302 /data.json',
+ ' Redirect 301 /curltest/echo301 /curltest/echo',
+ ' Redirect 302 /curltest/echo302 /curltest/echo',
+ ' Redirect 303 /curltest/echo303 /curltest/echo',
+ ' Redirect 307 /curltest/echo307 /curltest/echo',
+ ' <Location /curltest/sslinfo>',
+ ' SSLOptions StdEnvVars',
+ ' SetHandler curltest-sslinfo',
+ ' </Location>',
+ ' <Location /curltest/echo>',
+ ' SetHandler curltest-echo',
+ ' </Location>',
+ ' <Location /curltest/put>',
+ ' SetHandler curltest-put',
+ ' </Location>',
+ ' <Location /curltest/tweak>',
+ ' SetHandler curltest-tweak',
+ ' </Location>',
+ ' Redirect 302 /tweak /curltest/tweak',
+ ' <Location /curltest/1_1>',
+ ' SetHandler curltest-1_1-required',
+ ' </Location>',
+ ' <Location /curltest/shutdown_unclean>',
+ ' SetHandler curltest-tweak',
+ ' SetEnv force-response-1.0 1',
+ ' </Location>',
+ ' SetEnvIf Request_URI "/shutdown_unclean" ssl-unclean=1',
])
if self._auth_digest:
lines.extend([
f' <Directory {self.docs_dir}/restricted/digest>',
- f' AuthType Digest',
- f' AuthName "restricted area"',
+ ' AuthType Digest',
+ ' AuthName "restricted area"',
f' AuthDigestDomain "https://{servername}"',
- f' AuthBasicProvider file',
+ ' AuthBasicProvider file',
f' AuthUserFile "{self._digest_passwords}"',
- f' Require valid-user',
- f' </Directory>',
+ ' Require valid-user',
+ ' </Directory>',
])
return lines
def _write_config(self):
with open(self._conf_file, 'w') as fd:
- fd.write(f'# nghttpx test config'),
+ fd.write('# nghttpx test config')
fd.write("\n".join([
'# do we need something here?'
]))
f'--frontend=*,{self.env.h3_port};quic',
f'--backend=127.0.0.1,{self.env.https_port};{self.env.domain1};sni={self.env.domain1};proto=h2;tls',
f'--backend=127.0.0.1,{self.env.http_port}',
- f'--log-level=INFO',
+ '--log-level=INFO',
f'--pid-file={self._pid_file}',
f'--errorlog-file={self._error_log}',
f'--conf={self._conf_file}',
f'--cacert={self.env.ca.cert_file}',
creds.pkey_file,
creds.cert_file,
- f'--frontend-http3-window-size=1M',
- f'--frontend-http3-max-window-size=10M',
- f'--frontend-http3-connection-window-size=10M',
- f'--frontend-http3-max-connection-window-size=100M',
+ '--frontend-http3-window-size=1M',
+ '--frontend-http3-max-window-size=10M',
+ '--frontend-http3-connection-window-size=10M',
+ '--frontend-http3-max-connection-window-size=100M',
# f'--frontend-quic-debug-log',
]
ngerr = open(self._stderr, 'a')
assert creds # convince pytype this isn't None
args = [
self._cmd,
- f'--http2-proxy',
+ '--http2-proxy',
f'--frontend=*,{self.env.h2proxys_port}',
f'--backend=127.0.0.1,{self.env.proxy_port}',
- f'--log-level=INFO',
+ '--log-level=INFO',
f'--pid-file={self._pid_file}',
f'--errorlog-file={self._error_log}',
f'--conf={self._conf_file}',
#
###########################################################################
#
-import inspect
import logging
import os
import subprocess
import time
-from typing import List, Union, Optional
from datetime import datetime, timedelta
return self.start()
return True
- def start(self, wait_live=True):
- pass
-
def stop_if_running(self):
if self.is_running():
return self.stop()
self._mkpath(self._docs_dir)
self._mkpath(self._tmp_dir)
conf = [ # base server config
- f'listen=YES',
- f'run_as_launching_user=YES',
- f'#listen_address=127.0.0.1',
+ 'listen=YES',
+ 'run_as_launching_user=YES',
+ '#listen_address=127.0.0.1',
f'listen_port={self.port}',
- f'local_enable=NO',
- f'anonymous_enable=YES',
+ 'local_enable=NO',
+ 'anonymous_enable=YES',
f'anon_root={self._docs_dir}',
- f'dirmessage_enable=YES',
- f'write_enable=YES',
- f'anon_upload_enable=YES',
- f'log_ftp_protocol=YES',
- f'xferlog_enable=YES',
- f'xferlog_std_format=NO',
+ 'dirmessage_enable=YES',
+ 'write_enable=YES',
+ 'anon_upload_enable=YES',
+ 'log_ftp_protocol=YES',
+ 'xferlog_enable=YES',
+ 'xferlog_std_format=NO',
f'vsftpd_log_file={self._error_log}',
- f'\n',
+ '\n',
]
if self._with_ssl:
creds = self.env.get_credentials(self.domain)
assert creds # convince pytype this isn't None
conf.extend([
- f'ssl_enable=YES',
- f'debug_ssl=YES',
- f'allow_anon_ssl=YES',
+ 'ssl_enable=YES',
+ 'debug_ssl=YES',
+ 'allow_anon_ssl=YES',
f'rsa_cert_file={creds.cert_file}',
f'rsa_private_key_file={creds.pkey_file}',
# require_ssl_reuse=YES means ctrl and data connection need to use the same session
- f'require_ssl_reuse=NO',
+ 'require_ssl_reuse=NO',
])
with open(self._conf_file, 'w') as fd:
import argparse
import asyncio
import logging
-from asyncio import IncompleteReadError
from websockets import server
from websockets.exceptions import ConnectionClosedError
import os
import socket
import sys
-import time
from util import ClosingFileHandler
# impacket needs to be installed in the Python environment
try:
- import impacket
+ import impacket # noqa: F401
except ImportError:
sys.stderr.write(
'Warning: Python package impacket is required for smb testing; '