]> git.ipfire.org Git - people/ms/pakfire.git/blob - python/pakfire/daemon.py
Fix typo.
[people/ms/pakfire.git] / python / pakfire / daemon.py
1 #!/usr/bin/python
2
3 import hashlib
4 import json
5 import multiprocessing
6 import os
7 import signal
8 import sys
9 import tempfile
10 import time
11
12 import pakfire.base
13 import pakfire.builder
14 import pakfire.config
15 import pakfire.downloader
16 import pakfire.system
17 import pakfire.util
18 from pakfire.system import system
19
20 import base
21 import transport
22
23 from pakfire.constants import *
24 from pakfire.i18n import _
25
26 import logging
27 log = logging.getLogger("pakfire.daemon")
28
class BuildJob(dict):
	"""
	Wrapper class for build jobs, that are received from the hub.

	This makes accessing attributes more easy.
	"""
	def __getattr__(self, key):
		# Expose dictionary items as attributes. A missing key must be
		# converted into AttributeError (not KeyError) so that
		# getattr()/hasattr() behave as expected.
		try:
			return self[key]
		except KeyError:
			# Call form works on Python 2 and 3, unlike the legacy
			# "raise AttributeError, key" statement syntax.
			raise AttributeError(key)
41
class PakfireDaemon(object):
	"""
	The main daemon process.

	Supervises one PakfireDaemonKeepalive process and a pool of
	PakfireWorker processes, restarting/spawning them as needed.
	"""
	def __init__(self, config):
		self.config = config

		# Indicates if this daemon is in running mode.
		self.__running = True

		# Create daemon that sends keep-alive messages.
		self.keepalive = PakfireDaemonKeepalive(self.config)

		# List of worker processes (includes the keepalive process).
		self.__workers = [self.keepalive]

		### Configuration
		# Number of workers in waiting state.
		self.max_waiting = 1

		# Number of running workers.
		self.max_running = system.cpu_count * 2

	def run(self, heartbeat=30):
		"""
		Main loop.

		Checks the keepalive process and the worker pool once per
		heartbeat interval (seconds) until shutdown() is called.
		"""
		# Register signal handlers.
		self.register_signal_handlers()

		# Start keepalive process.
		self.keepalive.start()

		# Run main loop.
		while self.__running:
			time_started = time.time()

			# Check if keepalive process is still alive.
			if not self.keepalive.is_alive():
				self.restart_keepalive(wait=10)

			# Spawn a sufficient number of worker processes.
			self.spawn_workers_if_needed()

			# Get runtime of this loop iteration.
			time_elapsed = time.time() - time_started

			# Wait if the heartbeat time has not been reached, yet.
			if time_elapsed < heartbeat:
				time.sleep(heartbeat - time_elapsed)

		# Main loop has ended, but we wait until all workers have finished.
		self.terminate_all_workers()

	def shutdown(self):
		"""
		Terminates all workers and exits the daemon.
		"""
		if not self.__running:
			return

		log.info(_("Shutting down..."))
		self.__running = False

	def restart_keepalive(self, wait=None):
		"""
		Replaces a dead (or hung) keepalive process with a fresh one.

		If wait is given, we wait up to that many seconds for the old
		process to terminate.
		"""
		log.critical(_("Restarting keepalive process"))

		# Send SIGTERM to really end the process.
		self.keepalive.terminate()

		# Wait for the process to terminate.
		if wait:
			self.keepalive.join(wait)

		# Remove the keepalive process from the process list.
		try:
			self.__workers.remove(self.keepalive)
		except ValueError:
			pass

		# Create a new process and start it.
		self.keepalive = PakfireDaemonKeepalive(self.config)
		self.keepalive.start()

		# Add the process to the process list.
		self.__workers.append(self.keepalive)

	def spawn_workers_if_needed(self, *args, **kwargs):
		"""
		Spawns more workers if needed.
		"""
		# Do not create any more processes when the daemon is shutting down.
		if not self.__running:
			return

		# Cleanup all other workers.
		self.cleanup_workers()

		# Do not create more workers if there are already enough workers
		# active.
		if len(self.workers) >= self.max_running:
			log.warning("More workers running than allowed")
			return

		# Do nothing, if there are already enough workers waiting.
		wanted_waiting_workers = self.max_waiting - len(self.waiting_workers)
		if wanted_waiting_workers <= 0:
			return

		# Spawn a new worker.
		for i in range(wanted_waiting_workers):
			self.spawn_worker(*args, **kwargs)

	def spawn_worker(self, *args, **kwargs):
		"""
		Spawns a new worker process.
		"""
		worker = PakfireWorker(config=self.config, *args, **kwargs)
		worker.start()

		log.debug("Spawned new worker process: %s" % worker)
		self.__workers.append(worker)

	def terminate_worker(self, worker):
		"""
		Terminates the given worker.
		"""
		log.warning(_("Terminating worker process: %s") % worker)

		worker.terminate()

	def terminate_all_workers(self):
		"""
		Terminates all workers.
		"""
		# First send SIGTERM to all processes.
		self.terminate_worker(self.keepalive)
		for worker in self.workers:
			self.terminate_worker(worker)

		# Then wait until they all have finished.
		self.keepalive.join()
		for worker in self.workers:
			worker.join()

	def remove_worker(self, worker):
		"""
		Removes a worker from the internal list of worker processes.
		"""
		assert not worker.is_alive(), "Remove alive worker?"

		log.debug("Removing worker: %s" % worker)
		try:
			self.__workers.remove(worker)
		# Only swallow "not in list" — a bare except here would also
		# hide real errors (and even KeyboardInterrupt).
		except ValueError:
			pass

	def cleanup_workers(self):
		"""
		Remove workers that are not alive any more.
		"""
		for worker in self.workers:
			if worker.is_alive():
				continue

			self.remove_worker(worker)

	@property
	def workers(self):
		# Everything in the internal list except the keepalive process.
		return [w for w in self.__workers if isinstance(w, PakfireWorker)]

	@property
	def running_workers(self):
		# Workers whose waiting event is cleared (i.e. busy with a build).
		workers = []

		for worker in self.workers:
			if worker.waiting.is_set():
				continue

			workers.append(worker)

		return workers

	@property
	def waiting_workers(self):
		# Workers whose waiting event is set (i.e. idle, polling for jobs).
		workers = []

		for worker in self.workers:
			if worker.waiting.is_set():
				workers.append(worker)

		return workers

	# Signal handling.

	def register_signal_handlers(self):
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# Spawn new workers if necessary.
		self.spawn_workers_if_needed()

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		# Just shut down.
		self.shutdown()
252
253
class PakfireDaemonKeepalive(multiprocessing.Process):
	"""
	Child process that periodically reports this builder to the hub.

	On start it sends the static builder profile once, then posts a
	keepalive message (load, memory, swap, disk space) every heartbeat.
	"""
	def __init__(self, config):
		multiprocessing.Process.__init__(self)

		# Keep a reference to the daemon configuration.
		self.config = config

	def run(self, heartbeat=30):
		# Install our own signal handlers in this child process.
		self.register_signal_handlers()

		# Open a connection to the hub.
		self.transport = transport.PakfireHubTransport(self.config)

		# Introduce this builder to the hub once.
		self.send_builder_info()

		while True:
			loop_started = time.time()

			# Tell the hub that we are still alive.
			self.send_keepalive()

			# Sleep for whatever is left of the heartbeat interval.
			remaining = heartbeat - (time.time() - loop_started)
			if remaining > 0:
				time.sleep(remaining)

	def shutdown(self):
		"""
		Ends this process immediately.
		"""
		sys.exit(1)

	# Signal handling.

	def register_signal_handlers(self):
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# Must be here so that SIGCHLD won't be propagated to
		# PakfireDaemon.
		pass

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		# Just shut down.
		self.shutdown()

	# Talking to the hub.

	def send_builder_info(self):
		"""
		Posts the static builder profile (CPU, memory, versions,
		supported architectures) to the hub.
		"""
		log.info(_("Sending builder information to hub..."))

		data = {}

		# CPU info
		data["cpu_model"] = system.cpu_model
		data["cpu_count"] = system.cpu_count
		data["cpu_arch"] = system.native_arch
		data["cpu_bogomips"] = system.cpu_bogomips

		# Memory + swap
		data["mem_total"] = system.memory
		data["swap_total"] = system.swap_total

		# Pakfire + OS
		data["pakfire_version"] = PAKFIRE_VERSION
		data["host_key"] = self.config.get("signatures", "host_key", None)
		data["os_name"] = system.distro.pretty_name

		# Supported arches
		data["supported_arches"] = ",".join(system.supported_arches)

		self.transport.post("/builders/info", data=data)

	def send_keepalive(self):
		"""
		Posts the current load/memory/disk statistics to the hub.
		"""
		log.debug("Sending keepalive message to hub...")

		data = {}

		# Load average.
		data["loadavg1"] = system.loadavg1
		data["loadavg5"] = system.loadavg5
		data["loadavg15"] = system.loadavg15

		# Memory
		data["mem_total"] = system.memory_total
		data["mem_free"] = system.memory_free

		# Swap
		data["swap_total"] = system.swap_total
		data["swap_free"] = system.swap_free

		# Disk space
		data["space_free"] = self.free_space

		self.transport.post("/builders/keepalive", data=data)

	@property
	def free_space(self):
		# Space left on the filesystem that holds the build root.
		return system.get_mountpoint(BUILD_ROOT).space_left
365
366
class PakfireWorker(multiprocessing.Process):
	"""
	Child process that polls the hub for build jobs and executes them.

	While idle (long-polling for a job) the "waiting" event is set;
	it is cleared as soon as a job has been accepted.
	"""
	def __init__(self, config, waiting=None):
		multiprocessing.Process.__init__(self)

		# Save config.
		self.config = config

		# Waiting event. Clear if this worker is running a build.
		self.waiting = multiprocessing.Event()
		self.waiting.set()

		# Indicates if this worker is running.
		self.__running = True

	def run(self):
		# Register signal handlers.
		self.register_signal_handlers()

		# Create connection to the hub.
		self.transport = transport.PakfireHubTransport(self.config)

		while self.__running:
			# Try to get a new build job.
			job = self.get_new_build_job()
			if not job:
				continue

			# If we got a job, we are not waiting anymore.
			self.waiting.clear()

			# Run the job and return.
			return self.execute_job(job)

	def shutdown(self):
		self.__running = False

		# When we are just waiting, we can exit right away.
		if self.waiting.is_set():
			log.debug("Exiting immediately")
			sys.exit(1)

		# XXX figure out what to do, when a build is running.

	# Signal handling.

	def register_signal_handlers(self):
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# Must be here so that SIGCHLD won't be propagated to
		# PakfireDaemon.
		pass

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		self.shutdown()

	def get_new_build_job(self, timeout=600):
		"""
		Long-polls the hub for a new build job.

		Returns a BuildJob or None if the request timed out.
		"""
		log.debug("Requesting new job...")

		try:
			job = self.transport.get_json("/builders/jobs/queue",
				data={ "timeout" : timeout, }, timeout=timeout)

			if job:
				return BuildJob(job)

		# As this is a long poll request, it is okay to time out.
		except TransportMaxTriesExceededError:
			pass

	def execute_job(self, job):
		"""
		Downloads, builds and uploads the given build job, reporting
		the job state ("running", "uploading", "finished" or an error
		state) back to the hub at every step.
		"""
		log.debug("Executing job: %s" % job)

		# Call the function that processes the build and try to catch general
		# exceptions and report them to the server.
		# If everything goes okay, we tell this the server, too.
		try:
			# Create a temporary file and a directory for the resulting files.
			tmpdir = tempfile.mkdtemp()
			tmpfile = os.path.join(tmpdir, os.path.basename(job.source_url))
			logfile = os.path.join(tmpdir, "build.log")

			# Create pakfire configuration instance.
			config = pakfire.config.ConfigDaemon()
			config.parse(job.config)

			# Create pakfire instance.
			p = None
			try:
				p = pakfire.base.PakfireBuilder(config=config, arch=job.arch)

				# Download the source package.
				grabber = pakfire.downloader.PackageDownloader(p)
				grabber.urlgrab(job.source_url, filename=tmpfile)

				# Check if the download checksum matches (if provided).
				if job.source_hash_sha512:
					h = hashlib.new("sha512")
					# "with" makes sure the file is closed even if
					# reading raises.
					with open(tmpfile, "rb") as f:
						while True:
							buf = f.read(BUFFER_SIZE)
							if not buf:
								break

							h.update(buf)

					if job.source_hash_sha512 != h.hexdigest():
						# Call form instead of the Python-2-only
						# "raise DownloadError, ..." statement.
						raise DownloadError("Hash check did not succeed.")

				# Create a new instance of a build environment.
				build = pakfire.builder.BuildEnviron(p, tmpfile,
					release_build=True, build_id=job.id, logfile=logfile)

				try:
					# Create the build environment.
					build.start()

					# Update the build status on the server.
					self.upload_buildroot(job, build.installed_packages)
					self.update_state(job, "running")

					# Run the build (without install test).
					build.build(install_test=False)

					# Copy the created packages to the tempdir.
					build.copy_result(tmpdir)

				finally:
					# Cleanup the build environment.
					build.stop()

				# Jippie, build is finished, we are going to upload the files.
				self.update_state(job, "uploading")

				# Walk through the result directory and upload all (binary) files.
				# Skip that for test builds.
				if not job.type == "test":
					for dirpath, subdirs, filenames in os.walk(tmpdir):
						for filename in filenames:
							path = os.path.join(dirpath, filename)
							# The log and the source file are handled
							# separately and are never packages.
							if path in (logfile, tmpfile,):
								continue

							self.upload_file(job, path, "package")

			except DependencyError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state(job, "dependency_error", message)
				raise

			except DownloadError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state(job, "download_error", message)
				raise

			finally:
				if p:
					p.destroy()

				# Upload the logfile in any case and if it exists.
				if os.path.exists(logfile):
					self.upload_file(job, logfile, "log")

				# Cleanup the files we created.
				pakfire.util.rm(tmpdir)

		except DependencyError:
			# This has already been reported.
			raise

		except (DownloadError,):
			# Do not take any further action for these exceptions.
			pass

		except (KeyboardInterrupt, SystemExit):
			self.update_state(job, "aborted")

		except Exception as e:
			# Format the exception and send it to the server.
			message = "%s: %s" % (e.__class__.__name__, e)

			self.update_state(job, "failed", message)
			raise

		else:
			self.update_state(job, "finished")

	def update_state(self, job, state, message=None):
		"""
		Update state of the build job on the hub.
		"""
		data = {
			"message" : message or "",
		}

		self.transport.post("/builders/jobs/%s/state/%s" % (job.id, state),
			data=data)

	def upload_file(self, job, filename, type):
		"""
		Uploads a single file (a package or the build log) and attaches
		it to the build job on the hub.
		"""
		assert os.path.exists(filename)
		assert type in ("package", "log")

		# First upload the file data and save the upload_id.
		upload_id = self.transport.upload_file(filename)

		data = {
			"type" : type,
		}

		# Add the file to the build.
		self.transport.post("/builders/jobs/%s/addfile/%s" % (job.id, upload_id),
			data=data)

	def upload_buildroot(self, job, installed_packages):
		"""
		Reports the list of packages installed in the build root
		(name + uuid pairs) to the hub.
		"""
		pkgs = []
		for pkg in installed_packages:
			pkgs.append((pkg.friendly_name, pkg.uuid))

		data = { "buildroot" : json.dumps(pkgs) }

		self.transport.post("/builders/jobs/%s/buildroot" % job.id, data=data)