]>
Commit | Line | Data |
---|---|---|
964aa579 | 1 | #!/usr/bin/python3 |
aa14071d | 2 | |
da170b36 | 3 | import asyncio |
6f0cd275 | 4 | import functools |
688e6846 | 5 | import glob |
509aaad2 | 6 | import io |
aa14071d | 7 | import json |
9b3f1378 | 8 | import logging |
cae7ad0e | 9 | import logging.handlers |
aa14071d | 10 | import multiprocessing |
ef009305 | 11 | import os.path |
cb7def8a | 12 | import setproctitle |
aa14071d | 13 | import signal |
e04443c2 | 14 | import socket |
6f0cd275 | 15 | import tempfile |
4ceab0c3 | 16 | |
a5600261 | 17 | from . import _pakfire |
f6111824 | 18 | from . import buildservice |
4ceab0c3 | 19 | from . import config |
414b1875 | 20 | from . import logger |
aa14071d MT |
21 | |
22 | from pakfire.constants import * | |
23 | from pakfire.i18n import _ | |
24 | ||
050b130b MT |
25 | # Setup logging |
26 | log = logging.getLogger("pakfire.daemon") | |
27 | ||
class Daemon(object):
    """
    The build daemon.

    Holds a control connection to the build service and launches one
    Worker process for every job that is handed to job_received().
    """

    def __init__(self, config_file="daemon.conf", debug=False, verbose=False):
        """
        config_file is passed to config.Config(), debug and verbose
        are forwarded to the logger setup.
        """
        self.config = config.Config(config_file)
        self.debug = debug
        self.verbose = verbose

        # Setup logger
        self.log = logger.setup(
            "pakfire",
            syslog_identifier="pakfire-daemon",
            enable_console=self.verbose,
            debug=self.debug,
        )

        # Initialize the connection to the buildservice
        self.service = buildservice.BuildService()

        # The control connection is only created in run(), but shutdown()
        # reads this attribute and may be triggered by a signal before the
        # connection exists, so it has to be defined from the start.
        self.control = None

        # Set when this process receives a shutdown signal
        self._shutdown_signalled = None

        # List of worker processes.
        self.workers = []

        # Stats Connection
        self.stats = None

    @property
    def ccache_path(self):
        """
        Returns the ccache path
        """
        return self.config.get("daemon", "ccache_path", "/var/cache/pakfire/ccache")

    async def run(self):
        """
        Main loop.

        Submits stats over the control connection and then waits up to
        five seconds for the shutdown signal before repeating.
        """
        # Register signal handlers.
        self.register_signal_handlers()

        # Initialize shutdown signal
        self._shutdown_signalled = asyncio.Event()

        # Create the control connection
        self.control = await self.service.control(daemon=self)

        # Run main loop
        while True:
            # Submit stats
            await self.control.submit_stats()

            # Check if we are running by awaiting the shutdown signal
            try:
                await asyncio.wait_for(self._shutdown_signalled.wait(), timeout=5)
                break
            except asyncio.TimeoutError:
                pass

        # Main loop has ended, but we wait until all workers have finished.
        self.terminate_all_workers()

    def shutdown(self):
        """
        Signals the main loop to end and closes the control connection.
        """
        # Ignore if the main method has never been called
        if not self._shutdown_signalled:
            return

        # Ignore, if we are already shutting down
        if self._shutdown_signalled.is_set():
            return

        self.log.info(_("Shutting down..."))
        self._shutdown_signalled.set()

        # Close the control connection (it might not have been opened, yet)
        if self.control:
            self.control.close()

    def terminate_all_workers(self):
        """
        Terminates all workers and waits until they have finished.
        """
        # Work on a snapshot, because the SIGCHLD handler removes workers
        # from self.workers (and closes them) while we are iterating.
        workers = list(self.workers)

        self.log.debug("Sending SIGTERM to all workers")

        # First send SIGTERM to all processes
        for worker in workers:
            try:
                worker.terminate()

            # The process object has already been closed
            except ValueError:
                pass

        self.log.debug("Waiting for workers to terminate")

        # Then wait until they all have finished.
        for worker in workers:
            try:
                worker.join()

            # The process object has already been closed
            except ValueError:
                pass

        self.log.debug("All workers have finished")

    # Signal handling.

    def register_signal_handlers(self):
        """
        Installs the handlers for SIGCHLD, SIGINT and SIGTERM.
        """
        signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
        signal.signal(signal.SIGINT, self.handle_SIGTERM)
        signal.signal(signal.SIGTERM, self.handle_SIGTERM)

    def handle_SIGCHLD(self, signum, frame):
        """
        Handle signal SIGCHLD.

        Removes and closes every worker that has terminated. Several
        children exiting at about the same time may be reported by a single
        signal, so we cannot stop after the first one.
        """
        # Iterate over a copy as we are removing items from the list
        for worker in list(self.workers):
            try:
                # Skip workers that have not been started, yet (they are
                # added to the list before start() is called) and any
                # workers that are still alive
                if worker.pid is None or worker.is_alive():
                    continue

                pid, exitcode = worker.pid, worker.exitcode

            # The process object has been closed already, so there is
            # nothing left to do but to forget about it
            except ValueError:
                try:
                    self.workers.remove(worker)
                except ValueError:
                    pass
                continue

            self.log.debug("Worker %s has terminated with status %s" % \
                (pid, exitcode))

            # Remove the worker from the list
            try:
                self.workers.remove(worker)
            except ValueError:
                pass

            # Close the process
            worker.close()

    def handle_SIGTERM(self, signum, frame):
        """
        Handle signal SIGTERM (also installed for SIGINT).
        """
        # Just shut down.
        self.shutdown()

    def job_received(self, job):
        """
        Called when this builder was assigned a new job
        """
        # Launch a new worker
        worker = Worker(self, job)
        self.workers.append(worker)

        # Run it
        worker.start()

        self.log.debug("Spawned a new worker process as PID %s" % worker.pid)
76960e90 | 178 | |
aa14071d | 179 | |
class Worker(multiprocessing.Process):
    """
    A process that runs exactly one build job.

    data is the job description that has been received from the build
    service. The keys read by this class are "id", "arch", "pkg", "conf",
    "test" and "ccache".
    """

    def __init__(self, daemon, data):
        multiprocessing.Process.__init__(self)

        # NOTE(review): multiprocessing.Process defines "daemon" as a
        # property. This assignment stores the Daemon object through that
        # property, which also flags the process as daemonic (any true
        # value). Kept as it is, because other code reads worker.daemon.
        self.daemon = daemon

        # The job that has been received
        self.data = data

    @property
    def service(self):
        """
        The build service connection of the daemon
        """
        return self.daemon.service

    @property
    def log(self):
        """
        The logger of the daemon
        """
        return self.daemon.log

    def run(self):
        """
        Entry point of the worker process.
        """
        self.log.debug("Worker %s has launched" % self.pid)

        # Register signal handlers
        self.register_signal_handlers()

        # Run everything from here asynchronously
        asyncio.run(self._work())

        self.log.debug("Worker %s terminated gracefully" % self.pid)

    def is_test(self):
        """
        Returns True if this job is a test job
        """
        return self.data.get("test", False)

    @property
    def job_id(self):
        """
        The ID of this job (None if none was sent)
        """
        return self.data.get("id")

    @property
    def ccache(self):
        """
        ccache settings
        """
        return self.data.get("ccache", {})

    @property
    def ccache_enabled(self):
        """
        Returns True if ccache has been enabled for this job
        """
        return self.ccache.get("enabled", False)

    @property
    def ccache_path(self):
        """
        The ccache path for this job

        This is the job's path joined onto the ccache path of the daemon,
        or None if the job did not come with a path.
        """
        path = self.ccache.get("path", None)
        if not path:
            return None

        return os.path.join(self.daemon.ccache_path, path)

    async def _work(self):
        """
        Called from the async IO loop doing all the work

        Raises ValueError if the job has no ID or no package URL. Once the
        job connection has been opened, the service is always notified
        about the outcome, even if the build has raised an exception.
        """
        success = False

        # Check if we have received some useful data
        if not self.job_id:
            raise ValueError("Did not receive a job ID")

        # Set the process title
        setproctitle.setproctitle("pakfire-worker job %s" % self.job_id)

        # Fetch the build architecture
        arch = self.data.get("arch")

        # Fetch the package URL
        pkg = self.data.get("pkg")
        if not pkg:
            raise ValueError("Did not received a package URL")

        # Connect to the service
        self.job = await self.service.job(self.job_id, worker=self)

        # Setup build logger (named so that it does not shadow the
        # logger module)
        build_logger = BuildLogger(self.log, self.job)

        # Create a temporary directory in which the built packages will be copied
        with tempfile.TemporaryDirectory(prefix="pakfire-packages-") as target:
            packages = []

            try:
                # Run the build
                build = self.build(pkg, arch=arch, target=target,
                    logger=build_logger._log, build_id=self.job_id,

                    # Always disable using snapshots
                    disable_snapshot=True,

                    # ccache
                    disable_ccache=not self.ccache_enabled,
                    ccache_path=self.ccache_path,
                )

                # Wait until the build process is done and stream the log in the meantime
                while not build.done():
                    await build_logger.stream(timeout=1)

                # Await the build task (which would raise any exceptions)
                await build

                # We only get here if the build has finished successfully
                success = True

                # Find any packages
                if not self.is_test():
                    packages = glob.glob("%s/*.pfm" % target)

            # Notify the service that the job has finished. Exceptions are
            # not handled here and propagate to the caller after that.
            finally:
                await self.job.finished(
                    success=success,
                    logfile=build_logger.logfile.name,
                    packages=packages,
                )

    def build(self, *args, **kwargs):
        """
        Sets up a new Pakfire instance and runs it in a new thread.

        This method returns an asyncio.Task object which can be used to
        track if this job is still running. It must be called while the
        event loop is running.
        """
        thread = asyncio.to_thread(self._build, *args, **kwargs)

        return asyncio.create_task(thread)

    def _build(self, pkg, arch=None, logger=None, **kwargs):
        """
        Runs the build (called in a separate thread by build())

        Any other keyword arguments are passed on to Pakfire.build().
        """
        # Setup Pakfire instance
        p = _pakfire.Pakfire(arch=arch, conf=self.pakfire_conf, logger=logger)

        # Run the build
        return p.build(pkg, **kwargs)

    def shutdown(self):
        """
        Called when the worker has been asked to terminate.

        Currently, this only logs the request.
        """
        self.log.debug("Shutting down worker %s" % self.pid)

        # XXX figure out what to do, when a build is running

    def abort(self, *args, **kwargs):
        """
        Called to abort a running build immediately

        Currently, this only logs a warning.
        """
        self.log.warning("Build job has been aborted")

        # XXX TODO

    # Signal handling.

    def register_signal_handlers(self):
        """
        Installs the handlers for SIGCHLD, SIGINT and SIGTERM.
        """
        signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
        signal.signal(signal.SIGINT, self.handle_SIGTERM)
        signal.signal(signal.SIGTERM, self.handle_SIGTERM)

    def handle_SIGCHLD(self, signum, frame):
        """
        Handle signal SIGCHLD.
        """
        # Must be here so that SIGCHLD won't be propagated to
        # PakfireDaemon.
        pass

    def handle_SIGTERM(self, signum, frame):
        """
        Handle signal SIGTERM (also installed for SIGINT).
        """
        self.shutdown()

    @functools.cached_property
    def pakfire_conf(self):
        """
        Returns the pakfire configuration of this job as a file-like
        object (io.StringIO). Nothing is written to disk.
        """
        conf = self.data.get("conf")

        # Dump pakfire configuration
        self.log.debug("Pakfire configuration:\n%s" % conf)

        return io.StringIO(conf)
cae7ad0e MT |
372 | |
373 | ||
class _LoopQueueHandler(logging.handlers.QueueHandler):
    """
    A QueueHandler that feeds an asyncio.Queue.

    asyncio queues are not thread-safe, so records are handed over to the
    event loop which then puts them into the queue.
    """

    def __init__(self, queue, loop=None):
        logging.handlers.QueueHandler.__init__(self, queue)

        # The event loop that consumes the queue (if any)
        self.loop = loop

    def enqueue(self, record):
        loop = self.loop

        # Without a usable loop we can only put the record straight in
        if loop is None or loop.is_closed():
            self.queue.put_nowait(record)
            return

        # This may be called from any thread
        loop.call_soon_threadsafe(self.queue.put_nowait, record)


class BuildLogger(object):
    """
    This class groups together all sorts of logging.

    Every message of level INFO or higher is written to a temporary log
    file and queued up so that stream() can send it to the job.

    log is the parent logger and job is the job connection. It needs an
    "id" attribute, and a log() coroutine when stream() is being used.
    """

    def __init__(self, log, job):
        self.log = log
        self.job = job

        # Create a logfile
        self.logfile = tempfile.NamedTemporaryFile(mode="w")

        # Create a FIFO queue to buffer any log messages
        self.queue = asyncio.Queue()

        # Find the event loop which is going to read from the queue.
        # There is none if we have not been created inside a coroutine.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        # Create a new logger
        self.logger = self.log.getChild(self.job.id)
        self.logger.setLevel(logging.DEBUG)

        # Log everything to the queue. The log callback is called from
        # the thread that runs the build, hence the special handler.
        handler = _LoopQueueHandler(self.queue, loop=loop)
        handler.setLevel(logging.INFO)
        self.logger.addHandler(handler)

        # Log everything to the file
        handler = logging.StreamHandler(self.logfile)
        handler.setLevel(logging.INFO)
        self.logger.addHandler(handler)

    def _log(self, level, message):
        """
        Log callback that is passed to the build.
        """
        # Remove any trailing newline (but only one)
        if message:
            message = message.removesuffix("\n")

        return self.logger.log(level, message)

    async def stream(self, timeout=0):
        """
        Sends all queued messages to the job.

        Returns as soon as no new message has arrived within timeout
        seconds.
        """
        while True:
            # Fetch a message from the queue
            try:
                message = await asyncio.wait_for(self.queue.get(), timeout=timeout)

            # If we did not receive any messages within the timeout,
            # we return control back to the caller
            except asyncio.TimeoutError:
                break

            # Ignore any empty messages
            if message is None:
                continue

            # Send message to the service
            await self.job.log(message.created, message.levelno, message.getMessage())