- name: install
run: |
- sudo apt-get install codespell python3-pip
+ sudo apt-get install codespell python3-pip python3-pytest
python3 -m pip install cmakelint==1.4.3
- name: spellcheck
from util import ClosingFileHandler
try: # Python 2
- import SocketServer as socketserver
+ import SocketServer as socketserver # type: ignore
except ImportError: # Python 3
import socketserver
import logging
import os
import sys
-from typing import Optional
+from typing import Generator
import pytest
@pytest.fixture(scope='package')
-def httpd(env) -> Httpd:
+def httpd(env) -> Generator[Httpd, None, None]:
httpd = Httpd(env=env)
if not httpd.exists():
pytest.skip(f'httpd not found: {env.httpd}')
@pytest.fixture(scope='package')
-def nghttpx(env, httpd) -> Optional[Nghttpx]:
+def nghttpx(env, httpd) -> Generator[Nghttpx, None, None]:
nghttpx = NghttpxQuic(env=env)
if env.have_h3():
nghttpx.clear_logs()
nghttpx.stop()
@pytest.fixture(scope='package')
-def nghttpx_fwd(env, httpd) -> Optional[Nghttpx]:
+def nghttpx_fwd(env, httpd) -> Generator[Nghttpx, None, None]:
nghttpx = NghttpxFwd(env=env)
if env.have_h3():
nghttpx.clear_logs()
import os
import time
import pytest
+from typing import List
from testenv import Env, CurlClient, LocalClient
up_speed = r.stats[0]['speed_upload']
assert (speed_limit * 0.5) <= up_speed <= (speed_limit * 1.5), f'{r.stats[0]}'
- def check_downloads(self, client, source: str, count: int,
+ def check_downloads(self, client, source: List[str], count: int,
complete: bool = True):
for i in range(count):
dfile = client.download_file(i)
import os
import socket
from threading import Thread
+from typing import Generator
+
import pytest
from testenv import Env, CurlClient
def __init__(self, path):
self._uds_path = path
self._done = False
+ self._socket = None
@property
def path(self):
class TestUnix:
@pytest.fixture(scope="class")
- def uds_faker(self, env: Env) -> UDSFaker:
+ def uds_faker(self, env: Env) -> Generator[UDSFaker, None, None]:
uds_path = os.path.join(env.gen_dir, 'uds_11.sock')
faker = UDSFaker(path=uds_path)
faker.start()
return self._docs_dir
@property
- def port(self) -> str:
+ def port(self) -> int:
return self.env.caddy_https_port
def clear_logs(self):
def _write_config(self):
domain1 = self.env.domain1
creds1 = self.env.get_credentials(domain1)
+ assert creds1 # convince pytype this isn't None
domain2 = self.env.domain2
creds2 = self.env.get_credentials(domain2)
+ assert creds2 # convince pytype this isn't None
self._mkpath(self._docs_dir)
self._mkpath(self._tmp_dir)
with open(os.path.join(self._docs_dir, 'data.json'), 'w') as fd:
self._cert_file = None
self._pkey_file = None
self._store = None
+ self._combined_file = None
@property
def name(self) -> str:
return creds
@staticmethod
- def _make_x509_name(org_name: str = None, common_name: str = None, parent: x509.Name = None) -> x509.Name:
+ def _make_x509_name(org_name: Optional[str] = None, common_name: Optional[str] = None, parent: x509.Name = None) -> x509.Name:
name_pieces = []
if org_name:
oid = NameOID.ORGANIZATIONAL_UNIT_NAME if parent else NameOID.ORGANIZATION_NAME
subject: x509.Name,
pkey: Any,
issuer_subject: Optional[Credentials],
- valid_from_delta: timedelta = None,
- valid_until_delta: timedelta = None
+ valid_from_delta: Optional[timedelta] = None,
+ valid_until_delta: Optional[timedelta] = None
):
pubkey = pkey.public_key()
issuer_subject = issuer_subject if issuer_subject is not None else subject
)
@staticmethod
- def _add_client_usages(csr: Any, issuer: Credentials, rfc82name: str = None) -> Any:
+ def _add_client_usages(csr: Any, issuer: Credentials, rfc82name: Optional[str] = None) -> Any:
cert = csr.add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True,
@staticmethod
def _make_ca_credentials(name, key_type: Any,
- issuer: Credentials = None,
+ issuer: Optional[Credentials] = None,
valid_from: timedelta = timedelta(days=-1),
valid_to: timedelta = timedelta(days=89),
) -> Credentials:
with_profile: bool = False,
with_tcpdump: bool = False,
no_save: bool = False,
- extra_args: List[str] = None):
+ extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
if no_save:
with_profile: bool = False,
with_tcpdump: bool = False,
no_save: bool = False,
- extra_args: List[str] = None):
+ extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
if no_save:
with_profile: bool = False,
with_tcpdump: bool = False,
no_save: bool = False,
- extra_args: List[str] = None):
+ extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
extra_args.extend([
with_stats: bool = True,
with_profile: bool = False,
with_tcpdump: bool = False,
- extra_args: List[str] = None):
+ extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
if fupload is not None:
with_stats: bool = True,
with_profile: bool = False,
with_tcpdump: bool = False,
- extra_args: List[str] = None):
+ extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
extra_args.extend([
with_profile=with_profile, with_tcpdump=with_tcpdump)
if r.exit_code == 0 and with_headers:
self._parse_headerfile(self._headerfile, r=r)
- if r.json:
- r.response["json"] = r.json
return r
def _complete_args(self, urls, timeout=None, options=None,
args.append(url)
return args
- def _parse_headerfile(self, headerfile: str, r: ExecResult = None) -> ExecResult:
+ def _parse_headerfile(self, headerfile: str, r: Optional[ExecResult] = None) -> ExecResult:
lines = open(headerfile).readlines()
if r is None:
r = ExecResult(args=[], exit_code=0, stdout=[], stderr=[])
if 'CURL' in os.environ:
self.curl = os.environ['CURL']
self.curl_props = {
- 'version': None,
- 'os': None,
- 'fullname': None,
+ 'version': '',
+ 'os': '',
+ 'fullname': '',
'features': [],
'protocols': [],
'libs': [],
return 'unknown'
@staticmethod
- def curl_lib_version_at_least(libname: str, min_version) -> str:
+ def curl_lib_version_at_least(libname: str, min_version) -> bool:
lversion = Env.curl_lib_version(libname)
if lversion != 'unknown':
return Env.CONFIG.versiontuple(min_version) <= \
domain1 = self.env.domain1
domain1brotli = self.env.domain1brotli
creds1 = self.env.get_credentials(domain1)
+ assert creds1 # convince pytype this isn't None
domain2 = self.env.domain2
creds2 = self.env.get_credentials(domain2)
+ assert creds2 # convince pytype this isn't None
proxy_domain = self.env.proxy_domain
proxy_creds = self.env.get_credentials(proxy_domain)
+ assert proxy_creds # convince pytype this isn't None
self._mkpath(self._conf_dir)
self._mkpath(self._logs_dir)
self._mkpath(self._tmp_dir)
self._error_log = os.path.join(self._run_dir, 'nghttpx.log')
self._stderr = os.path.join(self._run_dir, 'nghttpx.stderr')
self._tmp_dir = os.path.join(self._run_dir, 'tmp')
- self._process = None
self._process: Optional[subprocess.Popen] = None
self._rmf(self._pid_file)
self._rmf(self._error_log)
self._mkpath(self._tmp_dir)
if self._process:
self.stop()
+ creds = self.env.get_credentials(self.env.domain1)
+ assert creds # convince pytype this isn't None
args = [
self._cmd,
f'--frontend=*,{self.env.h3_port};quic',
f'--errorlog-file={self._error_log}',
f'--conf={self._conf_file}',
f'--cacert={self.env.ca.cert_file}',
- self.env.get_credentials(self.env.domain1).pkey_file,
- self.env.get_credentials(self.env.domain1).cert_file,
+ creds.pkey_file,
+ creds.cert_file,
f'--frontend-http3-window-size=1M',
f'--frontend-http3-max-window-size=10M',
f'--frontend-http3-connection-window-size=10M',
self._mkpath(self._tmp_dir)
if self._process:
self.stop()
+ creds = self.env.get_credentials(self.env.proxy_domain)
+ assert creds # convince pytype this isn't None
args = [
self._cmd,
f'--http2-proxy',
f'--errorlog-file={self._error_log}',
f'--conf={self._conf_file}',
f'--cacert={self.env.ca.cert_file}',
- self.env.get_credentials(self.env.proxy_domain).pkey_file,
- self.env.get_credentials(self.env.proxy_domain).cert_file,
+ creds.pkey_file,
+ creds.cert_file,
]
ngerr = open(self._stderr, 'a')
self._process = subprocess.Popen(args=args, stderr=ngerr)
import logging
import os
import subprocess
-from datetime import timedelta, datetime
-from json import JSONEncoder
import time
from typing import List, Union, Optional
-from .curl import CurlClient, ExecResult
+from datetime import datetime, timedelta
+
+from .curl import CurlClient
from .env import Env
return self._docs_dir
@property
- def port(self) -> str:
+ def port(self) -> int:
return self._port
def clear_logs(self):
log.error(f"Server still not responding after {timeout}")
return False
- def _run(self, args, intext=''):
- env = {}
- for key, val in os.environ.items():
- env[key] = val
- with open(self._error_log, 'w') as cerr:
- self._process = subprocess.run(args, stderr=cerr, stdout=cerr,
- cwd=self._vsftpd_dir,
- input=intext.encode() if intext else None,
- env=env)
- start = datetime.now()
- return ExecResult(args=args, exit_code=self._process.returncode,
- duration=datetime.now() - start)
-
def _rmf(self, path):
if os.path.exists(path):
return os.remove(path)
]
if self._with_ssl:
creds = self.env.get_credentials(self.domain)
+ assert creds # convince pytype this isn't None
conf.extend([
f'ssl_enable=YES',
f'debug_ssl=YES',
if path == SERVER_MAGIC:
fid, full_path = self.get_server_path(requested_file)
else:
- assert (path == TESTS_MAGIC)
+ assert path == TESTS_MAGIC
fid, full_path = self.get_test_path(requested_file)
self.tmpfiles.append(full_path)