From f020d724b7968b21538c4d35bdfac29e9b70edbd Mon Sep 17 00:00:00 2001 From: Joel Rosdahl Date: Fri, 8 Aug 2025 13:10:56 +0200 Subject: [PATCH] chore: Modernize Python code in test/http-* --- test/http-client | 18 +++------- test/http-server | 85 ++++++++++++++++++++---------------------------- 2 files changed, 40 insertions(+), 63 deletions(-) diff --git a/test/http-client b/test/http-client index d7a60c9c0..95382d7a8 100755 --- a/test/http-client +++ b/test/http-client @@ -3,6 +3,7 @@ # This is a simple HTTP client to test readiness of the asynchronously # launched HTTP server. +import base64 import sys import time import urllib.request @@ -12,12 +13,8 @@ def run(url, timeout, basic_auth): deadline = time.time() + timeout req = urllib.request.Request(url, method="HEAD") if basic_auth: - import base64 - - encoded_credentials = base64.b64encode( - basic_auth.encode("ascii") - ).decode("ascii") - req.add_header("Authorization", f"Basic {encoded_credentials}") + credentials = base64.b64encode(basic_auth.encode("ascii")).decode("ascii") + req.add_header("Authorization", f"Basic {credentials}") while True: try: response = urllib.request.urlopen(req) @@ -26,10 +23,7 @@ def run(url, timeout, basic_auth): except urllib.error.URLError as e: print(e.reason) if time.time() > deadline: - print( - f"All connection attempts failed within {timeout} seconds." - ) - sys.exit(1) + sys.exit(f"All connection attempts failed within {timeout} seconds.") time.sleep(0.5) @@ -37,9 +31,7 @@ if __name__ == "__main__": import argparse parser = argparse.ArgumentParser() - parser.add_argument( - "--basic-auth", "-B", help="Basic auth tuple like user:pass" - ) + parser.add_argument("--basic-auth", "-B", help="Basic auth tuple like user:pass") parser.add_argument( "--timeout", "-t", diff --git a/test/http-server b/test/http-server index 4c5791fc8..2d041e643 100755 --- a/test/http-server +++ b/test/http-server @@ -6,13 +6,16 @@ # # See: https://github.com/python/cpython/blob/main/Lib/http/server.py -from functools import partial -from http import HTTPStatus -from http.server import HTTPServer, SimpleHTTPRequestHandler +import argparse +import base64 import os import signal import socket import sys +from functools import partial +from http import HTTPStatus +from http.server import HTTPServer, SimpleHTTPRequestHandler +from pathlib import Path class AuthenticationError(Exception): @@ -23,35 +26,32 @@ class PUTEnabledHTTPRequestHandler(SimpleHTTPRequestHandler): def __init__(self, *args, basic_auth=None, **kwargs): self.basic_auth = None if basic_auth: - import base64 - - self.basic_auth = base64.b64encode( - basic_auth.encode("ascii") - ).decode("ascii") + self.basic_auth = base64.b64encode(basic_auth.encode("ascii")).decode( + "ascii" + ) super().__init__(*args, **kwargs) - def do_GET(self): + def do_GET(self): # noqa: N802 try: self._handle_auth() super().do_GET() except AuthenticationError: self.send_error(HTTPStatus.UNAUTHORIZED, "Need Authentication") - def do_HEAD(self): + def do_HEAD(self): # noqa: N802 try: self._handle_auth() super().do_HEAD() except AuthenticationError: self.send_error(HTTPStatus.UNAUTHORIZED, "Need Authentication") - def do_PUT(self): - path = self.translate_path(self.path) - os.makedirs(os.path.dirname(path), exist_ok=True) + def do_PUT(self): # noqa: N802 + path = Path(self.translate_path(self.path)) + path.parent.mkdir(parents=True, exist_ok=True) try: self._handle_auth() file_length = int(self.headers["Content-Length"]) - with open(path, "wb") as output_file: - output_file.write(self.rfile.read(file_length)) + path.write_bytes(self.rfile.read(file_length)) self.send_response(HTTPStatus.CREATED) self.send_header("Content-Length", "0") self.end_headers() @@ -62,20 +62,18 @@ class PUTEnabledHTTPRequestHandler(SimpleHTTPRequestHandler): HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot open file for writing" ) - def do_DELETE(self): - path = self.translate_path(self.path) + def do_DELETE(self): # noqa: N802 + path = Path(self.translate_path(self.path)) try: self._handle_auth() - os.remove(path) + path.unlink() self.send_response(HTTPStatus.OK) self.send_header("Content-Length", "0") self.end_headers() except AuthenticationError: self.send_error(HTTPStatus.UNAUTHORIZED, "Need Authentication") except OSError: - self.send_error( - HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot delete file" - ) + self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot delete file") def _handle_auth(self): if not self.basic_auth: @@ -83,12 +81,8 @@ class PUTEnabledHTTPRequestHandler(SimpleHTTPRequestHandler): authorization = self.headers.get("authorization") if authorization: authorization = authorization.split() - if len(authorization) == 2: - if ( - authorization[0] == "Basic" - and authorization[1] == self.basic_auth - ): - return + if authorization == ["Basic", self.basic_auth]: + return raise AuthenticationError("Authentication required") @@ -98,21 +92,18 @@ def _get_best_family(*address): type=socket.SOCK_STREAM, flags=socket.AI_PASSIVE, ) - family, type, proto, canonname, sockaddr = next(iter(infos)) + family, _type, _proto, _canonname, sockaddr = next(iter(infos)) return family, sockaddr -def run(HandlerClass, ServerClass, port, bind): - HandlerClass.protocol_version = "HTTP/1.1" - ServerClass.address_family, addr = _get_best_family(bind, port) +def run(handler_class, server_class, port, bind): + handler_class.protocol_version = "HTTP/1.1" + server_class.address_family, addr = _get_best_family(bind, port) - with ServerClass(addr, HandlerClass) as httpd: + with server_class(addr, handler_class) as httpd: host, port = httpd.socket.getsockname()[:2] url_host = f"[{host}]" if ":" in host else host - print( - f"Serving HTTP on {host} port {port} " - f"(http://{url_host}:{port}/) ..." - ) + print(f"Serving HTTP on {host} port {port} (http://{url_host}:{port}/) ...") try: httpd.serve_forever() except KeyboardInterrupt: @@ -120,30 +111,26 @@ def run(HandlerClass, ServerClass, port, bind): sys.exit(0) -def on_terminate(signum, frame): +def on_terminate(signum, _frame): sys.stdout.flush() sys.stderr.flush() sys.exit(128 + signum) if __name__ == "__main__": - import argparse - parser = argparse.ArgumentParser() - parser.add_argument( - "--basic-auth", "-B", help="Basic auth tuple like user:pass" - ) + parser.add_argument("--basic-auth", "-B", help="Basic auth tuple like user:pass") parser.add_argument( "--bind", "-b", metavar="ADDRESS", - help="Specify alternate bind address " "[default: all interfaces]", + help="Specify alternate bind address [default: all interfaces]", ) parser.add_argument( "--directory", "-d", - default=os.getcwd(), - help="Specify alternative directory " "[default:current directory]", + default=Path.cwd(), + help="Specify alternative directory [default:current directory]", ) parser.add_argument( "port", @@ -155,9 +142,7 @@ if __name__ == "__main__": ) args = parser.parse_args() - handler_class = partial( - PUTEnabledHTTPRequestHandler, basic_auth=args.basic_auth - ) + handler_class = partial(PUTEnabledHTTPRequestHandler, basic_auth=args.basic_auth) os.chdir(args.directory) @@ -165,8 +150,8 @@ if __name__ == "__main__": signal.signal(signal.SIGTERM, on_terminate) run( - HandlerClass=handler_class, - ServerClass=HTTPServer, + handler_class=handler_class, + server_class=HTTPServer, port=args.port, bind=args.bind, ) -- 2.47.2