]> git.ipfire.org Git - people/ms/pakfire.git/blame - python/pakfire/daemon.py
daemon: Make shutting down a bit faster.
[people/ms/pakfire.git] / python / pakfire / daemon.py
CommitLineData
aa14071d
MT
1#!/usr/bin/python
2
3import hashlib
4import json
5import multiprocessing
6import os
7import signal
8import sys
9import tempfile
10import time
11
12import pakfire.base
13import pakfire.builder
14import pakfire.config
15import pakfire.downloader
16import pakfire.system
17import pakfire.util
18from pakfire.system import system
19
20import base
21import transport
22
23from pakfire.constants import *
24from pakfire.i18n import _
25
26import logging
27log = logging.getLogger("pakfire.daemon")
28
class BuildJob(dict):
	"""
	Wrapper class for build jobs, that are received from the hub.

	This makes accessing attributes more easy.
	"""
	def __getattr__(self, key):
		# Expose dictionary entries as attributes (job.id == job["id"]).
		try:
			return self[key]
		except KeyError:
			# Use the call form of raise (the old "raise X, y" statement
			# is Python-2-only syntax); AttributeError is the contract
			# of __getattr__ for missing names.
			raise AttributeError(key)
40
41
class PakfireDaemon(object):
	"""
	Main daemon that maintains a pool of build worker processes.

	It starts one PakfireDaemonKeepalive child process and spawns enough
	PakfireWorker processes to keep ``max_waiting`` workers idle, without
	ever exceeding ``max_running`` workers in total.
	"""
	def __init__(self, config):
		self.config = config

		# Indicates if this daemon is in running mode.
		self.__running = True

		# Create daemon that sends keep-alive messages.
		self.keepalive = PakfireDaemonKeepalive(self.config)

		# List of worker processes. The keepalive process lives here as
		# well, but is filtered out by the "workers" property.
		self.__workers = [self.keepalive]

		### Configuration
		# Number of workers in waiting state.
		self.max_waiting = 1

		# Number of running workers.
		self.max_running = system.cpu_count * 2

	def run(self, heartbeat=30):
		"""
		Main loop.

		Spawns workers every ``heartbeat`` seconds until shutdown() is
		called, then terminates and joins all workers.
		"""
		# Register signal handlers.
		self.register_signal_handlers()

		# Start keepalive process.
		self.keepalive.start()

		# Run main loop.
		while self.__running:
			time_started = time.time()

			# Spawn a sufficient number of worker processes.
			self.spawn_workers_if_needed()

			# Get runtime of this loop iteration.
			time_elapsed = time.time() - time_started

			# Wait if the heartbeat time has not been reached, yet.
			if time_elapsed < heartbeat:
				time.sleep(heartbeat - time_elapsed)

		# Main loop has ended, but we wait until all workers have finished.
		self.terminate_all_workers()

	def shutdown(self):
		"""
		Terminates all workers and exits the daemon.
		"""
		if not self.__running:
			return

		log.info(_("Shutting down..."))
		self.__running = False

	def spawn_workers_if_needed(self, *args, **kwargs):
		"""
		Spawns more workers if needed.
		"""
		# Do not create any more processes when the daemon is shutting down.
		if not self.__running:
			return

		# Cleanup all other workers.
		self.cleanup_workers()

		# Do not create more workers if there are already enough workers
		# active.
		if len(self.workers) >= self.max_running:
			log.warning("More workers running than allowed")
			return

		# Do nothing if there are already enough workers waiting.
		wanted_waiting_workers = self.max_waiting - len(self.waiting_workers)
		if wanted_waiting_workers <= 0:
			return

		# Spawn the missing number of workers.
		for i in range(wanted_waiting_workers):
			self.spawn_worker(*args, **kwargs)

	def spawn_worker(self, *args, **kwargs):
		"""
		Spawns a new worker process and registers it.
		"""
		worker = PakfireWorker(config=self.config, *args, **kwargs)
		worker.start()

		log.debug("Spawned new worker process: %s" % worker)
		self.__workers.append(worker)

	def terminate_worker(self, worker):
		"""
		Terminates (SIGTERM) the given worker process.
		"""
		log.warning(_("Terminating worker process: %s") % worker)

		worker.terminate()

	def terminate_all_workers(self):
		"""
		Terminates all workers (including keepalive) and waits for them.
		"""
		# First send SIGTERM to all processes.
		self.terminate_worker(self.keepalive)
		for worker in self.workers:
			self.terminate_worker(worker)

		# Then wait until they all have finished.
		self.keepalive.join()
		for worker in self.workers:
			worker.join()

	def remove_worker(self, worker):
		"""
		Removes a worker from the internal list of worker processes.
		"""
		assert not worker.is_alive(), "Remove alive worker?"

		log.debug("Removing worker: %s" % worker)
		try:
			self.__workers.remove(worker)
		# Only ignore "not in list"; a bare except here used to hide
		# any unrelated error as well.
		except ValueError:
			pass

	def cleanup_workers(self):
		"""
		Remove workers that are not alive any more.
		"""
		# "workers" returns a fresh list, so removing while iterating
		# is safe here.
		for worker in self.workers:
			if worker.is_alive():
				continue

			self.remove_worker(worker)

	@property
	def workers(self):
		# All build workers (excludes the keepalive process).
		return [w for w in self.__workers if isinstance(w, PakfireWorker)]

	@property
	def running_workers(self):
		# Workers that have picked up a build job.
		return [w for w in self.workers if not w.waiting.is_set()]

	@property
	def waiting_workers(self):
		# Workers that are idle and waiting for a build job.
		return [w for w in self.workers if w.waiting.is_set()]

	# Signal handling.

	def register_signal_handlers(self):
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# A child has exited; spawn new workers if necessary.
		self.spawn_workers_if_needed()

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		# Just shut down.
		self.shutdown()
225
9a9926fd
MT
226
class PakfireDaemonKeepalive(multiprocessing.Process):
	"""
	Child process that periodically tells the hub that this builder is
	still alive and reports some statistics about the machine.
	"""
	def __init__(self, config):
		multiprocessing.Process.__init__(self)

		# Save config.
		self.config = config

	def run(self, heartbeat=30):
		"""
		Process entry point.

		Announces this builder to the hub once, then sends a keepalive
		message every ``heartbeat`` seconds until the process is killed.
		"""
		# Register signal handlers.
		self.register_signal_handlers()

		# Create connection to the hub.
		self.transport = transport.PakfireHubTransport(self.config)

		# Send our profile to the hub.
		self.send_builder_info()

		while True:
			started = time.time()

			# Send keepalive message.
			self.send_keepalive()

			# Sleep for whatever is left of the heartbeat interval.
			remaining = heartbeat - (time.time() - started)
			if remaining > 0:
				time.sleep(remaining)

	def shutdown(self):
		"""
		Ends this process immediately.
		"""
		sys.exit(1)

	# Signal handling.

	def register_signal_handlers(self):
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# Must be here so that SIGCHLD won't be propagated to
		# PakfireDaemon.
		pass

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		# Just shut down.
		self.shutdown()

	# Talking to the hub.

	def send_builder_info(self):
		"""
		Uploads the hardware and software profile of this builder.
		"""
		log.info(_("Sending builder information to hub..."))

		profile = {
			# CPU info
			"cpu_model"        : system.cpu_model,
			"cpu_count"        : system.cpu_count,
			"cpu_arch"         : system.native_arch,
			"cpu_bogomips"     : system.cpu_bogomips,

			# Memory + swap
			"mem_total"        : system.memory,
			"swap_total"       : system.swap_total,

			# Pakfire + OS
			"pakfire_version"  : PAKFIRE_VERSION,
			"host_key"         : self.config.get("signatures", "host_key", None),
			"os_name"          : system.distro.pretty_name,

			# Supported arches
			"supported_arches" : ",".join(system.supported_arches),
		}
		self.transport.post("/builders/info", data=profile)

	def send_keepalive(self):
		"""
		Sends current load, memory and disk statistics to the hub.
		"""
		log.debug("Sending keepalive message to hub...")

		stats = {
			# Load average.
			"loadavg1"   : system.loadavg1,
			"loadavg5"   : system.loadavg5,
			"loadavg15"  : system.loadavg15,

			# Memory
			"mem_total"  : system.memory_total,
			"mem_free"   : system.memory_free,

			# Swap
			"swap_total" : system.swap_total,
			"swap_free"  : system.swap_free,

			# Disk space
			"space_free" : self.free_space,
		}
		self.transport.post("/builders/keepalive", data=stats)

	@property
	def free_space(self):
		# Free space on the filesystem that holds the build root.
		return system.get_mountpoint(BUILD_ROOT).space_left
338
339
class PakfireWorker(multiprocessing.Process):
	"""
	Worker process that polls the hub for a build job, executes it and
	uploads the results. The process exits after one job.
	"""
	def __init__(self, config, waiting=None):
		multiprocessing.Process.__init__(self)

		# Save config.
		self.config = config

		# Waiting event. Clear if this worker is running a build.
		self.waiting = multiprocessing.Event()
		self.waiting.set()

		# Indicates if this worker is running.
		self.__running = True

	def run(self):
		"""
		Process entry point: poll until a job arrives, then execute it
		and return (ending this worker process).
		"""
		# Register signal handlers.
		self.register_signal_handlers()

		# Create connection to the hub.
		self.transport = transport.PakfireHubTransport(self.config)

		while self.__running:
			# Try to get a new build job.
			job = self.get_new_build_job()
			if not job:
				continue

			# If we got a job, we are not waiting anymore.
			self.waiting.clear()

			# Run the job and return.
			return self.execute_job(job)

	def shutdown(self):
		"""
		Asks this worker to shut down; exits at once when idle.
		"""
		self.__running = False

		# When we are just waiting, we can exit right away.
		if self.waiting.is_set():
			log.debug("Exiting immediately")
			sys.exit(1)

		# XXX figure out what to do, when a build is running.

	# Signal handling.

	def register_signal_handlers(self):
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# Must be here so that SIGCHLD won't be propagated to
		# PakfireDaemon.
		pass

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		self.shutdown()

	def get_new_build_job(self, timeout=600):
		"""
		Long-polls the hub for a new build job.

		Returns a BuildJob, or None if the request timed out.
		"""
		log.debug("Requesting new job...")

		try:
			job = self.transport.get_json("/builders/jobs/queue",
				data={ "timeout" : timeout, }, timeout=timeout)

			if job:
				return BuildJob(job)

		# As this is a long poll request, it is okay to time out.
		except TransportMaxTriesExceededError:
			pass

	@staticmethod
	def _file_matches_hash(filename, hexdigest):
		"""
		Returns True if the SHA-512 digest of the given file matches
		the expected hexdigest.
		"""
		h = hashlib.new("sha512")

		# "with" guarantees the file is closed even if a read fails
		# (the old code leaked the handle on error).
		with open(filename, "rb") as f:
			while True:
				buf = f.read(BUFFER_SIZE)
				if not buf:
					break

				h.update(buf)

		return h.hexdigest() == hexdigest

	def execute_job(self, job):
		"""
		Downloads, builds and uploads the given build job, reporting
		all state changes and errors to the hub.
		"""
		log.debug("Executing job: %s" % job)

		# Call the function that processes the build and try to catch general
		# exceptions and report them to the server.
		# If everything goes okay, we tell this the server, too.
		try:
			# Create a temporary file and a directory for the resulting files.
			tmpdir = tempfile.mkdtemp()
			tmpfile = os.path.join(tmpdir, os.path.basename(job.source_url))
			logfile = os.path.join(tmpdir, "build.log")

			# Create pakfire configuration instance.
			config = pakfire.config.ConfigDaemon()
			config.parse(job.config)

			# Create pakfire instance.
			p = None
			try:
				p = pakfire.base.PakfireBuilder(config=config, arch=job.arch)

				# Download the source package.
				grabber = pakfire.downloader.PackageDownloader(p)
				grabber.urlgrab(job.source_url, filename=tmpfile)

				# Check if the download checksum matches (if provided).
				if job.source_hash_sha512:
					if not self._file_matches_hash(tmpfile, job.source_hash_sha512):
						raise DownloadError("Hash check did not succeed.")

				# Create a new instance of a build environment.
				build = pakfire.builder.BuildEnviron(p, tmpfile,
					release_build=True, build_id=job.id, logfile=logfile)

				try:
					# Create the build environment.
					build.start()

					# Update the build status on the server.
					self.upload_buildroot(job, build.installed_packages)
					self.update_state(job, "running")

					# Run the build (without install test).
					build.build(install_test=False)

					# Copy the created packages to the tempdir.
					build.copy_result(tmpdir)

				finally:
					# Cleanup the build environment.
					build.stop()

				# Jippie, build is finished, we are going to upload the files.
				self.update_state(job, "uploading")

				# Walk through the result directory and upload all (binary) files.
				# Skip that for test builds.
				if job.type != "test":
					for directory, subdirs, files in os.walk(tmpdir):
						for filename in files:
							path = os.path.join(directory, filename)
							# The log and source files are uploaded separately.
							if path in (logfile, tmpfile,):
								continue

							self.upload_file(job, path, "package")

			except DependencyError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state(job, "dependency_error", message)
				raise

			except DownloadError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state(job, "download_error", message)
				raise

			finally:
				if p:
					p.destroy()

				# Upload the logfile in any case and if it exists.
				if os.path.exists(logfile):
					self.upload_file(job, logfile, "log")

				# Cleanup the files we created.
				pakfire.util.rm(tmpdir)

		except DependencyError:
			# This has already been reported.
			raise

		except (DownloadError,):
			# Do not take any further action for these exceptions.
			pass

		except (KeyboardInterrupt, SystemExit):
			self.update_state(job, "aborted")

		except Exception as e:
			# Format the exception and send it to the server.
			message = "%s: %s" % (e.__class__.__name__, e)

			self.update_state(job, "failed", message)
			raise

		else:
			self.update_state(job, "finished")

	def update_state(self, job, state, message=None):
		"""
		Update state of the build job on the hub.
		"""
		data = {
			"message" : message or "",
		}

		self.transport.post("/builders/jobs/%s/state/%s" % (job.id, state),
			data=data)

	def upload_file(self, job, filename, type):
		"""
		Uploads the given file to the hub and attaches it to the job.
		"""
		assert os.path.exists(filename)
		assert type in ("package", "log")

		# First upload the file data and save the upload_id.
		upload_id = self.transport.upload_file(filename)

		data = {
			"type" : type,
		}

		# Add the file to the build.
		self.transport.post("/builders/jobs/%s/addfile/%s" % (job.id, upload_id),
			data=data)

	def upload_buildroot(self, job, installed_packages):
		"""
		Sends the list of packages installed in the build root to the hub.
		"""
		pkgs = [(pkg.friendly_name, pkg.uuid) for pkg in installed_packages]

		data = { "buildroot" : json.dumps(pkgs) }

		self.transport.post("/builders/jobs/%s/buildroot" % job.id, data=data)