]> git.ipfire.org Git - thirdparty/ccache.git/commitdiff
chore: Modernize Python code in test/http-* master
authorJoel Rosdahl <joel@rosdahl.net>
Fri, 8 Aug 2025 11:10:56 +0000 (13:10 +0200)
committerJoel Rosdahl <joel@rosdahl.net>
Fri, 8 Aug 2025 11:12:29 +0000 (13:12 +0200)
test/http-client
test/http-server

index d7a60c9c073f4fb89044aa11ff40f27842e1ea4f..95382d7a82abf477c74bac28d51469c900491592 100755 (executable)
@@ -3,6 +3,7 @@
 # This is a simple HTTP client to test readiness of the asynchronously
 # launched HTTP server.
 
+import base64
 import sys
 import time
 import urllib.request
@@ -12,12 +13,8 @@ def run(url, timeout, basic_auth):
     deadline = time.time() + timeout
     req = urllib.request.Request(url, method="HEAD")
     if basic_auth:
-        import base64
-
-        encoded_credentials = base64.b64encode(
-            basic_auth.encode("ascii")
-        ).decode("ascii")
-        req.add_header("Authorization", f"Basic {encoded_credentials}")
+        credentials = base64.b64encode(basic_auth.encode("ascii")).decode("ascii")
+        req.add_header("Authorization", f"Basic {credentials}")
     while True:
         try:
             response = urllib.request.urlopen(req)
@@ -26,10 +23,7 @@ def run(url, timeout, basic_auth):
         except urllib.error.URLError as e:
             print(e.reason)
             if time.time() > deadline:
-                print(
-                    f"All connection attempts failed within {timeout} seconds."
-                )
-                sys.exit(1)
+                sys.exit(f"All connection attempts failed within {timeout} seconds.")
             time.sleep(0.5)
 
 
@@ -37,9 +31,7 @@ if __name__ == "__main__":
     import argparse
 
     parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "--basic-auth", "-B", help="Basic auth tuple like user:pass"
-    )
+    parser.add_argument("--basic-auth", "-B", help="Basic auth tuple like user:pass")
     parser.add_argument(
         "--timeout",
         "-t",
index 4c5791fc8df1152a68ad23ba3efaacc54fb91aaf..2d041e643d338eb12aa887a9917cf19defd9d236 100755 (executable)
@@ -6,13 +6,16 @@
 #
 # See: https://github.com/python/cpython/blob/main/Lib/http/server.py
 
-from functools import partial
-from http import HTTPStatus
-from http.server import HTTPServer, SimpleHTTPRequestHandler
+import argparse
+import base64
 import os
 import signal
 import socket
 import sys
+from functools import partial
+from http import HTTPStatus
+from http.server import HTTPServer, SimpleHTTPRequestHandler
+from pathlib import Path
 
 
 class AuthenticationError(Exception):
@@ -23,35 +26,32 @@ class PUTEnabledHTTPRequestHandler(SimpleHTTPRequestHandler):
     def __init__(self, *args, basic_auth=None, **kwargs):
         self.basic_auth = None
         if basic_auth:
-            import base64
-
-            self.basic_auth = base64.b64encode(
-                basic_auth.encode("ascii")
-            ).decode("ascii")
+            self.basic_auth = base64.b64encode(basic_auth.encode("ascii")).decode(
+                "ascii"
+            )
         super().__init__(*args, **kwargs)
 
-    def do_GET(self):
+    def do_GET(self):  # noqa: N802
         try:
             self._handle_auth()
             super().do_GET()
         except AuthenticationError:
             self.send_error(HTTPStatus.UNAUTHORIZED, "Need Authentication")
 
-    def do_HEAD(self):
+    def do_HEAD(self):  # noqa: N802
         try:
             self._handle_auth()
             super().do_HEAD()
         except AuthenticationError:
             self.send_error(HTTPStatus.UNAUTHORIZED, "Need Authentication")
 
-    def do_PUT(self):
-        path = self.translate_path(self.path)
-        os.makedirs(os.path.dirname(path), exist_ok=True)
+    def do_PUT(self):  # noqa: N802
+        path = Path(self.translate_path(self.path))
+        path.parent.mkdir(parents=True, exist_ok=True)
         try:
             self._handle_auth()
             file_length = int(self.headers["Content-Length"])
-            with open(path, "wb") as output_file:
-                output_file.write(self.rfile.read(file_length))
+            path.write_bytes(self.rfile.read(file_length))
             self.send_response(HTTPStatus.CREATED)
             self.send_header("Content-Length", "0")
             self.end_headers()
@@ -62,20 +62,18 @@ class PUTEnabledHTTPRequestHandler(SimpleHTTPRequestHandler):
                 HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot open file for writing"
             )
 
-    def do_DELETE(self):
-        path = self.translate_path(self.path)
+    def do_DELETE(self):  # noqa: N802
+        path = Path(self.translate_path(self.path))
         try:
             self._handle_auth()
-            os.remove(path)
+            path.unlink()
             self.send_response(HTTPStatus.OK)
             self.send_header("Content-Length", "0")
             self.end_headers()
         except AuthenticationError:
             self.send_error(HTTPStatus.UNAUTHORIZED, "Need Authentication")
         except OSError:
-            self.send_error(
-                HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot delete file"
-            )
+            self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot delete file")
 
     def _handle_auth(self):
         if not self.basic_auth:
@@ -83,12 +81,8 @@ class PUTEnabledHTTPRequestHandler(SimpleHTTPRequestHandler):
         authorization = self.headers.get("authorization")
         if authorization:
             authorization = authorization.split()
-            if len(authorization) == 2:
-                if (
-                    authorization[0] == "Basic"
-                    and authorization[1] == self.basic_auth
-                ):
-                    return
+            if authorization == ["Basic", self.basic_auth]:
+                return
         raise AuthenticationError("Authentication required")
 
 
@@ -98,21 +92,18 @@ def _get_best_family(*address):
         type=socket.SOCK_STREAM,
         flags=socket.AI_PASSIVE,
     )
-    family, type, proto, canonname, sockaddr = next(iter(infos))
+    family, _type, _proto, _canonname, sockaddr = next(iter(infos))
     return family, sockaddr
 
 
-def run(HandlerClass, ServerClass, port, bind):
-    HandlerClass.protocol_version = "HTTP/1.1"
-    ServerClass.address_family, addr = _get_best_family(bind, port)
+def run(handler_class, server_class, port, bind):
+    handler_class.protocol_version = "HTTP/1.1"
+    server_class.address_family, addr = _get_best_family(bind, port)
 
-    with ServerClass(addr, HandlerClass) as httpd:
+    with server_class(addr, handler_class) as httpd:
         host, port = httpd.socket.getsockname()[:2]
         url_host = f"[{host}]" if ":" in host else host
-        print(
-            f"Serving HTTP on {host} port {port} "
-            f"(http://{url_host}:{port}/) ..."
-        )
+        print(f"Serving HTTP on {host} port {port} (http://{url_host}:{port}/) ...")
         try:
             httpd.serve_forever()
         except KeyboardInterrupt:
@@ -120,30 +111,26 @@ def run(HandlerClass, ServerClass, port, bind):
             sys.exit(0)
 
 
-def on_terminate(signum, frame):
+def on_terminate(signum, _frame):
     sys.stdout.flush()
     sys.stderr.flush()
     sys.exit(128 + signum)
 
 
 if __name__ == "__main__":
-    import argparse
-
     parser = argparse.ArgumentParser()
-    parser.add_argument(
-        "--basic-auth", "-B", help="Basic auth tuple like user:pass"
-    )
+    parser.add_argument("--basic-auth", "-B", help="Basic auth tuple like user:pass")
     parser.add_argument(
         "--bind",
         "-b",
         metavar="ADDRESS",
-        help="Specify alternate bind address " "[default: all interfaces]",
+        help="Specify alternate bind address [default: all interfaces]",
     )
     parser.add_argument(
         "--directory",
         "-d",
-        default=os.getcwd(),
-        help="Specify alternative directory " "[default:current directory]",
+        default=Path.cwd(),
+        help="Specify alternative directory [default:current directory]",
     )
     parser.add_argument(
         "port",
@@ -155,9 +142,7 @@ if __name__ == "__main__":
     )
     args = parser.parse_args()
 
-    handler_class = partial(
-        PUTEnabledHTTPRequestHandler, basic_auth=args.basic_auth
-    )
+    handler_class = partial(PUTEnabledHTTPRequestHandler, basic_auth=args.basic_auth)
 
     os.chdir(args.directory)
 
@@ -165,8 +150,8 @@ if __name__ == "__main__":
     signal.signal(signal.SIGTERM, on_terminate)
 
     run(
-        HandlerClass=handler_class,
-        ServerClass=HTTPServer,
+        handler_class=handler_class,
+        server_class=HTTPServer,
         port=args.port,
         bind=args.bind,
     )