import time
import urllib.request
+
def run(url, timeout, basic_auth):
deadline = time.time() + timeout
req = urllib.request.Request(url, method="HEAD")
if basic_auth:
import base64
- encoded_credentials = base64.b64encode(basic_auth.encode("ascii")).decode("ascii")
+
+ encoded_credentials = base64.b64encode(
+ basic_auth.encode("ascii")
+ ).decode("ascii")
req.add_header("Authorization", f"Basic {encoded_credentials}")
while True:
try:
except urllib.error.URLError as e:
print(e.reason)
if time.time() > deadline:
- print(f"All connection attempts failed within {timeout} seconds.")
+ print(
+ f"All connection attempts failed within {timeout} seconds."
+ )
sys.exit(1)
time.sleep(0.5)
+
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
- parser.add_argument('--basic-auth', '-B',
- help='Basic auth tuple like user:pass')
- parser.add_argument('--timeout', '-t', metavar='TIMEOUT',
- default=10, type=int,
- help='Maximum seconds to wait for successful connection attempt '
- '[default: 10 seconds]')
- parser.add_argument('url',
- type=str,
- help='URL to connect to')
+ parser.add_argument(
+ "--basic-auth", "-B", help="Basic auth tuple like user:pass"
+ )
+ parser.add_argument(
+ "--timeout",
+ "-t",
+ metavar="TIMEOUT",
+ default=10,
+ type=int,
+ help="Maximum seconds to wait for successful connection attempt "
+ "[default: 10 seconds]",
+ )
+ parser.add_argument("url", type=str, help="URL to connect to")
args = parser.parse_args()
run(
import socket
import sys
+
class AuthenticationError(Exception):
pass
+
class PUTEnabledHTTPRequestHandler(SimpleHTTPRequestHandler):
def __init__(self, *args, basic_auth=None, **kwargs):
self.basic_auth = None
if basic_auth:
import base64
- self.basic_auth = base64.b64encode(basic_auth.encode("ascii")).decode("ascii")
+
+ self.basic_auth = base64.b64encode(
+ basic_auth.encode("ascii")
+ ).decode("ascii")
super().__init__(*args, **kwargs)
def do_GET(self):
path = self.translate_path(self.path)
try:
self._handle_auth()
- file_length = int(self.headers['Content-Length'])
- with open(path, 'wb') as output_file:
+ file_length = int(self.headers["Content-Length"])
+ with open(path, "wb") as output_file:
output_file.write(self.rfile.read(file_length))
self.send_response(HTTPStatus.CREATED)
self.send_header("Content-Length", "0")
except AuthenticationError:
self.send_error(HTTPStatus.UNAUTHORIZED, "Need Authentication")
except OSError:
- self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot open file for writing")
+ self.send_error(
+ HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot open file for writing"
+ )
def do_DELETE(self):
path = self.translate_path(self.path)
except AuthenticationError:
self.send_error(HTTPStatus.UNAUTHORIZED, "Need Authentication")
except OSError:
- self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot delete file")
+ self.send_error(
+ HTTPStatus.INTERNAL_SERVER_ERROR, "Cannot delete file"
+ )
def _handle_auth(self):
if not self.basic_auth:
if authorization:
authorization = authorization.split()
if len(authorization) == 2:
- if authorization[0] == "Basic" and authorization[1] == self.basic_auth:
+ if (
+ authorization[0] == "Basic"
+ and authorization[1] == self.basic_auth
+ ):
return
raise AuthenticationError("Authentication required")
+
def _get_best_family(*address):
infos = socket.getaddrinfo(
*address,
family, type, proto, canonname, sockaddr = next(iter(infos))
return family, sockaddr
+
def run(HandlerClass, ServerClass, port, bind):
HandlerClass.protocol_version = "HTTP/1.1"
ServerClass.address_family, addr = _get_best_family(bind, port)
with ServerClass(addr, HandlerClass) as httpd:
host, port = httpd.socket.getsockname()[:2]
- url_host = f'[{host}]' if ':' in host else host
+ url_host = f"[{host}]" if ":" in host else host
print(
f"Serving HTTP on {host} port {port} "
f"(http://{url_host}:{port}/) ..."
print("\nKeyboard interrupt received, exiting.")
sys.exit(0)
+
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser()
- parser.add_argument('--basic-auth', '-B',
- help='Basic auth tuple like user:pass')
- parser.add_argument('--bind', '-b', metavar='ADDRESS',
- help='Specify alternate bind address '
- '[default: all interfaces]')
- parser.add_argument('--directory', '-d', default=os.getcwd(),
- help='Specify alternative directory '
- '[default:current directory]')
- parser.add_argument('port', action='store',
- default=8080, type=int,
- nargs='?',
- help='Specify alternate port [default: 8080]')
+ parser.add_argument(
+ "--basic-auth", "-B", help="Basic auth tuple like user:pass"
+ )
+ parser.add_argument(
+ "--bind",
+ "-b",
+ metavar="ADDRESS",
+ help="Specify alternate bind address " "[default: all interfaces]",
+ )
+ parser.add_argument(
+ "--directory",
+ "-d",
+ default=os.getcwd(),
+ help="Specify alternative directory " "[default:current directory]",
+ )
+ parser.add_argument(
+ "port",
+ action="store",
+ default=8080,
+ type=int,
+ nargs="?",
+ help="Specify alternate port [default: 8080]",
+ )
args = parser.parse_args()
- handler_class = partial(PUTEnabledHTTPRequestHandler, basic_auth=args.basic_auth)
+ handler_class = partial(
+ PUTEnabledHTTPRequestHandler, basic_auth=args.basic_auth
+ )
os.chdir(args.directory)