# Connect to the Pakfire Hub
self.hub = self.connect_to_hub()
- self.queue = None
+ self.control = None
# Set when this process receives a shutdown signal
self._shutdown_signalled = None
# Initialize shutdown signal
self._shutdown_signalled = asyncio.Event()
- # Send builder information
- self.stats = await self.hub.builder_stats()
-
- # Join the job queue
- self.queue = await self.hub.queue(self.job_received)
+ # Create the control connection
+ self.control = await self.hub.control(daemon=self)
# Run main loop
while True:
+ # Submit stats
+ await self.control.submit_stats()
+
# Check if we are running by awaiting the shutdown signal
try:
await asyncio.wait_for(self._shutdown_signalled.wait(), timeout=5)
except asyncio.TimeoutError:
pass
- # Send some information about this builder
- if self.stats:
- await self.stats.submit()
-
# Main loop has ended, but we wait until all workers have finished.
self.terminate_all_workers()
self.log.info(_("Shutting down..."))
self._shutdown_signalled.set()
- # Close queue connection so we won't receive any new jobs
- if self.queue:
- self.queue.close()
+ # Close the control connection
+ if self.control:
+ self.control.close()
def terminate_all_workers(self):
"""
# Builder
- async def builder_stats(self):
+ async def control(self, *args, **kwargs):
"""
- Sends information about this host to the hub.
-
- This information is something that doesn't change during
- the lifetime of the daemon.
- """
- return await self._proxy(BuilderStats)
-
- async def queue(self, job_received_callback):
+ Creates a control connection
"""
- Connects to the hub and asks for a build job
- """
- on_message_callback = functools.partial(
- self._decode_json_message, job_received_callback,
- )
-
- return await self._proxy(QueueProxy, on_message_callback=on_message_callback)
+ return await self._proxy(ControlConnection, *args, **kwargs)
@staticmethod
def _decode_json_message(callback, message):
self.conn = None
# Callbacks
- self.on_message_callback = on_message_callback
+ self._on_message_callback = on_message_callback
# Perform custom initialization
self.init(*args, **kwargs)
else:
return
+ def close(self):
+ """
+ Closes the connection
+ """
+ if self.conn:
+ self.conn.close()
+
+ def on_message_callback(self, message):
+ if self._on_message_callback:
+ return self._on_message_callback(message)
+
async def write_message(self, message, **kwargs):
"""
Sends a message but encodes it into JSON first
return await self.write_message(message, **kwargs)
-class QueueProxy(HubObject):
- url = "/api/v1/jobs/queue"
+class ControlConnection(HubObject):
+ url = "/api/v1/builders/control"
+ def init(self, daemon):
+ self.daemon = daemon
-class BuilderStats(HubObject):
- """
- Proxy for Builder Stats
- """
- def init(self):
# Fetch processor information
self.cpu = cpuinfo.get_cpu_info()
# Fetch the native architecture
self.native_arch = _pakfire.native_arch()
- @property
- def url(self):
- return "/api/v1/builders/stats"
+ def on_message_callback(self, message):
+ print(message)
- async def submit(self):
- log.debug("Sending stat message to hub...")
+ async def submit_stats(self):
+ """
+ Sends stats about this builder
+ """
+ log.debug("Sending stats...")
# Fetch processor information
cpu_times = psutil.cpu_times_percent()
loadavg = psutil.getloadavg()
await self.write_message({
+ "type" : "stats",
+
# CPU info
"cpu_model" : self.cpu.get("brand_raw"),
"cpu_count" : self.cpu.get("count"),