These all seem reasonable to enable for this code.
run: find . -name '*.py' -exec pytype -j auto -k {} +
- name: ruff
- run: ruff check
+ run: ruff check --extend-select=B007,B016,C405,C416,COM818,D200,D213,D204,D401,D415,FURB129,N818,PERF401,PERF403,PIE790,PIE808,PLW0127,Q004,RUF010,SIM101,SIM117,SIM118,TRY400,TRY401
reuse:
runs-on: ubuntu-latest
#
###########################################################################
#
-""" DICT server """
+"""DICT server."""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
def dictserver(options):
- """
- Starts up a TCP server with a DICT handler and serves DICT requests
- forever.
- """
+ """Start up a TCP server with a DICT handler and serve DICT requests forever."""
if options.pidfile:
pid = os.getpid()
# see tests/server/util.c function write_pidfile
class DictHandler(socketserver.BaseRequestHandler):
- """Handler class for DICT connections.
+ """Handler class for DICT connections."""
- """
def handle(self):
- """
- Simple function which responds to all queries with a 552.
- """
+ """Respond to all queries with a 552."""
try:
# First, send a response to allow the server to continue.
rsp = "220 dictserver <xnooptions> <msgid@msgid>\n"
def setup_logging(options):
- """
- Set up logging from the command line options
- """
+ """Set up logging from the command line options."""
root_logger = logging.getLogger()
add_stdout = False
class ScriptRC(object):
- """Enum for script return codes"""
+ """Enum for script return codes."""
+
SUCCESS = 0
FAILURE = 1
EXCEPTION = 2
-class ScriptException(Exception):
- pass
-
-
if __name__ == '__main__':
# Get the options from the user.
options = get_options()
# Run main script.
try:
rc = dictserver(options)
- except Exception as e:
- log.exception(e)
+ except Exception:
+ log.exception('Error running server')
rc = ScriptRC.EXCEPTION
if options.pidfile and os.path.isfile(options.pidfile):
log = logging.getLogger(__name__)
-class ScoreCardException(Exception):
+class ScoreCardError(Exception):
pass
c_samples = []
hs_samples = []
errors = []
- for i in range(sample_size):
+ for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
args = [
'--http3-only' if proto == 'h3' else '--http2',
errors = []
profiles = []
self.info('single...')
- for i in range(sample_size):
+ for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
with_headers=False, with_profile=True)
profiles = []
url = f'{url}?[0-{count - 1}]'
self.info('serial...')
- for i in range(sample_size):
+ for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
with_headers=False, with_profile=True)
max_parallel = self._download_parallel if self._download_parallel > 0 else count
url = f'{url}?[0-{count - 1}]'
self.info('parallel...')
- for i in range(sample_size):
+ for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
with_headers=False,
errors = []
profiles = []
self.info('single...')
- for i in range(sample_size):
+ for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=proto,
with_headers=False, with_profile=True)
profiles = []
url = f'{url}?id=[0-{count - 1}]'
self.info('serial...')
- for i in range(sample_size):
+ for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=proto,
with_headers=False, with_profile=True)
max_parallel = count
url = f'{url}?id=[0-{count - 1}]'
self.info('parallel...')
- for i in range(sample_size):
+ for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=proto,
with_headers=False, with_profile=True,
'--parallel', '--parallel-max', str(max_parallel)
])
self.info(f'{max_parallel}...')
- for i in range(sample_size):
+ for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
with_headers=False, with_profile=True,
if proto == 'h3':
p['name'] = 'h3'
if not self.env.have_h3_curl():
- raise ScoreCardException('curl does not support HTTP/3')
+ raise ScoreCardError('curl does not support HTTP/3')
for lib in ['ngtcp2', 'quiche', 'msh3', 'nghttp3']:
if self.env.curl_uses_lib(lib):
p['implementation'] = lib
elif proto == 'h2':
p['name'] = 'h2'
if not self.env.have_h2_curl():
- raise ScoreCardException('curl does not support HTTP/2')
+ raise ScoreCardError('curl does not support HTTP/2')
for lib in ['nghttp2', 'hyper']:
if self.env.curl_uses_lib(lib):
p['implementation'] = lib
p['implementation'] = 'hyper' if self.env.curl_uses_lib('hyper')\
else 'native'
else:
- raise ScoreCardException(f"unknown protocol: {proto}")
+ raise ScoreCardError(f"unknown protocol: {proto}")
if 'implementation' not in p:
- raise ScoreCardException(f'did not recognized {p} lib')
+            raise ScoreCardError(f'did not recognize {p} lib')
p['version'] = Env.curl_lib_version(p['implementation'])
score = {
m_names = {}
mcol_width = 12
mcol_sw = 17
- for server, server_score in score['downloads'].items():
+ for server_score in score['downloads'].values():
for sskey, ssval in server_score.items():
if isinstance(ssval, str):
continue
size_score = score['downloads'][server][size]
print(f' {server:<8} {size:>8}', end='')
errors = []
- for key, val in size_score.items():
+ for val in size_score.values():
if 'errors' in val:
errors.extend(val['errors'])
for m in measures:
m_names = {}
mcol_width = 12
mcol_sw = 17
- for server, server_score in score['uploads'].items():
+ for server_score in score['uploads'].values():
for sskey, ssval in server_score.items():
if isinstance(ssval, str):
continue
size_score = score['uploads'][server][size]
print(f' {server:<8} {size:>8}', end='')
errors = []
- for key, val in size_score.items():
+ for val in size_score.values():
if 'errors' in val:
errors.extend(val['errors'])
for m in measures:
for server in score['requests']:
server_score = score['requests'][server]
for sskey, ssval in server_score.items():
- if isinstance(ssval, str) or isinstance(ssval, int):
+ if isinstance(ssval, (str, int)):
continue
if sskey not in sizes:
sizes.append(sskey)
- for mkey, mval in server_score[sskey].items():
+ for mkey in server_score[sskey]:
if mkey not in measures:
measures.append(mkey)
m_names[mkey] = f'{mkey}'
count = score['requests'][server]['count']
print(f' {server:<8} {size:>6} {count:>6}', end='')
errors = []
- for key, val in size_score.items():
+ for val in size_score.values():
if 'errors' in val:
errors.extend(val['errors'])
for m in measures:
else:
card.print_score(score)
- except ScoreCardException as ex:
- sys.stderr.write(f"ERROR: {str(ex)}\n")
+ except ScoreCardError as ex:
+ sys.stderr.write(f"ERROR: {ex}\n")
rv = 1
except KeyboardInterrupt:
log.warning("aborted")
r = curl.http_upload(urls=[url], data=data, alpn_proto=proto,
extra_args=['--parallel'])
r.check_stats(count=count, http_status=200, exitcode=0)
- for i in range(0,count):
+ for i in range(count):
respdata = open(curl.response_file(i)).readlines()
assert respdata == [data]
# check timings reported on a transfer for consistency
url = s['url_effective']
# all stat keys which reporting timings
- all_keys = set([
+ all_keys = {
'time_appconnect', 'time_connect', 'time_redirect',
'time_pretransfer', 'time_starttransfer', 'time_total'
- ])
+ }
# stat keys where we expect a positive value
- pos_keys = set(['time_pretransfer', 'time_starttransfer', 'time_total'])
+ pos_keys = {'time_pretransfer', 'time_starttransfer', 'time_total'}
if s['num_connects'] > 0:
pos_keys.add('time_connect')
if url.startswith('https:'):
@staticmethod
def gen_test_17_09_list():
- ret = []
- for tls_proto in ['TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3']:
- for max_ver in range(0, 5):
- for min_ver in range(-2, 4):
- ret.append([tls_proto, max_ver, min_ver])
- return ret
+ return [[tls_proto, max_ver, min_ver]
+ for tls_proto in ['TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3']
+ for max_ver in range(5)
+ for min_ver in range(-2, 4)]
@pytest.mark.parametrize("tls_proto, max_ver, min_ver", gen_test_17_09_list())
def test_17_09_ssl_min_max(self, env: Env, httpd, tls_proto, max_ver, min_ver):
valid_from: timedelta = timedelta(days=-1),
valid_to: timedelta = timedelta(days=89),
) -> Credentials:
- """Create a certificate signed by this CA for the given domains.
+ """
+ Create a certificate signed by this CA for the given domains.
+
:returns: the certificate and private key PEM file paths
"""
if spec.domains and len(spec.domains):
elif common_name:
name_pieces.append(x509.NameAttribute(NameOID.COMMON_NAME, common_name))
if parent:
- name_pieces.extend([rdn for rdn in parent])
+ name_pieces.extend(list(parent))
return x509.Name(name_pieces)
@staticmethod
valid_from: timedelta = timedelta(days=-1),
valid_to: timedelta = timedelta(days=89),
) -> Credentials:
- name = name
pkey = _private_key(key_type=key_type)
subject = TestCA._make_x509_name(common_name=name, parent=issuer.subject)
csr = TestCA._make_csr(subject=subject,
if key in os.environ and key not in run_env:
run_env[key] = os.environ[key]
try:
- with open(self._stdoutfile, 'w') as cout:
- with open(self._stderrfile, 'w') as cerr:
- p = subprocess.run(myargs, stderr=cerr, stdout=cout,
- cwd=self._run_dir, shell=False,
- input=None, env=run_env,
- timeout=self._timeout)
- exitcode = p.returncode
+ with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
+ p = subprocess.run(myargs, stderr=cerr, stdout=cout,
+ cwd=self._run_dir, shell=False,
+ input=None, env=run_env,
+ timeout=self._timeout)
+ exitcode = p.returncode
except subprocess.TimeoutExpired:
log.warning(f'Timeout after {self._timeout}s: {args}')
exitcode = -1
def stats(self) -> Optional[List[str]]:
if self._proc:
raise Exception('tcpdump still running')
- lines = []
- for line in open(self._stdoutfile).readlines():
- if re.match(r'.* IP 127\.0\.0\.1\.\d+ [<>] 127\.0\.0\.1\.\d+:.*', line):
- lines.append(line)
- return lines
+ return [line
+ for line in open(self._stdoutfile)
+ if re.match(r'.* IP 127\.0\.0\.1\.\d+ [<>] 127\.0\.0\.1\.\d+:.*', line)]
def stats_excluding(self, src_port) -> Optional[List[str]]:
if self._proc:
raise Exception('tcpdump still running')
- lines = []
- for line in self.stats:
- if not re.match(r'.* IP 127\.0\.0\.1\.' + str(src_port) + ' >.*', line):
- lines.append(line)
- return lines
+ return [line
+ for line in self.stats
+ if not re.match(r'.* IP 127\.0\.0\.1\.' + str(src_port) + ' >.*', line)]
@property
def stderr(self) -> List[str]:
args.extend([
tcpdump, '-i', local_if, '-n', 'tcp[tcpflags] & (tcp-rst)!=0'
])
- with open(self._stdoutfile, 'w') as cout:
- with open(self._stderrfile, 'w') as cerr:
- self._proc = subprocess.Popen(args, stdout=cout, stderr=cerr,
- text=True, cwd=self._run_dir,
- shell=False)
- assert self._proc
- assert self._proc.returncode is None
- while self._proc:
- try:
- self._proc.wait(timeout=1)
- except subprocess.TimeoutExpired:
- pass
- except Exception as e:
- log.error(f'Tcpdump: {e}')
+ with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
+ self._proc = subprocess.Popen(args, stdout=cout, stderr=cerr,
+ text=True, cwd=self._run_dir,
+ shell=False)
+ assert self._proc
+ assert self._proc.returncode is None
+ while self._proc:
+ try:
+ self._proc.wait(timeout=1)
+ except subprocess.TimeoutExpired:
+ pass
+ except Exception:
+ log.exception('Tcpdump')
def start(self):
def do_sample():
self._stats.append(json.loads(line))
# TODO: specify specific exceptions here
except: # noqa: E722
- log.error(f'not a JSON stat: {line}')
+ log.exception(f'not a JSON stat: {line}')
break
@property
tcpdump = RunTcpDump(self.env, self._run_dir)
tcpdump.start()
try:
- with open(self._stdoutfile, 'w') as cout:
- with open(self._stderrfile, 'w') as cerr:
- if with_profile:
- end_at = started_at + timedelta(seconds=self._timeout) \
- if self._timeout else None
- log.info(f'starting: {args}')
- p = subprocess.Popen(args, stderr=cerr, stdout=cout,
- cwd=self._run_dir, shell=False,
- env=self._run_env)
- profile = RunProfile(p.pid, started_at, self._run_dir)
- if intext is not None and False:
- p.communicate(input=intext.encode(), timeout=1)
- ptimeout = 0.0
- while True:
- try:
- p.wait(timeout=ptimeout)
- break
- except subprocess.TimeoutExpired:
- if end_at and datetime.now() >= end_at:
- p.kill()
- raise subprocess.TimeoutExpired(cmd=args, timeout=self._timeout)
- profile.sample()
- ptimeout = 0.01
- exitcode = p.returncode
- profile.finish()
- log.info(f'done: exit={exitcode}, profile={profile}')
- else:
- p = subprocess.run(args, stderr=cerr, stdout=cout,
- cwd=self._run_dir, shell=False,
- input=intext.encode() if intext else None,
- timeout=self._timeout,
- env=self._run_env)
- exitcode = p.returncode
+ with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
+ if with_profile:
+ end_at = started_at + timedelta(seconds=self._timeout) \
+ if self._timeout else None
+ log.info(f'starting: {args}')
+ p = subprocess.Popen(args, stderr=cerr, stdout=cout,
+ cwd=self._run_dir, shell=False,
+ env=self._run_env)
+ profile = RunProfile(p.pid, started_at, self._run_dir)
+ if intext is not None and False:
+ p.communicate(input=intext.encode(), timeout=1)
+ ptimeout = 0.0
+ while True:
+ try:
+ p.wait(timeout=ptimeout)
+ break
+ except subprocess.TimeoutExpired:
+ if end_at and datetime.now() >= end_at:
+ p.kill()
+ raise subprocess.TimeoutExpired(cmd=args, timeout=self._timeout)
+ profile.sample()
+ ptimeout = 0.01
+ exitcode = p.returncode
+ profile.finish()
+ log.info(f'done: exit={exitcode}, profile={profile}')
+ else:
+ p = subprocess.run(args, stderr=cerr, stdout=cout,
+ cwd=self._run_dir, shell=False,
+ input=intext.encode() if intext else None,
+ timeout=self._timeout,
+ env=self._run_env)
+ exitcode = p.returncode
except subprocess.TimeoutExpired:
now = datetime.now()
duration = now - started_at
args.extend(['-v', '--trace-ids', '--trace-time'])
if self.env.verbose > 1:
args.extend(['--trace-config', 'http/2,http/3,h2-proxy,h1-proxy'])
- pass
active_options = options
if options is not None and '--next' in options:
log.error(f'{self.apxs} failed to query HTTPD_VERSION: {p}')
else:
self._httpd_version = p.stdout.strip()
- except Exception as e:
- log.error(f'{self.apxs} failed to run: {e}')
+ except Exception:
+ log.exception(f'{self.apxs} failed to run')
return self._httpd_version
def versiontuple(self, v):
def make_data_file(self, indir: str, fname: str, fsize: int,
line_length: int = 1024) -> str:
if line_length < 11:
- raise 'line_length less than 11 not supported'
+ raise RuntimeError('line_length less than 11 not supported')
fpath = os.path.join(indir, fname)
s10 = "0123456789"
s = round((line_length / 10) + 1) * s10
self._proxy_auth_basic = active
def _run(self, args, intext=''):
- env = {}
- for key, val in os.environ.items():
- env[key] = val
+ env = os.environ.copy()
env['APACHE_RUN_DIR'] = self._run_dir
env['APACHE_RUN_USER'] = os.environ['USER']
env['APACHE_LOCK_DIR'] = self._lock_dir
if os.path.exists(os.path.join(self._mods_dir, f'mod_{m}.so')):
fd.write(f'LoadModule {m}_module "{self._mods_dir}/mod_{m}.so"\n')
if Httpd.MOD_CURLTEST is not None:
- fd.write(f'LoadModule curltest_module \"{Httpd.MOD_CURLTEST}\"\n')
+ fd.write(f'LoadModule curltest_module "{Httpd.MOD_CURLTEST}"\n')
conf = [ # base server config
f'ServerRoot "{self._apache_dir}"',
'DefaultRuntimeDir logs',
def _write_config(self):
with open(self._conf_file, 'w') as fd:
- fd.write('# nghttpx test config'),
+ fd.write('# nghttpx test config')
fd.write("\n".join([
'# do we need something here?'
]))
#
# SPDX-License-Identifier: curl
#
-""" A telnet server which negotiates"""
+"""A telnet server which negotiates."""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
def telnetserver(options):
- """
- Starts up a TCP server with a telnet handler and serves DICT requests
- forever.
- """
+    """Start up a TCP server with a telnet handler and serve telnet requests forever."""
if options.pidfile:
pid = os.getpid()
# see tests/server/util.c function write_pidfile
class NegotiatingTelnetHandler(socketserver.BaseRequestHandler):
- """Handler class for Telnet connections.
+ """Handler class for Telnet connections."""
- """
def handle(self):
- """
- Negotiates options before reading data.
- """
+        """Negotiate options before reading data."""
neg = Negotiator(self.request)
try:
def recv(self, bytes):
"""
- Read bytes from TCP, handling negotiation sequences
+ Read bytes from TCP, handling negotiation sequences.
:param bytes: Number of bytes to read
:return: a buffer of bytes
@classmethod
def from_val(cls, val):
- for k in cls.__dict__.keys():
+ for k in cls.__dict__:
if getattr(cls, k) == val:
return k
def setup_logging(options):
- """
- Set up logging from the command line options
- """
+ """Set up logging from the command line options."""
root_logger = logging.getLogger()
add_stdout = False
class ScriptRC(object):
- """Enum for script return codes"""
+ """Enum for script return codes."""
+
SUCCESS = 0
FAILURE = 1
EXCEPTION = 2
-class ScriptException(Exception):
- pass
-
-
if __name__ == '__main__':
# Get the options from the user.
options = get_options()
# Run main script.
try:
rc = telnetserver(options)
- except Exception as e:
- log.exception(e)
+ except Exception:
+ log.exception('Error in telnet server')
rc = ScriptRC.EXCEPTION
if options.pidfile and os.path.isfile(options.pidfile):
#
# SPDX-License-Identifier: curl
#
-"""Server for testing SMB"""
+"""Server for testing SMB."""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
class ShutdownHandler(threading.Thread):
- """Cleanly shut down the SMB server
+ """
+ Cleanly shut down the SMB server.
This can only be done from another thread while the server is in
serve_forever(), so a thread is spawned here that waits for a shutdown
def smbserver(options):
- """Start up a TCP SMB server that serves forever
-
- """
+ """Start up a TCP SMB server that serves forever."""
if options.pidfile:
pid = os.getpid()
# see tests/server/util.c function write_pidfile
smb_config.set("TESTS", "path", TESTS_MAGIC)
if not options.srcdir or not os.path.isdir(options.srcdir):
- raise ScriptException("--srcdir is mandatory")
+ raise ScriptError("--srcdir is mandatory")
test_data_dir = os.path.join(options.srcdir, "data")
"""
conn_data = smb_server.getConnectionData(conn_id)
- # Wrap processing in a try block which allows us to throw SmbException
+ # Wrap processing in a try block which allows us to throw SmbError
# to control the flow.
try:
ncax_parms = imp_smb.SMBNtCreateAndX_Parameters(
# Currently we only support reading files.
if disposition != imp_smb.FILE_OPEN:
- raise SmbException(STATUS_ACCESS_DENIED,
+ raise SmbError(STATUS_ACCESS_DENIED,
"Only support reading files")
# Check to see if the path we were given is actually a
level=imp_smb.SMB_QUERY_FILE_ALL_INFO)
if error_code != STATUS_SUCCESS:
- raise SmbException(error_code, "Failed to query path info")
+ raise SmbError(error_code, "Failed to query path info")
resp_parms["CreateTime"] = resp_info["CreationTime"]
resp_parms["LastAccessTime"] = resp_info[
conn_data["OpenedFiles"][fakefid]["FileName"] = path
conn_data["OpenedFiles"][fakefid]["DeleteOnClose"] = False
- except SmbException as s:
- log.debug("[SMB] SmbException hit: %s", s)
+ except SmbError as s:
+ log.debug("[SMB] SmbError hit: %s", s)
error_code = s.error_code
resp_parms = ""
resp_data = ""
if "path" in conn_shares[tid]:
path = conn_shares[tid]["path"]
else:
- raise SmbException(STATUS_ACCESS_DENIED,
+ raise SmbError(STATUS_ACCESS_DENIED,
"Connection share had no path")
else:
- raise SmbException(imp_smbserver.STATUS_SMB_BAD_TID,
+ raise SmbError(imp_smbserver.STATUS_SMB_BAD_TID,
"TID was invalid")
return path
log.debug("[SMB] Get server path '%s'", requested_filename)
if requested_filename not in [VERIFIED_REQ]:
- raise SmbException(STATUS_NO_SUCH_FILE, "Couldn't find the file")
+ raise SmbError(STATUS_NO_SUCH_FILE, "Couldn't find the file")
fid, filename = tempfile.mkstemp()
log.debug("[SMB] Created %s (%d) for storing '%s'",
except Exception:
log.exception("Failed to make test file")
- raise SmbException(STATUS_NO_SUCH_FILE, "Failed to make test file")
+ raise SmbError(STATUS_NO_SUCH_FILE, "Failed to make test file")
-class SmbException(Exception):
+class SmbError(Exception):
def __init__(self, error_code, error_message):
- super(SmbException, self).__init__(error_message)
+ super(SmbError, self).__init__(error_message)
self.error_code = error_code
class ScriptRC(object):
- """Enum for script return codes"""
+ """Enum for script return codes."""
+
SUCCESS = 0
FAILURE = 1
EXCEPTION = 2
-class ScriptException(Exception):
+class ScriptError(Exception):
pass
def setup_logging(options):
- """
- Set up logging from the command line options
- """
+ """Set up logging from the command line options."""
root_logger = logging.getLogger()
add_stdout = False
# Run main script.
try:
rc = smbserver(options)
- except Exception as e:
- log.exception(e)
+ except Exception:
+ log.exception('Error in SMB server')
rc = ScriptRC.EXCEPTION
if options.pidfile and os.path.isfile(options.pidfile):
#
# SPDX-License-Identifier: curl
#
-"""Module for extracting test data from the test data folder and other utils"""
+"""Module for extracting test data from the test data folder and other utils."""
from __future__ import (absolute_import, division, print_function,
unicode_literals)