#
# See: https://github.com/python/cpython/blob/main/Lib/http/server.py
-from functools import partial
-from http import HTTPStatus
-from http.server import HTTPServer, SimpleHTTPRequestHandler
+import argparse
+import base64
import os
import signal
import socket
import sys
+from functools import partial
+from http import HTTPStatus
+from http.server import HTTPServer, SimpleHTTPRequestHandler
+from pathlib import Path
class AuthenticationError(Exception):
def __init__(self, *args, basic_auth=None, **kwargs):
self.basic_auth = None
if basic_auth:
- import base64
-
- self.basic_auth = base64.b64encode(
- basic_auth.encode("ascii")
- ).decode("ascii")
+ self.basic_auth = base64.b64encode(basic_auth.encode("ascii")).decode(
+ "ascii"
+ )
super().__init__(*args, **kwargs)
- def do_GET(self):
+ def do_GET(self): # noqa: N802
try:
self._handle_auth()
super().do_GET()
except AuthenticationError:
self.send_error(HTTPStatus.UNAUTHORIZED, "Need Authentication")
- def do_HEAD(self):
+ def do_HEAD(self): # noqa: N802
try:
self._handle_auth()
super().do_HEAD()
except AuthenticationError:
self.send_error(HTTPStatus.UNAUTHORIZED, "Need Authentication")
- def do_PUT(self):
- path = self.translate_path(self.path)
- os.makedirs(os.path.dirname(path), exist_ok=True)
+ def do_PUT(self): # noqa: N802
+ path = Path(self.translate_path(self.path))
+ path.parent.mkdir(parents=True, exist_ok=True)
try:
self._handle_auth()
file_length = int(self.headers["Content-Length"])
- with open(path, "wb") as output_file:
- output_file.write(self.rfile.read(file_length))
+ path.write_bytes(self.rfile.read(file_length))
self.send_response(HTTPStatus.CREATED)
self.send_header("Content-Length", "0")
self.end_headers()
HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot open file for writing"
)
- def do_DELETE(self):
- path = self.translate_path(self.path)
+ def do_DELETE(self): # noqa: N802
+ path = Path(self.translate_path(self.path))
try:
self._handle_auth()
- os.remove(path)
+ path.unlink()
self.send_response(HTTPStatus.OK)
self.send_header("Content-Length", "0")
self.end_headers()
except AuthenticationError:
self.send_error(HTTPStatus.UNAUTHORIZED, "Need Authentication")
except OSError:
- self.send_error(
- HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot delete file"
- )
+ self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot delete file")
def _handle_auth(self):
if not self.basic_auth:
authorization = self.headers.get("authorization")
if authorization:
authorization = authorization.split()
- if len(authorization) == 2:
- if (
- authorization[0] == "Basic"
- and authorization[1] == self.basic_auth
- ):
- return
+ if authorization == ["Basic", self.basic_auth]:
+ return
raise AuthenticationError("Authentication required")
type=socket.SOCK_STREAM,
flags=socket.AI_PASSIVE,
)
- family, type, proto, canonname, sockaddr = next(iter(infos))
+ family, _type, _proto, _canonname, sockaddr = next(iter(infos))
return family, sockaddr
-def run(HandlerClass, ServerClass, port, bind):
- HandlerClass.protocol_version = "HTTP/1.1"
- ServerClass.address_family, addr = _get_best_family(bind, port)
+def run(handler_class, server_class, port, bind):
+ handler_class.protocol_version = "HTTP/1.1"
+ server_class.address_family, addr = _get_best_family(bind, port)
- with ServerClass(addr, HandlerClass) as httpd:
+ with server_class(addr, handler_class) as httpd:
host, port = httpd.socket.getsockname()[:2]
url_host = f"[{host}]" if ":" in host else host
- print(
- f"Serving HTTP on {host} port {port} "
- f"(http://{url_host}:{port}/) ..."
- )
+ print(f"Serving HTTP on {host} port {port} (http://{url_host}:{port}/) ...")
try:
httpd.serve_forever()
except KeyboardInterrupt:
sys.exit(0)
-def on_terminate(signum, frame):
+def on_terminate(signum, _frame):
sys.stdout.flush()
sys.stderr.flush()
sys.exit(128 + signum)
if __name__ == "__main__":
- import argparse
-
parser = argparse.ArgumentParser()
- parser.add_argument(
- "--basic-auth", "-B", help="Basic auth tuple like user:pass"
- )
+ parser.add_argument("--basic-auth", "-B", help="Basic auth tuple like user:pass")
parser.add_argument(
"--bind",
"-b",
metavar="ADDRESS",
- help="Specify alternate bind address " "[default: all interfaces]",
+ help="Specify alternate bind address [default: all interfaces]",
)
parser.add_argument(
"--directory",
"-d",
- default=os.getcwd(),
- help="Specify alternative directory " "[default:current directory]",
+ default=Path.cwd(),
+ help="Specify alternative directory [default:current directory]",
)
parser.add_argument(
"port",
)
args = parser.parse_args()
- handler_class = partial(
- PUTEnabledHTTPRequestHandler, basic_auth=args.basic_auth
- )
+ handler_class = partial(PUTEnabledHTTPRequestHandler, basic_auth=args.basic_auth)
os.chdir(args.directory)
signal.signal(signal.SIGTERM, on_terminate)
run(
- HandlerClass=handler_class,
- ServerClass=HTTPServer,
+ handler_class=handler_class,
+ server_class=HTTPServer,
port=args.port,
bind=args.bind,
)