]> git.ipfire.org Git - people/stevee/pakfire.git/blob - src/pakfire/daemon.py
Use autotools.
[people/stevee/pakfire.git] / src / pakfire / daemon.py
1 #!/usr/bin/python
2
3 import hashlib
4 import json
5 import multiprocessing
6 import os
7 import signal
8 import sys
9 import tempfile
10 import time
11
12 import pakfire.base
13 import pakfire.builder
14 import pakfire.config
15 import pakfire.downloader
16 import pakfire.system
17 import pakfire.util
18 from pakfire.system import system
19
20 import base
21 import transport
22
23 from pakfire.constants import *
24 from pakfire.i18n import _
25
26 import logging
27 log = logging.getLogger("pakfire.daemon")
28
class BuildJob(dict):
	"""
	Wrapper class for build jobs, that are received from the hub.

	This makes accessing attributes more easy.
	"""
	def __getattr__(self, key):
		"""
		Expose dictionary keys as attributes.

		Re-raises a missing key as AttributeError (instead of KeyError)
		so that getattr()/hasattr() behave as expected on this object.
		"""
		try:
			return self[key]
		except KeyError:
			# Use call syntax instead of the legacy "raise E, arg"
			# statement, which is a syntax error under Python 3.
			raise AttributeError(key)
41
class PakfireDaemon(object):
	"""
	Main daemon process.

	Supervises one PakfireDaemonKeepalive child and a pool of
	PakfireWorker children; respawns workers as needed and restarts
	the keepalive process if it dies.
	"""
	def __init__(self, config):
		self.config = config

		# Indicates if this daemon is in running mode.
		self.__running = True

		# Create daemon that sends keep-alive messages.
		self.keepalive = PakfireDaemonKeepalive(self.config)

		# List of worker processes (the keepalive process is kept in
		# the same list, but filtered out by the workers property).
		self.__workers = [self.keepalive]

		### Configuration
		# Number of workers in waiting state.
		self.max_waiting = 1

		# Number of running workers.
		self.max_running = system.cpu_count * 2

	def run(self, heartbeat=30):
		"""
		Main loop.

		heartbeat is the minimum number of seconds per iteration.
		"""
		# Register signal handlers.
		self.register_signal_handlers()

		# Start keepalive process.
		self.keepalive.start()

		# Run main loop.
		while self.__running:
			time_started = time.time()

			# Check if keepalive process is still alive.
			if not self.keepalive.is_alive():
				self.restart_keepalive(wait=10)

			# Spawn a sufficient number of worker processes.
			self.spawn_workers_if_needed()

			# Get runtime of this loop iteration.
			time_elapsed = time.time() - time_started

			# Wait if the heartbeat time has not been reached, yet.
			if time_elapsed < heartbeat:
				time.sleep(heartbeat - time_elapsed)

		# Main loop has ended, but we wait until all workers have finished.
		self.terminate_all_workers()

	def shutdown(self):
		"""
		Terminates all workers and exits the daemon.
		"""
		if not self.__running:
			return

		log.info(_("Shutting down..."))
		self.__running = False

	def restart_keepalive(self, wait=None):
		"""
		Replaces a dead (or hanging) keepalive process by a fresh one.

		wait is the maximum number of seconds to wait for the old
		process to terminate.
		"""
		log.critical(_("Restarting keepalive process"))

		# Send SIGTERM to really end the process.
		self.keepalive.terminate()

		# Wait for the process to terminate.
		if wait:
			self.keepalive.join(wait)

		# Remove the keepalive process from the process list.
		try:
			self.__workers.remove(self.keepalive)
		except ValueError:
			pass

		# Create a new process and start it.
		self.keepalive = PakfireDaemonKeepalive(self.config)
		self.keepalive.start()

		# Add the process to the process list.
		self.__workers.append(self.keepalive)

	def spawn_workers_if_needed(self, *args, **kwargs):
		"""
		Spawns more workers if needed.
		"""
		# Do not create any more processes when the daemon is shutting down.
		if not self.__running:
			return

		# Cleanup all other workers.
		self.cleanup_workers()

		# Do not create more workers if there are already enough workers
		# active.
		if len(self.workers) >= self.max_running:
			log.warning("More workers running than allowed")
			return

		# Do nothing, if there are already enough workers waiting.
		wanted_waiting_workers = self.max_waiting - len(self.waiting_workers)
		if wanted_waiting_workers <= 0:
			return

		# Spawn a new worker.
		for i in range(wanted_waiting_workers):
			self.spawn_worker(*args, **kwargs)

	def spawn_worker(self, *args, **kwargs):
		"""
		Spawns a new worker process.
		"""
		worker = PakfireWorker(config=self.config, *args, **kwargs)
		worker.start()

		log.debug("Spawned new worker process: %s" % worker)
		self.__workers.append(worker)

	def terminate_worker(self, worker):
		"""
		Terminates the given worker.
		"""
		log.warning(_("Terminating worker process: %s") % worker)

		worker.terminate()

	def terminate_all_workers(self):
		"""
		Terminates all workers.
		"""
		# First send SIGTERM to all processes.
		self.terminate_worker(self.keepalive)
		for worker in self.workers:
			self.terminate_worker(worker)

		# Then wait until they all have finished.
		self.keepalive.join()
		for worker in self.workers:
			worker.join()

	def remove_worker(self, worker):
		"""
		Removes a worker from the internal list of worker processes.
		"""
		assert not worker.is_alive(), "Remove alive worker?"

		log.debug("Removing worker: %s" % worker)
		try:
			self.__workers.remove(worker)
		except ValueError:
			# Only ignore "not in list"; a bare except here used to
			# swallow every error including SystemExit.
			pass

	def cleanup_workers(self):
		"""
		Remove workers that are not alive any more.
		"""
		for worker in self.workers:
			if worker.is_alive():
				continue

			self.remove_worker(worker)

	@property
	def workers(self):
		# The keepalive process lives in __workers too; only return
		# actual build workers.
		return [w for w in self.__workers if isinstance(w, PakfireWorker)]

	@property
	def running_workers(self):
		# Workers whose "waiting" event is cleared are executing a job.
		workers = []

		for worker in self.workers:
			if worker.waiting.is_set():
				continue

			workers.append(worker)

		return workers

	@property
	def waiting_workers(self):
		# Workers whose "waiting" event is set are idle.
		workers = []

		for worker in self.workers:
			if worker.waiting.is_set():
				workers.append(worker)

		return workers

	# Signal handling.

	def register_signal_handlers(self):
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# Spawn new workers if necessary.
		self.spawn_workers_if_needed()

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		# Just shut down.
		self.shutdown()
252
253
class PakfireDaemonKeepalive(multiprocessing.Process):
	"""
	Child process that keeps the hub informed about this builder.

	On startup it submits the static builder profile once, then posts
	a keepalive message (load, memory, swap and disk statistics) once
	per heartbeat interval until it is terminated.
	"""
	def __init__(self, config):
		multiprocessing.Process.__init__(self)

		# Keep a reference to the daemon configuration.
		self.config = config

	def run(self, heartbeat=30):
		# Install our own signal handlers (independent of the parent).
		self.register_signal_handlers()

		# Open a connection to the hub.
		self.transport = transport.PakfireHubTransport(self.config)

		# The static builder profile is sent exactly once.
		self.send_builder_info()

		while True:
			started = time.time()

			# Tell the hub that we are still alive.
			self.send_keepalive()

			# Sleep for whatever is left of the heartbeat interval.
			remaining = heartbeat - (time.time() - started)
			if remaining > 0:
				time.sleep(remaining)

	def shutdown(self):
		"""
		Ends this process immediately.
		"""
		sys.exit(1)

	# Signal handling.

	def register_signal_handlers(self):
		handlers = (
			(signal.SIGCHLD, self.handle_SIGCHLD),
			(signal.SIGINT,  self.handle_SIGTERM),
			(signal.SIGTERM, self.handle_SIGTERM),
		)
		for signum, handler in handlers:
			signal.signal(signum, handler)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# Must be here so that SIGCHLD won't be propagated to
		# PakfireDaemon.
		pass

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		# Just shut down.
		self.shutdown()

	# Talking to the hub.

	def send_builder_info(self):
		"""
		Posts the static builder profile (hardware, pakfire version,
		host key, supported arches) to the hub.
		"""
		log.info(_("Sending builder information to hub..."))

		self.transport.post("/builders/info", data={
			# CPU info
			"cpu_model" : system.cpu_model,
			"cpu_count" : system.cpu_count,
			"cpu_arch" : system.native_arch,
			"cpu_bogomips" : system.cpu_bogomips,

			# Memory + swap
			"mem_total" : system.memory,
			"swap_total" : system.swap_total,

			# Pakfire + OS
			"pakfire_version" : PAKFIRE_VERSION,
			"host_key" : self.config.get("signatures", "host_key", None),
			"os_name" : system.distro.pretty_name,

			# Supported arches
			"supported_arches" : ",".join(system.supported_arches),
		})

	def send_keepalive(self):
		"""
		Posts the current load/memory/swap/disk statistics to the hub.
		"""
		log.debug("Sending keepalive message to hub...")

		self.transport.post("/builders/keepalive", data={
			# Load average.
			"loadavg1" : system.loadavg1,
			"loadavg5" : system.loadavg5,
			"loadavg15" : system.loadavg15,

			# Memory
			"mem_total" : system.memory_total,
			"mem_free" : system.memory_free,

			# Swap
			"swap_total" : system.swap_total,
			"swap_free" : system.swap_free,

			# Disk space
			"space_free" : self.free_space,
		})

	@property
	def free_space(self):
		# Space left on the filesystem that holds the build root.
		# NOTE(review): units depend on system.get_mountpoint() — confirm.
		return system.get_mountpoint(BUILD_ROOT).space_left
365
366
class PakfireWorker(multiprocessing.Process):
	"""
	Worker process that polls the hub for build jobs and executes them.

	A worker handles at most one job: run() returns after execute_job()
	has finished, and the parent daemon spawns replacements as needed.
	"""
	def __init__(self, config, waiting=None):
		multiprocessing.Process.__init__(self)

		# Save config.
		self.config = config

		# Waiting event. Clear if this worker is running a build.
		self.waiting = multiprocessing.Event()
		self.waiting.set()

		# Indicates if this worker is running.
		self.__running = True

	def run(self):
		# Register signal handlers.
		self.register_signal_handlers()

		# Create connection to the hub.
		self.transport = transport.PakfireHubTransport(self.config)

		while self.__running:
			# Check if the build root is fine.
			if not self.check_buildroot():
				time.sleep(60)
				continue

			# Try to get a new build job.
			job = self.get_new_build_job()
			if not job:
				continue

			# If we got a job, we are not waiting anymore.
			self.waiting.clear()

			# Run the job and return.
			return self.execute_job(job)

	def shutdown(self):
		self.__running = False

		# When we are just waiting, we can exit right away.
		if self.waiting.is_set():
			log.debug("Exiting immediately")
			sys.exit(1)

		# XXX figure out what to do, when a build is running.

	# Signal handling.

	def register_signal_handlers(self):
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# Must be here so that SIGCHLD won't be propagated to
		# PakfireDaemon.
		pass

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		self.shutdown()

	def check_buildroot(self):
		"""
		Checks if the buildroot is fine.

		Returns True when BUILD_ROOT is writable (possibly after a
		successful remount), False otherwise.
		"""
		mp = system.get_mountpoint(BUILD_ROOT)

		# Check if the mountpoint is read-only.
		if mp.is_readonly():
			log.warning("Build directory is read-only: %s" % BUILD_ROOT)

			# Trying to remount. Catch any remount failure, but do not
			# swallow SystemExit/KeyboardInterrupt like the previous
			# bare except did.
			try:
				mp.remount("rw")
			except Exception:
				log.warning("Remounting (rw) %s has failed" % BUILD_ROOT)
				return False
			else:
				log.warning("Successfully remounted as rw: %s" % BUILD_ROOT)

		# Everything looks fine.
		return True

	def get_new_build_job(self, timeout=600):
		"""
		Long-polls the hub for a new build job.

		Returns a BuildJob instance, or None when the request timed out
		or no job was available.
		"""
		log.debug("Requesting new job...")
		try:
			job = self.transport.get_json("/builders/jobs/queue",
				data={ "timeout" : timeout, }, timeout=timeout)

			if job:
				return BuildJob(job)

		# As this is a long poll request, it is okay to time out.
		except TransportMaxTriesExceededError:
			pass

	def _verify_source_hash(self, job, tmpfile):
		"""
		Verifies the SHA512 checksum of the downloaded source file.

		Raises DownloadError when the checksum does not match.
		"""
		h = hashlib.new("sha512")

		# Read the file in chunks; "with" guarantees the handle is
		# closed even if read() raises (the old code leaked it then).
		with open(tmpfile, "rb") as f:
			while True:
				buf = f.read(BUFFER_SIZE)
				if not buf:
					break

				h.update(buf)

		if not job.source_hash_sha512 == h.hexdigest():
			raise DownloadError("Hash check did not succeed.")

	def execute_job(self, job):
		"""
		Downloads, builds and uploads a single build job, reporting
		every state transition (and any failure) to the hub.
		"""
		log.debug("Executing job: %s" % job)

		# Call the function that processes the build and try to catch general
		# exceptions and report them to the server.
		# If everything goes okay, we tell this the server, too.
		try:
			# Create a temporary file and a directory for the resulting files.
			tmpdir = tempfile.mkdtemp()
			tmpfile = os.path.join(tmpdir, os.path.basename(job.source_url))
			logfile = os.path.join(tmpdir, "build.log")

			# Create pakfire configuration instance.
			config = pakfire.config.ConfigDaemon()
			config.parse(job.config)

			# Create pakfire instance.
			p = None
			try:
				p = pakfire.base.PakfireBuilder(config=config, arch=job.arch)

				# Download the source package.
				grabber = pakfire.downloader.PackageDownloader(p)
				grabber.urlgrab(job.source_url, filename=tmpfile)

				# Check if the download checksum matches (if provided).
				if job.source_hash_sha512:
					self._verify_source_hash(job, tmpfile)

				# Create a new instance of a build environment.
				build = pakfire.builder.BuildEnviron(p, tmpfile,
					release_build=True, build_id=job.id, logfile=logfile)

				try:
					# Create the build environment.
					build.start()

					# Update the build status on the server.
					self.upload_buildroot(job, build.installed_packages)
					self.update_state(job, "running")

					# Run the build (without install test).
					build.build(install_test=False)

					# Copy the created packages to the tempdir.
					build.copy_result(tmpdir)

				finally:
					# Cleanup the build environment.
					build.stop()

				# Jippie, build is finished, we are going to upload the files.
				self.update_state(job, "uploading")

				# Walk through the result directory and upload all (binary) files.
				# Skip that for test builds.
				if not job.type == "test":
					for dir, subdirs, files in os.walk(tmpdir):
						for file in files:
							file = os.path.join(dir, file)
							if file in (logfile, tmpfile,):
								continue

							self.upload_file(job, file, "package")

			except DependencyError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state(job, "dependency_error", message)
				raise

			except DownloadError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state(job, "download_error", message)
				raise

			finally:
				if p:
					p.destroy()

				# Upload the logfile in any case and if it exists.
				if os.path.exists(logfile):
					self.upload_file(job, logfile, "log")

				# Cleanup the files we created.
				pakfire.util.rm(tmpdir)

		except DependencyError:
			# This has already been reported.
			raise

		except (DownloadError,):
			# Do not take any further action for these exceptions.
			pass

		except (KeyboardInterrupt, SystemExit):
			self.update_state(job, "aborted")

		except Exception as e:
			# Format the exception and send it to the server.
			message = "%s: %s" % (e.__class__.__name__, e)

			self.update_state(job, "failed", message)
			raise

		else:
			self.update_state(job, "finished")

	def update_state(self, job, state, message=None):
		"""
		Update state of the build job on the hub.
		"""
		data = {
			"message" : message or "",
		}

		self.transport.post("/builders/jobs/%s/state/%s" % (job.id, state),
			data=data)

	def upload_file(self, job, filename, type):
		"""
		Uploads a single file to the hub and attaches it to the job.

		type must be either "package" or "log".
		"""
		assert os.path.exists(filename)
		assert type in ("package", "log")

		# First upload the file data and save the upload_id.
		upload_id = self.transport.upload_file(filename)

		data = {
			"type" : type,
		}

		# Add the file to the build.
		self.transport.post("/builders/jobs/%s/addfile/%s" % (job.id, upload_id),
			data=data)

	def upload_buildroot(self, job, installed_packages):
		"""
		Sends the list of packages installed in the buildroot to the hub.
		"""
		pkgs = [(pkg.friendly_name, pkg.uuid) for pkg in installed_packages]

		data = { "buildroot" : json.dumps(pkgs) }

		self.transport.post("/builders/jobs/%s/buildroot" % job.id, data=data)