# #
###############################################################################
-import base64
+import functools
import hashlib
+import json
import logging
-import math
import os.path
-import time
-
-from . import http
+import tornado.httpclient
+import urllib.parse
from .constants import *
+# Setup logging
log = logging.getLogger("pakfire.hub")
-log.propagate = 1
+
+# Configure some useful defaults for all requests
+tornado.httpclient.AsyncHTTPClient.configure(
+ None, defaults = {
+ "user_agent" : "pakfire/%s" % PAKFIRE_VERSION,
+ },
+)
class Hub(object):
- def __init__(self, huburl, username, password):
+ def __init__(self, url, username, password):
+ self.url = url
self.username = username
self.password = password
# Initialise the HTTP client
- self.http = http.Client(baseurl=huburl)
+ self.client = tornado.httpclient.AsyncHTTPClient()
- @property
- def _request_args(self):
- """
- Arguments sent with each request
- """
- return {
- "auth" : (self.username, self.password),
- }
+ # XXX support proxies
- def _request(self, *args, **kwargs):
- """
- Wrapper function around the HTTP Client request()
- function that adds authentication, etc.
- """
- kwargs.update(self._request_args)
+ async def _request(self, method, path, body_producer=None, **kwargs):
+ # Make absolute URL
+ url = urllib.parse.urljoin(self.url, path)
- return self.http.request(*args, **kwargs)
+ # Add query arguments
+ if method in ("GET", "PUT"):
+ url = "%s?%s" % (url, urllib.parse.urlencode(kwargs))
- def _upload(self, *args, **kwargs):
- kwargs.update(self._request_args)
+ # Make the request
+ req = tornado.httpclient.HTTPRequest(
+ method=method, url=url,
- return self.http.upload(*args, **kwargs)
+ # Authentication
+ auth_username=self.username, auth_password=self.password,
+
+ # All all the rest
+ body_producer=body_producer,
+ )
+
+ # Send the request and wait for a response
+ res = await self.client.fetch(req)
+
+ # XXX Do we have to catch any errors here?
+
+ # Decode JSON response
+ if res.body:
+ return json.loads(res.body)
+
+ # Empty response
+ return {}
# Test functions
# File uploads
- def upload_file(self, path):
- # Send some basic information to the hub
- # and request an upload ID
- data = {
- "filename" : os.path.basename(path),
- "filesize" : os.path.getsize(path),
- }
- upload_id = self._request("/uploads/create", method="GET",
- decode="ascii", data=data)
+ async def upload(self, path, filename=None):
+ """
+ Uploads the file to the hub returning the upload ID
+ """
+ log.debug("Uploading %s..." % path)
+
+ # Use the basename of the file if no name was given
+ if filename is None:
+ filename = os.path.basename(path)
+
+ # Determine the filesize
+ size = os.path.getsize(path)
+
+ # Compute a digest
+ digest = self._compute_digest("blake2b", path)
+
+ # Prepare the file for streaming
+ body_producer = functools.partial(self._stream_file, path)
+
+ # Perform upload
+ response = await self._request("PUT", "/upload",
+ body_producer=body_producer,
+ filename=filename, size=size, digest=digest
+ )
+
+ # Return the upload ID
+ return response.get("id")
+
+ @staticmethod
+ def _stream_file(path, write):
+ with open(path, "rb") as f:
+ while True:
+ buf = f.read(64 * 1024)
+ if not buf:
+ break
+
+ write(buf)
+
+ @staticmethod
+ def _compute_digest(algo, path):
+ h = hashlib.new(algo)
- log.debug("Upload ID: %s" % upload_id)
+ with open(path, "rb") as f:
+ while True:
+ buf = f.read(64 * 1024)
+ if not buf:
+ break
- # Upload the data
- self._upload("/uploads/stream?id=%s" % upload_id, path)
+ h.update(buf)
- return upload_id
\ No newline at end of file
+ return "%s:%s" % (algo, h.hexdigest())