]> git.ipfire.org Git - people/stevee/pakfire.git/blob - python/pakfire/daemon.py
daemon: Check if build root is read-write mounted.
[people/stevee/pakfire.git] / python / pakfire / daemon.py
1 #!/usr/bin/python
2
3 import hashlib
4 import json
5 import multiprocessing
6 import os
7 import signal
8 import sys
9 import tempfile
10 import time
11
12 import pakfire.base
13 import pakfire.builder
14 import pakfire.config
15 import pakfire.downloader
16 import pakfire.system
17 import pakfire.util
18 from pakfire.system import system
19
20 import base
21 import transport
22
23 from pakfire.constants import *
24 from pakfire.i18n import _
25
26 import logging
27 log = logging.getLogger("pakfire.daemon")
28
class BuildJob(dict):
	"""
	Wrapper class for build jobs, that are received from the hub.

	This makes accessing attributes more easy: job["arch"] can be
	written as job.arch. Missing keys raise AttributeError (not
	KeyError), which keeps getattr()/hasattr() working as expected.
	"""
	def __getattr__(self, key):
		try:
			return self[key]
		except KeyError:
			# Use call syntax instead of the Python-2-only
			# "raise AttributeError, key" statement form, which is
			# a SyntaxError on Python 3.
			raise AttributeError(key)
41
class PakfireDaemon(object):
	"""
	The pakfire build daemon.

	Supervises one PakfireDaemonKeepalive child process and a pool of
	PakfireWorker processes, respawning workers as needed until the
	daemon is shut down.
	"""
	def __init__(self, config):
		self.config = config

		# Indicates if this daemon is in running mode.
		self.__running = True

		# Create daemon that sends keep-alive messages.
		self.keepalive = PakfireDaemonKeepalive(self.config)

		# List of worker processes (includes the keepalive process).
		self.__workers = [self.keepalive]

		### Configuration
		# Number of workers in waiting state.
		self.max_waiting = 1

		# Number of running workers.
		self.max_running = system.cpu_count * 2

	def run(self, heartbeat=30):
		"""
		Main loop.

		heartbeat: minimum number of seconds one loop iteration takes;
		the loop sleeps for the remainder of that interval.
		"""
		# Register signal handlers.
		self.register_signal_handlers()

		# Start keepalive process.
		self.keepalive.start()

		# Run main loop.
		while self.__running:
			time_started = time.time()

			# Check if keepalive process is still alive.
			if not self.keepalive.is_alive():
				self.restart_keepalive(wait=10)

			# Spawn a sufficient number of worker processes.
			self.spawn_workers_if_needed()

			# Get runtime of this loop iteration.
			time_elapsed = time.time() - time_started

			# Wait if the heartbeat time has not been reached, yet.
			if time_elapsed < heartbeat:
				time.sleep(heartbeat - time_elapsed)

		# Main loop has ended, but we wait until all workers have finished.
		self.terminate_all_workers()

	def shutdown(self):
		"""
		Terminates all workers and exits the daemon.
		"""
		if not self.__running:
			return

		log.info(_("Shutting down..."))
		self.__running = False

	def restart_keepalive(self, wait=None):
		"""
		Replaces a dead (or hung) keepalive process with a fresh one.

		wait: optional number of seconds to wait for the old process
		to terminate after SIGTERM.
		"""
		log.critical(_("Restarting keepalive process"))

		# Send SIGTERM to really end the process.
		self.keepalive.terminate()

		# Wait for the process to terminate.
		if wait:
			self.keepalive.join(wait)

		# Remove the keepalive process from the process list.
		try:
			self.__workers.remove(self.keepalive)
		except ValueError:
			pass

		# Create a new process and start it.
		self.keepalive = PakfireDaemonKeepalive(self.config)
		self.keepalive.start()

		# Add the process to the process list.
		self.__workers.append(self.keepalive)

	def spawn_workers_if_needed(self, *args, **kwargs):
		"""
		Spawns more workers if needed.
		"""
		# Do not create any more processes when the daemon is shutting down.
		if not self.__running:
			return

		# Cleanup all other workers.
		self.cleanup_workers()

		# Do not create more workers if there are already enough workers
		# active.
		if len(self.workers) >= self.max_running:
			log.warning("More workers running than allowed")
			return

		# Do nothing, if there is are already enough workers waiting.
		wanted_waiting_workers = self.max_waiting - len(self.waiting_workers)
		if wanted_waiting_workers <= 0:
			return

		# Spawn a new worker.
		for i in range(wanted_waiting_workers):
			self.spawn_worker(*args, **kwargs)

	def spawn_worker(self, *args, **kwargs):
		"""
		Spawns a new worker process.
		"""
		worker = PakfireWorker(config=self.config, *args, **kwargs)
		worker.start()

		log.debug("Spawned new worker process: %s" % worker)
		self.__workers.append(worker)

	def terminate_worker(self, worker):
		"""
		Terminates the given worker (sends SIGTERM, does not wait).
		"""
		log.warning(_("Terminating worker process: %s") % worker)

		worker.terminate()

	def terminate_all_workers(self):
		"""
		Terminates all workers.
		"""
		# First send SIGTERM to all processes.
		self.terminate_worker(self.keepalive)
		for worker in self.workers:
			self.terminate_worker(worker)

		# Then wait until they all have finished.
		self.keepalive.join()
		for worker in self.workers:
			worker.join()

	def remove_worker(self, worker):
		"""
		Removes a worker from the internal list of worker processes.
		"""
		assert not worker.is_alive(), "Remove alive worker?"

		log.debug("Removing worker: %s" % worker)
		try:
			self.__workers.remove(worker)
		except ValueError:
			# Worker has already been removed (e.g. by a concurrent
			# SIGCHLD handler run). Only ValueError is expected here;
			# a bare except would also swallow KeyboardInterrupt and
			# SystemExit.
			pass

	def cleanup_workers(self):
		"""
		Remove workers that are not alive any more.
		"""
		for worker in self.workers:
			if worker.is_alive():
				continue

			self.remove_worker(worker)

	@property
	def workers(self):
		# Snapshot of all real workers (excludes the keepalive process).
		return [w for w in self.__workers if isinstance(w, PakfireWorker)]

	@property
	def running_workers(self):
		# Workers that are currently executing a build job.
		workers = []

		for worker in self.workers:
			if worker.waiting.is_set():
				continue

			workers.append(worker)

		return workers

	@property
	def waiting_workers(self):
		# Workers that are idle and waiting for a job.
		workers = []

		for worker in self.workers:
			if worker.waiting.is_set():
				workers.append(worker)

		return workers

	# Signal handling.

	def register_signal_handlers(self):
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD (a child process has exited).
		"""
		# Spawn new workers if necessary.
		self.spawn_workers_if_needed()

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		# Just shut down.
		self.shutdown()
252
253
class PakfireDaemonKeepalive(multiprocessing.Process):
	"""
	Child process that announces this builder to the hub and then
	periodically sends keepalive messages with current load, memory,
	swap and disk-space information.
	"""
	def __init__(self, config):
		multiprocessing.Process.__init__(self)

		# Save config.
		self.config = config

	def run(self, heartbeat=30):
		"""
		Main loop of the keepalive process.

		heartbeat: minimum number of seconds one loop iteration takes;
		the loop sleeps for the remainder of that interval. The loop
		runs forever; the process is ended by SIGTERM (-> shutdown).
		"""
		# Register signal handlers.
		self.register_signal_handlers()

		# Create connection to the hub.
		self.transport = transport.PakfireHubTransport(self.config)

		# Send our profile to the hub.
		self.send_builder_info()

		while True:
			time_started = time.time()

			# Send keepalive message.
			self.send_keepalive()

			# Get runtime of this loop iteration.
			time_elapsed = time.time() - time_started

			# Wait if the heartbeat time has not been reached, yet.
			if time_elapsed < heartbeat:
				time.sleep(heartbeat - time_elapsed)

	def shutdown(self):
		"""
		Ends this process immediately.
		"""
		sys.exit(1)

	# Signal handling.

	def register_signal_handlers(self):
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# Must be here so that SIGCHLD won't be propagated to
		# PakfireDaemon.
		pass

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		# Just shut down.
		self.shutdown()

	# Talking to the hub.

	def send_builder_info(self):
		"""
		Sends the (mostly static) hardware and software profile of
		this builder to the hub. Called once at startup.
		"""
		log.info(_("Sending builder information to hub..."))

		data = {
			# CPU info
			"cpu_model"       : system.cpu_model,
			"cpu_count"       : system.cpu_count,
			"cpu_arch"        : system.native_arch,
			"cpu_bogomips"    : system.cpu_bogomips,

			# Memory + swap
			"mem_total"       : system.memory,
			"swap_total"      : system.swap_total,

			# Pakfire + OS
			"pakfire_version" : PAKFIRE_VERSION,
			"host_key"        : self.config.get("signatures", "host_key", None),
			"os_name"         : system.distro.pretty_name,

			# Supported arches
			"supported_arches" : ",".join(system.supported_arches),
		}
		self.transport.post("/builders/info", data=data)

	def send_keepalive(self):
		"""
		Sends a keepalive message with the current system state
		(load, memory, swap, free disk space) to the hub.
		"""
		log.debug("Sending keepalive message to hub...")

		data = {
			# Load average.
			"loadavg1"   : system.loadavg1,
			"loadavg5"   : system.loadavg5,
			"loadavg15"  : system.loadavg15,

			# Memory
			"mem_total"  : system.memory_total,
			"mem_free"   : system.memory_free,

			# Swap
			"swap_total" : system.swap_total,
			"swap_free"  : system.swap_free,

			# Disk space
			"space_free" : self.free_space,
		}
		self.transport.post("/builders/keepalive", data=data)

	@property
	def free_space(self):
		# Free space on the filesystem that holds the build root.
		# NOTE(review): assumes system.get_mountpoint() returns an
		# object with a space_left attribute — defined elsewhere.
		mp = system.get_mountpoint(BUILD_ROOT)

		return mp.space_left
365
366
class PakfireWorker(multiprocessing.Process):
	"""
	Worker process that polls the hub for a build job, executes it in
	a build environment and uploads the results.
	"""
	def __init__(self, config, waiting=None):
		multiprocessing.Process.__init__(self)

		# Save config.
		self.config = config

		# Waiting event. Clear if this worker is running a build.
		self.waiting = multiprocessing.Event()
		self.waiting.set()

		# Indicates if this worker is running.
		self.__running = True

	def run(self):
		"""
		Main loop: wait until a build job arrives, then execute it and
		exit (the daemon spawns a fresh worker per job).
		"""
		# Register signal handlers.
		self.register_signal_handlers()

		# Create connection to the hub.
		self.transport = transport.PakfireHubTransport(self.config)

		while self.__running:
			# Check if the build root is fine (read-write mounted).
			if not self.check_buildroot():
				time.sleep(60)
				continue

			# Try to get a new build job.
			job = self.get_new_build_job()
			if not job:
				continue

			# If we got a job, we are not waiting anymore.
			self.waiting.clear()

			# Run the job and return.
			return self.execute_job(job)

	def shutdown(self):
		self.__running = False

		# When we are just waiting, we can exit right away.
		if self.waiting.is_set():
			log.debug("Exiting immediately")
			sys.exit(1)

		# XXX figure out what to do, when a build is running.

	# Signal handling.

	def register_signal_handlers(self):
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# Must be here so that SIGCHLD won't be propagated to
		# PakfireDaemon.
		pass

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		self.shutdown()

	def check_buildroot(self):
		"""
		Checks if the buildroot is fine.

		Returns True when the build root is (or could be remounted)
		read-write, False when it is read-only and remounting failed.
		"""
		mp = system.get_mountpoint(BUILD_ROOT)

		# Check if the mountpoint is read-only.
		if mp.is_readonly():
			log.warning("Build directory is read-only: %s" % BUILD_ROOT)

			# Trying to remount.
			try:
				mp.remount("rw")
			except Exception:
				log.warning("Remounting (rw) %s has failed" % BUILD_ROOT)
				return False
			else:
				log.warning("Successfully remounted as rw: %s" % BUILD_ROOT)

		# The build root is usable. The original fell off the end here
		# and implicitly returned None, which made the run() loop treat
		# a healthy build root as broken and sleep forever.
		return True

	def get_new_build_job(self, timeout=600):
		"""
		Long-polls the hub for a new build job.

		Returns a BuildJob, or None when the poll timed out or no job
		was available.
		"""
		log.debug("Requesting new job...")
		try:
			job = self.transport.get_json("/builders/jobs/queue",
				data={ "timeout" : timeout, }, timeout=timeout)

			if job:
				return BuildJob(job)

		# As this is a long poll request, it is okay to time out.
		except TransportMaxTriesExceededError:
			pass

	def execute_job(self, job):
		"""
		Downloads the source package of the given job, builds it and
		uploads the resulting packages and the log file. Reports every
		state transition (and any failure) to the hub.
		"""
		log.debug("Executing job: %s" % job)

		# Call the function that processes the build and try to catch general
		# exceptions and report them to the server.
		# If everything goes okay, we tell this the server, too.
		try:
			# Create a temporary file and a directory for the resulting files.
			tmpdir = tempfile.mkdtemp()
			tmpfile = os.path.join(tmpdir, os.path.basename(job.source_url))
			logfile = os.path.join(tmpdir, "build.log")

			# Create pakfire configuration instance.
			config = pakfire.config.ConfigDaemon()
			config.parse(job.config)

			# Create pakfire instance.
			p = None
			try:
				p = pakfire.base.PakfireBuilder(config=config, arch=job.arch)

				# Download the source package.
				grabber = pakfire.downloader.PackageDownloader(p)
				grabber.urlgrab(job.source_url, filename=tmpfile)

				# Check if the download checksum matches (if provided).
				if job.source_hash_sha512:
					h = hashlib.new("sha512")
					# Use a context manager so the file is closed even
					# if reading or hashing raises.
					with open(tmpfile, "rb") as f:
						while True:
							buf = f.read(BUFFER_SIZE)
							if not buf:
								break

							h.update(buf)

					if not job.source_hash_sha512 == h.hexdigest():
						raise DownloadError("Hash check did not succeed.")

				# Create a new instance of a build environment.
				build = pakfire.builder.BuildEnviron(p, tmpfile,
					release_build=True, build_id=job.id, logfile=logfile)

				try:
					# Create the build environment.
					build.start()

					# Update the build status on the server.
					self.upload_buildroot(job, build.installed_packages)
					self.update_state(job, "running")

					# Run the build (without install test).
					build.build(install_test=False)

					# Copy the created packages to the tempdir.
					build.copy_result(tmpdir)

				finally:
					# Cleanup the build environment.
					build.stop()

				# Jippie, build is finished, we are going to upload the files.
				self.update_state(job, "uploading")

				# Walk through the result directory and upload all (binary) files.
				# Skip that for test builds.
				if not job.type == "test":
					for dir, subdirs, files in os.walk(tmpdir):
						for file in files:
							file = os.path.join(dir, file)
							if file in (logfile, tmpfile,):
								continue

							self.upload_file(job, file, "package")

			except DependencyError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state(job, "dependency_error", message)
				raise

			except DownloadError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state(job, "download_error", message)
				raise

			finally:
				if p:
					p.destroy()

				# Upload the logfile in any case and if it exists.
				if os.path.exists(logfile):
					self.upload_file(job, logfile, "log")

				# Cleanup the files we created.
				pakfire.util.rm(tmpdir)

		except DependencyError:
			# This has already been reported.
			raise

		except (DownloadError,):
			# Do not take any further action for these exceptions.
			pass

		except (KeyboardInterrupt, SystemExit):
			self.update_state(job, "aborted")

		except Exception as e:
			# Format the exception and send it to the server.
			message = "%s: %s" % (e.__class__.__name__, e)

			self.update_state(job, "failed", message)
			raise

		else:
			self.update_state(job, "finished")

	def update_state(self, job, state, message=None):
		"""
		Update state of the build job on the hub.
		"""
		data = {
			"message" : message or "",
		}

		self.transport.post("/builders/jobs/%s/state/%s" % (job.id, state),
			data=data)

	def upload_file(self, job, filename, type):
		"""
		Uploads a single file (a built package or the build log) and
		attaches it to the given job on the hub.
		"""
		assert os.path.exists(filename)
		assert type in ("package", "log")

		# First upload the file data and save the upload_id.
		upload_id = self.transport.upload_file(filename)

		data = {
			"type" : type,
		}

		# Add the file to the build.
		self.transport.post("/builders/jobs/%s/addfile/%s" % (job.id, upload_id),
			data=data)

	def upload_buildroot(self, job, installed_packages):
		"""
		Sends the list of packages installed in the build root
		(name + uuid pairs) to the hub.
		"""
		pkgs = []
		for pkg in installed_packages:
			pkgs.append((pkg.friendly_name, pkg.uuid))

		data = { "buildroot" : json.dumps(pkgs) }

		self.transport.post("/builders/jobs/%s/buildroot" % job.id, data=data)