# #
###############################################################################
+import asyncio
import cpuinfo
import functools
import hashlib
import logging
import os.path
import psutil
+import subprocess
+import tempfile
import tornado.escape
import tornado.httpclient
+import tornado.simple_httpclient
import tornado.websocket
import urllib.parse
# XXX support proxies
- async def _request(self, method, path, websocket=False, authenticate=True,
+ async def _socket(self, path, **kwargs):
+ return await self._request("GET", path,
+
+ # Enable websocket and ping once every ten seconds
+ websocket=True,
+ websocket_ping_interval=10,
+ websocket_ping_timeout=60,
+
+ **kwargs,
+ )
+
+ async def _request(self, method, path, websocket=False, websocket_ping_interval=None,
+ websocket_ping_timeout=None, authenticate=True,
body=None, body_producer=None, on_message_callback=None, **kwargs):
headers = {}
query_args = {}
if websocket:
return await tornado.websocket.websocket_connect(
req,
+ ping_interval=websocket_ping_interval,
+ ping_timeout=websocket_ping_timeout,
on_message_callback=on_message_callback,
)
# Empty response
return {}
+ async def _proxy(self, cls, *args, **kwargs):
+ conn = cls(self, *args, **kwargs)
+
+ # Create the initial connection
+ await conn.reconnect()
+
+ return conn
+
def _setup_krb5_context(self, url):
"""
Creates the Kerberos context that can be used to perform client
# Build actions
def get_build(self, uuid):
- return self._request("/builds/%s" % uuid, decode="json")
+ return self._request("/api/v1/builds/%s" % uuid, decode="json")
async def build(self, path, repo=None, arches=None):
"""
log.debug("%s has been uploaded as %s" % (path, upload_id))
# Create a new build
- build_id = await self._request("POST", "/builds",
+ build_id = await self._request("POST", "/api/v1/builds",
upload_id=upload_id, repo=repo, arches=arches)
log.debug("Build creates as %s" % build_id)
"""
Returns a list of all uploads
"""
- response = await self._request("GET", "/uploads")
+ response = await self._request("GET", "/api/v1/uploads")
return response.get("uploads")
body_producer = functools.partial(self._stream_file, path, size, p)
# Perform upload
- response = await self._request("PUT", "/uploads",
+ response = await self._request("PUT", "/api/v1/uploads",
body_producer=body_producer,
filename=filename, size=size, digest=digest
)
return response.get("id")
async def delete_upload(self, upload_id):
- await self._request("DELETE", "/uploads", id=upload_id)
+ await self._request("DELETE", "/api/v1/uploads/%s" % upload_id)
async def upload_multi(self, *paths, show_progress=True):
"""
# Builder
- async def send_builder_info(self):
+ async def builder_stats(self):
"""
Sends information about this host to the hub.
This information is something that doesn't change during
the lifetime of the daemon.
"""
- log.info(_("Sending builder information to hub..."))
+ return await self._proxy(BuilderStats)
- # Fetch processor information
- cpu = cpuinfo.get_cpu_info()
+ async def queue(self, job_received_callback):
+ """
+ Connects to the hub and asks for a build job
+ """
+ on_message_callback = functools.partial(
+ self._decode_json_message, job_received_callback,
+ )
- data = {
- # CPU info
- "cpu_model" : cpu.get("brand_raw"),
- "cpu_count" : cpu.get("count"),
- "cpu_arch" : _pakfire.native_arch(),
+ return await self._proxy(QueueProxy, on_message_callback=on_message_callback)
- # Pakfire + OS
- "pakfire_version" : PAKFIRE_VERSION,
- "os_name" : util.get_distro_name(),
- }
+ @staticmethod
+ def _decode_json_message(callback, message):
+ """
+ Takes a received message and decodes it.
+
+ It will then call the callback with the decoded message.
+ """
+ # Ignore empty messages
+ if message is None:
+ return
+
+ try:
+ message = json.loads(message)
+ except json.JSONDecodeError:
+ log.error("Could not decode JSON message:\n%s" % message)
+ return
+
+ return callback(message)
+
+ async def job(self, job_id):
+ """
+ Connects to the given job
+ """
+ # Return a Job proxy
+ return await self._proxy(Job, job_id)
+
+
+class HubObject(object):
+ # Disable Nagle's algorithm?
+ nodelay = False
+
+ def __init__(self, hub, *args, on_message_callback=None, **kwargs):
+ self.hub = hub
+
+ # The active connection
+ self.conn = None
+
+ # Callbacks
+ self.on_message_callback = on_message_callback
+
+ # Perform custom initialization
+ self.init(*args, **kwargs)
+
+ def init(self, *args, **kwargs):
+ pass
+
+ @property
+ def url(self):
+ raise NotImplementedError
+
+ async def connect(self):
+ """
+ This will create a connection
+ """
+ conn = await self.hub._socket(self.url,
+ on_message_callback=self.on_message_callback)
+
+ # Disable Nagle's algorithm
+ if self.nodelay:
+ conn.set_nodelay(True)
+
+ return conn
+
+ async def reconnect(self):
+ """
+ Tries to reconnect for forever
+ """
+ attempts = 0
+
+ while True:
+ attempts += 1
+
+ log.debug("Trying to reconnect (attempt %s)..." % attempts)
+
+ try:
+ self.conn = await self.connect()
+
+ # The web service responded with some error
+ except tornado.httpclient.HTTPClientError as e:
+ log.error("%s: Received HTTP Error %s" % (self.url, e.code))
+
+ # If the service is down we will retry in 10 seconds
+ if e.code == 503:
+ await asyncio.sleep(10)
+
+ # Raise any unhandled errors
+ else:
+ raise e
+
+ # The web service did not respond in time
+ except tornado.simple_httpclient.HTTPTimeoutError as e:
+ await asyncio.sleep(30)
+
+ # Raise all other exceptions
+ except Exception as e:
+ raise e
+
+ # If the connection was established successfully, we return
+ else:
+ return
+
+ async def write_message(self, message, **kwargs):
+ """
+ Sends a message but encodes it into JSON first
+ """
+ # This should never happen
+ if not self.conn:
+ raise RuntimeError("Not connected")
+
+ if isinstance(message, dict):
+ message = tornado.escape.json_encode(message)
+
+ try:
+ return await self.conn.write_message(message, **kwargs)
- # Send request
- await self._request("POST", "/builders/info", **data)
+ except tornado.websocket.WebSocketClosedError as e:
+ # Try to reconnect
+ await self.reconnect()
- async def send_builder_stats(self):
+ # Try to send the message again
+ return await self.write_message(message, **kwargs)
+
+
+class QueueProxy(HubObject):
+ url = "/api/v1/jobs/queue"
+
+
+class BuilderStats(HubObject):
+ """
+ Proxy for Builder Stats
+ """
+ def init(self):
+ # Fetch processor information
+ self.cpu = cpuinfo.get_cpu_info()
+
+ # Fetch the native architecture
+ self.native_arch = _pakfire.native_arch()
+
+ @property
+ def url(self):
+ return "/api/v1/builders/stats"
+
+ async def submit(self):
log.debug("Sending stat message to hub...")
# Fetch processor information
- cpu = psutil.cpu_times_percent()
+ cpu_times = psutil.cpu_times_percent()
# Fetch memory/swap information
mem = psutil.virtual_memory()
# Fetch load average
loadavg = psutil.getloadavg()
- data = {
- # CPU
- "cpu_user" : cpu.user,
- "cpu_nice" : cpu.nice,
- "cpu_system" : cpu.system,
- "cpu_idle" : cpu.idle,
- "cpu_iowait" : cpu.iowait,
- "cpu_irq" : cpu.irq,
- "cpu_softirq" : cpu.softirq,
- "cpu_steal" : cpu.steal,
- "cpu_guest" : cpu.guest,
- "cpu_guest_nice" : cpu.guest_nice,
+ await self.write_message({
+ # CPU info
+ "cpu_model" : self.cpu.get("brand_raw"),
+ "cpu_count" : self.cpu.get("count"),
+ "cpu_arch" : self.native_arch,
+
+ # Pakfire + OS
+ "pakfire_version" : PAKFIRE_VERSION,
+ "os_name" : util.get_distro_name(),
+
+ # CPU Times
+ "cpu_user" : cpu_times.user,
+ "cpu_nice" : cpu_times.nice,
+ "cpu_system" : cpu_times.system,
+ "cpu_idle" : cpu_times.idle,
+ "cpu_iowait" : cpu_times.iowait,
+ "cpu_irq" : cpu_times.irq,
+ "cpu_softirq" : cpu_times.softirq,
+ "cpu_steal" : cpu_times.steal,
+ "cpu_guest" : cpu_times.guest,
+ "cpu_guest_nice" : cpu_times.guest_nice,
# Load average
"loadavg1" : loadavg[0],
"swap_total" : swap.total,
"swap_used" : swap.used,
"swap_free" : swap.free,
- }
-
- # Send request
- await self._request("POST", "/builders/stats", **data)
-
- async def queue(self, job_received_callback):
- """
- Connects to the hub and asks for a build job
- """
- on_message_callback = functools.partial(
- self._decode_json_message, job_received_callback)
-
- # Join the queue
- queue = await self._request("GET", "/queue", websocket=True, ping=10,
- on_message_callback=on_message_callback)
-
- log.debug("Joined the queue")
-
- return queue
-
- @staticmethod
- def _decode_json_message(callback, message):
- """
- Takes a received message and decodes it.
+ })
- It will then call the callback with the decoded message.
- """
- # Ignore empty messages
- if message is None:
- return
- try:
- message = json.loads(message)
- except json.JSONDecodeError:
- log.error("Could not decode JSON message:\n%s" % message)
- return
+class Job(HubObject):
+ """
+ Proxy for Build Jobs
+ """
+ def init(self, id):
+ self.id = id
- return callback(message)
+ @property
+ def url(self):
+ return "/api/v1/jobs/%s" % self.id
- async def job(self, id):
+ async def status(self, status):
"""
- Connects to the given job
+ Sends a new status to the hub
"""
- # Connect to the hub
- conn = await self._request("GET", "/jobs/%s/builder" % id,
- websocket=True, ping=10)
-
- # Return a Job proxy
- return Job(self, id, conn)
+ await self.write_message({
+ "message" : "status",
+ "status" : status,
+ })
- async def finish_job(self, job_id, success, packages=None, log=None):
+ async def finished(self, success, packages=None, logfile=None):
"""
Will tell the hub that a job has finished
"""
# Upload the log file
- if log:
- log = await self.upload(log, filename="%s.log" % job_id)
+ if logfile:
+ logfile = await self.hub.upload(logfile, filename="%s.log" % self.id)
# Upload the packages
if packages:
- packages = await self.upload_multi(*packages)
+ packages = await self.hub.upload_multi(*packages)
# Send the request
- response = await self._request("POST", "/jobs/%s/finished" % job_id,
- success=success, log=log, packages=packages)
+ await self.write_message({
+ "message" : "finished",
+ "success" : success,
+ "logfile" : logfile,
+ "packages" : packages,
+ })
# Handle the response
# XXX TODO
-
-class Job(object):
- """
- Proxy for Build Jobs
- """
- def __init__(self, hub, id, conn):
- self.hub = hub
- self.id = id
- self.conn = conn
-
- async def _write_message(self, message, **kwargs):
- """
- Sends a message but encodes it into JSON first
- """
- if isinstance(message, dict):
- message = tornado.escape.json_encode(message)
-
- return await self.conn.write_message(message, **kwargs)
-
- async def status(self, status):
- """
- Sends a new status to the hub
- """
- await self._write_message({
- "message" : "status",
- "status" : status,
- })
-
- async def log(self, level, message):
+ async def log(self, timestamp, level, message):
"""
Sends a log message to the hub
"""
- await self._write_message({
- "message" : "log",
- "level" : level,
- "log" : message,
+ await self.write_message({
+ "message" : "log",
+ "timestamp" : timestamp,
+ "level" : level,
+ "log" : message,
})