]> git.ipfire.org Git - people/ms/pakfire.git/blame - src/pakfire/daemon.py
buildservice: Move the old stuff into the new wrapper
[people/ms/pakfire.git] / src / pakfire / daemon.py
CommitLineData
964aa579 1#!/usr/bin/python3
aa14071d 2
da170b36 3import asyncio
6f0cd275 4import functools
688e6846 5import glob
509aaad2 6import io
aa14071d 7import json
9b3f1378 8import logging
cae7ad0e 9import logging.handlers
aa14071d 10import multiprocessing
ef009305 11import os.path
cb7def8a 12import setproctitle
aa14071d 13import signal
e04443c2 14import socket
6f0cd275 15import tempfile
4ceab0c3 16
a5600261 17from . import _pakfire
f6111824 18from . import buildservice
4ceab0c3 19from . import config
414b1875 20from . import logger
aa14071d
MT
21
22from pakfire.constants import *
23from pakfire.i18n import _
24
050b130b
MT
25# Setup logging
26log = logging.getLogger("pakfire.daemon")
27
aa62aed6 28class Daemon(object):
9b3f1378 29 def __init__(self, config_file="daemon.conf", debug=False, verbose=False):
099aacb4
MT
30 self.config = config.Config(config_file)
31 self.debug = debug
32 self.verbose = verbose
4ceab0c3 33
414b1875
MT
34 # Setup logger
35 self.log = logger.setup(
07eea4dc 36 "pakfire",
414b1875 37 syslog_identifier="pakfire-daemon",
099aacb4 38 enable_console=self.verbose,
55f0f68d 39 debug=self.debug,
414b1875
MT
40 )
41
1ab7529b
MT
42 # Initialize the connection to the buildservice
43 self.service = buildservice.BuildService()
aa14071d 44
856711a8
MT
45 # Set when this process receives a shutdown signal
46 self._shutdown_signalled = None
aa14071d 47
9a9926fd 48 # List of worker processes.
891fb709 49 self.workers = []
aa14071d 50
1479df35
MT
51 # Stats Connection
52 self.stats = None
53
ef009305
MT
54 @property
55 def ccache_path(self):
56 """
57 Returns the ccache path
58 """
59 return self.config.get("daemon", "ccache_path", "/var/cache/pakfire/ccache")
60
da170b36 61 async def run(self):
aa14071d
MT
62 """
63 Main loop.
64 """
65 # Register signal handlers.
66 self.register_signal_handlers()
67
856711a8
MT
68 # Initialize shutdown signal
69 self._shutdown_signalled = asyncio.Event()
70
9216b623 71 # Create the control connection
1ab7529b 72 self.control = await self.service.control(daemon=self)
76960e90 73
da170b36 74 # Run main loop
856711a8 75 while True:
9216b623
MT
76 # Submit stats
77 await self.control.submit_stats()
78
856711a8
MT
79 # Check if we are running by awaiting the shutdown signal
80 try:
81 await asyncio.wait_for(self._shutdown_signalled.wait(), timeout=5)
82 break
83 except asyncio.TimeoutError:
84 pass
85
aa14071d
MT
86 # Main loop has ended, but we wait until all workers have finished.
87 self.terminate_all_workers()
88
89 def shutdown(self):
90 """
91 Terminates all workers and exists the daemon.
92 """
856711a8
MT
93 # Ignore if the main method has never been called
94 if not self._shutdown_signalled:
95 return
96
97 # Ignore, if we are already shutting down
98 if self._shutdown_signalled.is_set():
aa14071d
MT
99 return
100
414b1875 101 self.log.info(_("Shutting down..."))
856711a8 102 self._shutdown_signalled.set()
aa14071d 103
9216b623
MT
104 # Close the control connection
105 if self.control:
106 self.control.close()
0c94fe09 107
aa14071d
MT
108 def terminate_all_workers(self):
109 """
110 Terminates all workers.
111 """
891fb709
MT
112 self.log.debug("Sending SIGTERM to all workers")
113
114 # First send SIGTERM to all processes
aa14071d 115 for worker in self.workers:
891fb709
MT
116 worker.terminate()
117
118 self.log.debug("Waiting for workers to terminate")
aa14071d 119
fa736f5a 120 # Then wait until they all have finished.
fa736f5a
MT
121 for worker in self.workers:
122 worker.join()
9a9926fd 123
891fb709 124 self.log.debug("All workers have finished")
aa14071d 125
aa14071d
MT
126 # Signal handling.
127
128 def register_signal_handlers(self):
129 signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
130 signal.signal(signal.SIGINT, self.handle_SIGTERM)
131 signal.signal(signal.SIGTERM, self.handle_SIGTERM)
132
133 def handle_SIGCHLD(self, signum, frame):
134 """
135 Handle signal SIGCHLD.
136 """
891fb709
MT
137 # Find the worker process that has terminated
138 for worker in self.workers:
139 # Skip any workers that are still alive
140 if worker.is_alive():
141 continue
142
143 self.log.debug("Worker %s has terminated with status %s" % \
144 (worker.pid, worker.exitcode))
145
146 # Remove the worker from the list
147 try:
148 self.workers.remove(worker)
149 except ValueError:
150 pass
151
152 # Close the process
153 worker.close()
154
155 # We finish after handling one worker. If multiple workers have finished
156 # at the same time, this handler will be called again to handle it.
157 break
9a9926fd
MT
158
159 def handle_SIGTERM(self, signum, frame):
160 """
161 Handle signal SIGTERM.
162 """
163 # Just shut down.
164 self.shutdown()
165
76960e90
MT
166 def job_received(self, job):
167 """
168 Called when this builder was assigned a new job
169 """
891fb709 170 # Launch a new worker
db1162a2 171 worker = Worker(self, job)
891fb709 172 self.workers.append(worker)
76960e90 173
891fb709
MT
174 # Run it
175 worker.start()
176
177 self.log.debug("Spawned a new worker process as PID %s" % worker.pid)
76960e90 178
aa14071d 179
891fb709 180class Worker(multiprocessing.Process):
db1162a2 181 def __init__(self, daemon, data):
aa14071d 182 multiprocessing.Process.__init__(self)
db1162a2 183 self.daemon = daemon
891fb709 184
891fb709
MT
185 # The job that has been received
186 self.data = data
aa14071d 187
1ab7529b
MT
188 @property
189 def service(self):
190 return self.daemon.service
191
192 @property
193 def log(self):
194 return self.daemon.log
195
aa14071d 196 def run(self):
891fb709 197 self.log.debug("Worker %s has launched" % self.pid)
aa14071d 198
891fb709
MT
199 # Register signal handlers
200 self.register_signal_handlers()
aa14071d 201
49841f06
MT
202 # Run everything from here asynchronously
203 asyncio.run(self._work())
204
205 self.log.debug("Worker %s terminated gracefully" % self.pid)
206
a6add487
MT
207 def is_test(self):
208 """
209 Returns True if this job is a test job
210 """
211 return self.data.get("test", False)
212
4c9c875d
MT
213 @property
214 def job_id(self):
215 return self.data.get("id")
216
ef009305
MT
217 @property
218 def ccache(self):
219 """
220 ccache settings
221 """
222 return self.data.get("ccache", {})
223
224 @property
225 def ccache_enabled(self):
226 return self.ccache.get("enabled", False)
227
228 @property
229 def ccache_path(self):
230 """
231 The ccache path for this job
232 """
233 path = self.ccache.get("path", None)
234
235 if path:
236 return os.path.join(self.daemon.ccache_path, path)
237
49841f06
MT
238 async def _work(self):
239 """
240 Called from the async IO loop doing all the work
241 """
2dc6117b
MT
242 success = False
243
4c9c875d
MT
244 # Check if we have received some useful data
245 if not self.job_id:
cb7def8a
MT
246 raise ValueError("Did not receive a job ID")
247
248 # Set the process title
4c9c875d 249 setproctitle.setproctitle("pakfire-worker job %s" % self.job_id)
cb7def8a 250
6f0cd275
MT
251 # Fetch the build architecture
252 arch = self.data.get("arch")
253
6f0cd275
MT
254 # Fetch the package URL
255 pkg = self.data.get("pkg")
256 if not pkg:
257 raise ValueError("Did not received a package URL")
258
1ab7529b
MT
259 # Connect to the service
260 self.job = await self.service.job(self.job_id, worker=self)
cae7ad0e
MT
261
262 # Setup build logger
263 logger = BuildLogger(self.log, self.job)
264
688e6846
MT
265 # Create a temporary directory in which the built packages will be copied
266 with tempfile.TemporaryDirectory(prefix="pakfire-packages-") as target:
1479df35 267 packages = []
2dc6117b 268
688e6846
MT
269 # Run the build
270 try:
f4497d73 271 build = self.build(pkg, arch=arch, target=target,
4c9c875d 272 logger=logger._log, build_id=self.job_id,
ef009305
MT
273
274 # Always disable using snapshots
275 disable_snapshot=True,
276
277 # ccache
278 disable_ccache=not self.ccache_enabled,
279 ccache_path=self.ccache_path,
280 )
688e6846
MT
281
282 # Wait until the build process is done and stream the log in the meantime
283 while not build.done():
284 await logger.stream(timeout=1)
285
68bc587f
MT
286 # Await the build task (which would raise any exceptions)
287 await build
288
688e6846
MT
289 # Catch any other Exception
290 except Exception as e:
291 raise e
292
293 # The build has finished successfully
294 else:
295 success = True
296
297 # Find any packages
a6add487
MT
298 if not self.is_test():
299 packages = glob.glob("%s/*.pfm" % target)
688e6846 300
1ab7529b 301 # Notify the service that the job has finished
688e6846 302 finally:
1479df35 303 await self.job.finished(
688e6846 304 success=success,
1479df35 305 logfile=logger.logfile.name,
688e6846
MT
306 packages=packages,
307 )
cae7ad0e 308
f4497d73 309 def build(self, *args, **kwargs):
cae7ad0e
MT
310 """
311 Sets up a new Pakfire instance and runs it in a new thread.
312
313 This method returns an async.Task() object which can be used to track
314 if this job is still running.
315 """
f4497d73
MT
316 thread = asyncio.to_thread(self._build, *args, **kwargs)
317
318 return asyncio.create_task(thread)
319
320 def _build(self, pkg, arch=None, logger=None, **kwargs):
6f0cd275 321 # Setup Pakfire instance
89a99cc8 322 p = _pakfire.Pakfire(arch=arch, conf=self.pakfire_conf, logger=logger)
6f0cd275 323
f4497d73
MT
324 # Run the build
325 return p.build(pkg, **kwargs)
aa14071d 326
aa14071d 327 def shutdown(self):
30b3c555 328 self.log.debug("Shutting down worker %s" % self.pid)
aa14071d 329
30b3c555 330 # XXX figure out what to do, when a build is running
aa14071d 331
4f872b98
MT
332 def abort(self, *args, **kwargs):
333 """
334 Called to abort a running build immediately
335 """
336 log.warning("Build job has been aborted")
337
338 # XXX TODO
339
aa14071d
MT
340 # Signal handling.
341
342 def register_signal_handlers(self):
343 signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
344 signal.signal(signal.SIGINT, self.handle_SIGTERM)
345 signal.signal(signal.SIGTERM, self.handle_SIGTERM)
346
347 def handle_SIGCHLD(self, signum, frame):
348 """
349 Handle signal SIGCHLD.
350 """
351 # Must be here so that SIGCHLD won't be propagated to
352 # PakfireDaemon.
353 pass
354
355 def handle_SIGTERM(self, signum, frame):
356 """
357 Handle signal SIGTERM.
358 """
359 self.shutdown()
6f0cd275
MT
360
361 @functools.cached_property
362 def pakfire_conf(self):
363 """
364 Writes the pakfire configuration to file and returns its path
365 """
366 conf = self.data.get("conf")
367
050b130b
MT
368 # Dump pakfire configuration
369 log.debug("Pakfire configuration:\n%s" % conf)
370
509aaad2 371 return io.StringIO(conf)
cae7ad0e
MT
372
373
374class BuildLogger(object):
375 """
376 This class groups together all sorts of logging.
377 """
378 def __init__(self, log, job):
379 self.log = log
380 self.job = job
381
382 # Create a logfile
e24e6cf3 383 self.logfile = tempfile.NamedTemporaryFile(mode="w")
cae7ad0e
MT
384
385 # Create a FIFO queue to buffer any log messages
386 self.queue = asyncio.Queue()
387
388 # Create a new logger
389 self.logger = self.log.getChild(self.job.id)
cae7ad0e
MT
390 self.logger.setLevel(logging.DEBUG)
391
392 # Log everything to the queue
393 handler = logging.handlers.QueueHandler(self.queue)
4b26caf0 394 handler.setLevel(logging.INFO)
cae7ad0e
MT
395 self.logger.addHandler(handler)
396
397 # Log everything to the file
398 handler = logging.StreamHandler(self.logfile)
4b26caf0 399 handler.setLevel(logging.INFO)
cae7ad0e
MT
400 self.logger.addHandler(handler)
401
402 def _log(self, level, message):
a0088f44
MT
403 # Remove any trailing newline (but only one)
404 if message:
405 message = message.removesuffix("\n")
406
cae7ad0e
MT
407 return self.logger.log(level, message)
408
409 async def stream(self, timeout=0):
cae7ad0e
MT
410 while True:
411 # Fetch a message from the queue
6d97c00f
MT
412 try:
413 message = await asyncio.wait_for(self.queue.get(), timeout=timeout)
414
415 # If we did not receive any messages within the timeout,
416 # we return control back to the caller
417 except asyncio.TimeoutError as e:
418 break
419
420 # Ignore any empty messages
cae7ad0e
MT
421 if message is None:
422 continue
423
1ab7529b 424 # Send message to the service
47bf5891 425 await self.job.log(message.created, message.levelno, message.getMessage())