import hashlib
import json
import logging
import multiprocessing
import os
import signal
import sys
import tempfile
import time

import pakfire.base
import pakfire.builder
import pakfire.config
import pakfire.downloader
import pakfire.util
from pakfire.system import system

from pakfire.constants import *
from pakfire.i18n import _
# Module-level logger used by all classes in this file.
log = logging.getLogger("pakfire.daemon")
31 Wrapper class for build jobs, that are received from the hub.
This makes accessing attributes easier.
35 def __getattr__(self
, key
):
39 raise AttributeError, key
class PakfireDaemon(object):
	"""
		Master daemon process.

		Keeps a keepalive child process and a pool of build worker
		processes running.
	"""
	def __init__(self, config):
		# Configuration, also handed to every child process.
		self.config = config

		# Indicates if this daemon is in running mode.
		self.__running = True

		# Create daemon that sends keep-alive messages.
		self.keepalive = PakfireDaemonKeepalive(self.config)

		# List of worker processes (the keepalive process is kept in
		# here as well).
		self.__workers = [self.keepalive]

		# Number of workers in waiting state.
		# NOTE(review): assigned value missing from the mangled source;
		# one waiting worker matches upstream pakfire — confirm.
		self.max_waiting = 1

		# Number of running workers.
		self.max_running = system.cpu_count * 2
62 def run(self
, heartbeat
=30):
66 # Register signal handlers.
67 self
.register_signal_handlers()
69 # Start keepalive process.
70 self
.keepalive
.start()
74 time_started
= time
.time()
76 # Check if keepalive process is still alive.
77 if not self
.keepalive
.is_alive():
78 self
.restart_keepalive(wait
=10)
80 # Spawn a sufficient number of worker processes.
81 self
.spawn_workers_if_needed()
83 # Get runtime of this loop iteration.
84 time_elapsed
= time
.time() - time_started
86 # Wait if the heartbeat time has not been reached, yet.
87 if time_elapsed
< heartbeat
:
88 time
.sleep(heartbeat
- time_elapsed
)
90 # Main loop has ended, but we wait until all workers have finished.
91 self
.terminate_all_workers()
95 Terminates all workers and exists the daemon.
97 if not self
.__running
:
100 log
.info(_("Shutting down..."))
101 self
.__running
= False
103 def restart_keepalive(self
, wait
=None):
104 log
.critial(_("Restarting keepalive process"))
106 # Send SIGTERM to really end the process.
107 self
.keepalive
.terminate()
109 # Wait for the process to terminate.
111 self
.keepalive
.join(wait
)
113 # Remove the keepalive process from the process list.
115 self
.__workers
.remove(self
.keepalive
)
119 # Create a new process and start it.
120 self
.keepalive
= PakfireDaemonKeepalive(self
.config
)
121 self
.keepalive
.start()
123 # Add the process to the process list.
124 self
.__workers
.append(self
.keepalive
)
126 def spawn_workers_if_needed(self
, *args
, **kwargs
):
128 Spawns more workers if needed.
130 # Do not create any more processes when the daemon is shutting down.
131 if not self
.__running
:
134 # Cleanup all other workers.
135 self
.cleanup_workers()
137 # Do not create more workers if there are already enough workers
139 if len(self
.workers
) >= self
.max_running
:
140 log
.warning("More workers running than allowed")
143 # Do nothing, if there is are already enough workers waiting.
144 wanted_waiting_workers
= self
.max_waiting
- len(self
.waiting_workers
)
145 if wanted_waiting_workers
<= 0:
148 # Spawn a new worker.
149 for i
in range(wanted_waiting_workers
):
150 self
.spawn_worker(*args
, **kwargs
)
152 def spawn_worker(self
, *args
, **kwargs
):
154 Spawns a new worker process.
156 worker
= PakfireWorker(config
=self
.config
, *args
, **kwargs
)
159 log
.debug("Spawned new worker process: %s" % worker
)
160 self
.__workers
.append(worker
)
162 def terminate_worker(self
, worker
):
164 Terminates the given worker.
166 log
.warning(_("Terminating worker process: %s") % worker
)
170 def terminate_all_workers(self
):
172 Terminates all workers.
174 # First send SIGTERM to all processes.
175 self
.terminate_worker(self
.keepalive
)
176 for worker
in self
.workers
:
177 self
.terminate_worker(worker
)
179 # Then wait until they all have finished.
180 self
.keepalive
.join()
181 for worker
in self
.workers
:
184 def remove_worker(self
, worker
):
186 Removes a worker from the internal list of worker processes.
188 assert not worker
.is_alive(), "Remove alive worker?"
190 log
.debug("Removing worker: %s" % worker
)
192 self
.__workers
.remove(worker
)
196 def cleanup_workers(self
):
198 Remove workers that are not alive any more.
200 for worker
in self
.workers
:
201 if worker
.is_alive():
204 self
.remove_worker(worker
)
208 return [w
for w
in self
.__workers
if isinstance(w
, PakfireWorker
)]
211 def running_workers(self
):
214 for worker
in self
.workers
:
215 if worker
.waiting
.is_set():
218 workers
.append(worker
)
223 def waiting_workers(self
):
226 for worker
in self
.workers
:
227 if worker
.waiting
.is_set():
228 workers
.append(worker
)
234 def register_signal_handlers(self
):
235 signal
.signal(signal
.SIGCHLD
, self
.handle_SIGCHLD
)
236 signal
.signal(signal
.SIGINT
, self
.handle_SIGTERM
)
237 signal
.signal(signal
.SIGTERM
, self
.handle_SIGTERM
)
239 def handle_SIGCHLD(self
, signum
, frame
):
241 Handle signal SIGCHLD.
243 # Spawn new workers if necessary.
244 self
.spawn_workers_if_needed()
246 def handle_SIGTERM(self
, signum
, frame
):
248 Handle signal SIGTERM.
class PakfireDaemonKeepalive(multiprocessing.Process):
	"""
		Child process that periodically reports this builder's profile
		and vital statistics to the hub.
	"""
	def __init__(self, config):
		multiprocessing.Process.__init__(self)

		# NOTE(review): the assignment is missing from the mangled
		# source; self.config is read in run() and by the daemon.
		self.config = config
261 def run(self
, heartbeat
=30):
262 # Register signal handlers.
263 self
.register_signal_handlers()
265 # Create connection to the hub.
266 self
.transport
= transport
.PakfireHubTransport(self
.config
)
268 # Send our profile to the hub.
269 self
.send_builder_info()
272 time_started
= time
.time()
274 # Send keepalive message.
275 self
.send_keepalive()
277 # Get runtime of this loop iteration.
278 time_elapsed
= time
.time() - time_started
280 # Wait if the heartbeat time has not been reached, yet.
281 if time_elapsed
< heartbeat
:
282 time
.sleep(heartbeat
- time_elapsed
)
286 Ends this process immediately.
292 def register_signal_handlers(self
):
293 signal
.signal(signal
.SIGCHLD
, self
.handle_SIGCHLD
)
294 signal
.signal(signal
.SIGINT
, self
.handle_SIGTERM
)
295 signal
.signal(signal
.SIGTERM
, self
.handle_SIGTERM
)
297 def handle_SIGCHLD(self
, signum
, frame
):
299 Handle signal SIGCHLD.
301 # Must be here so that SIGCHLD won't be propagated to
305 def handle_SIGTERM(self
, signum
, frame
):
307 Handle signal SIGTERM.
312 # Talking to the hub.
314 def send_builder_info(self
):
315 log
.info(_("Sending builder information to hub..."))
319 "cpu_model" : system
.cpu_model
,
320 "cpu_count" : system
.cpu_count
,
321 "cpu_arch" : system
.native_arch
,
322 "cpu_bogomips" : system
.cpu_bogomips
,
325 "mem_total" : system
.memory
,
326 "swap_total" : system
.swap_total
,
329 "pakfire_version" : PAKFIRE_VERSION
,
330 "host_key" : self
.config
.get("signatures", "host_key", None),
331 "os_name" : system
.distro
.pretty_name
,
334 "supported_arches" : ",".join(system
.supported_arches
),
336 self
.transport
.post("/builders/info", data
=data
)
338 def send_keepalive(self
):
339 log
.debug("Sending keepalive message to hub...")
343 "loadavg1" : system
.loadavg1
,
344 "loadavg5" : system
.loadavg5
,
345 "loadavg15" : system
.loadavg15
,
348 "mem_total" : system
.memory_total
,
349 "mem_free" : system
.memory_free
,
352 "swap_total" : system
.swap_total
,
353 "swap_free" : system
.swap_free
,
356 "space_free" : self
.free_space
,
358 self
.transport
.post("/builders/keepalive", data
=data
)
361 def free_space(self
):
362 mp
= system
.get_mountpoint(BUILD_ROOT
)
class PakfireWorker(multiprocessing.Process):
	"""
		Child process that fetches build jobs from the hub and runs them.
	"""
	def __init__(self, config, waiting=None):
		multiprocessing.Process.__init__(self)

		# NOTE(review): the assignment is missing from the mangled
		# source; self.config is read in run().
		self.config = config

		# Waiting event. Clear if this worker is running a build.
		self.waiting = multiprocessing.Event()
		# NOTE(review): upstream sets the event initially (worker starts
		# out waiting) — the line is not visible here; confirm.
		self.waiting.set()

		# Indicates if this worker is running.
		self.__running = True
382 # Register signal handlers.
383 self
.register_signal_handlers()
385 # Create connection to the hub.
386 self
.transport
= transport
.PakfireHubTransport(self
.config
)
388 while self
.__running
:
389 # Try to get a new build job.
390 job
= self
.get_new_build_job()
394 # If we got a job, we are not waiting anymore.
397 # Run the job and return.
398 return self
.execute_job(job
)
401 self
.__running
= False
403 # When we are just waiting, we can edit right away.
404 if self
.waiting
.is_set():
405 log
.debug("Exiting immediately")
408 # XXX figure out what to do, when a build is running.
412 def register_signal_handlers(self
):
413 signal
.signal(signal
.SIGCHLD
, self
.handle_SIGCHLD
)
414 signal
.signal(signal
.SIGINT
, self
.handle_SIGTERM
)
415 signal
.signal(signal
.SIGTERM
, self
.handle_SIGTERM
)
417 def handle_SIGCHLD(self
, signum
, frame
):
419 Handle signal SIGCHLD.
421 # Must be here so that SIGCHLD won't be propagated to
425 def handle_SIGTERM(self
, signum
, frame
):
427 Handle signal SIGTERM.
431 def get_new_build_job(self
, timeout
=600):
432 log
.debug("Requesting new job...")
435 job
= self
.transport
.get_json("/builders/jobs/queue",
436 data
={ "timeout" : timeout
, }, timeout
=timeout
)
441 # As this is a long poll request, it is okay to time out.
442 except TransportMaxTriesExceededError
:
445 def execute_job(self
, job
):
446 log
.debug("Executing job: %s" % job
)
448 # Call the function that processes the build and try to catch general
449 # exceptions and report them to the server.
450 # If everything goes okay, we tell this the server, too.
452 # Create a temporary file and a directory for the resulting files.
453 tmpdir
= tempfile
.mkdtemp()
454 tmpfile
= os
.path
.join(tmpdir
, os
.path
.basename(job
.source_url
))
455 logfile
= os
.path
.join(tmpdir
, "build.log")
457 # Create pakfire configuration instance.
458 config
= pakfire
.config
.ConfigDaemon()
459 config
.parse(job
.config
)
461 # Create pakfire instance.
464 p
= pakfire
.base
.PakfireBuilder(config
=config
, arch
=job
.arch
)
466 # Download the source package.
467 grabber
= pakfire
.downloader
.PackageDownloader(p
)
468 grabber
.urlgrab(job
.source_url
, filename
=tmpfile
)
470 # Check if the download checksum matches (if provided).
471 if job
.source_hash_sha512
:
472 h
= hashlib
.new("sha512")
473 f
= open(tmpfile
, "rb")
475 buf
= f
.read(BUFFER_SIZE
)
482 if not job
.source_hash_sha512
== h
.hexdigest():
483 raise DownloadError
, "Hash check did not succeed."
485 # Create a new instance of a build environment.
486 build
= pakfire
.builder
.BuildEnviron(p
, tmpfile
,
487 release_build
=True, build_id
=job
.id, logfile
=logfile
)
490 # Create the build environment.
493 # Update the build status on the server.
494 self
.upload_buildroot(job
, build
.installed_packages
)
495 self
.update_state(job
, "running")
497 # Run the build (without install test).
498 build
.build(install_test
=False)
500 # Copy the created packages to the tempdir.
501 build
.copy_result(tmpdir
)
504 # Cleanup the build environment.
507 # Jippie, build is finished, we are going to upload the files.
508 self
.update_state(job
, "uploading")
510 # Walk through the result directory and upload all (binary) files.
511 # Skip that for test builds.
512 if not job
.type == "test":
513 for dir, subdirs
, files
in os
.walk(tmpdir
):
515 file = os
.path
.join(dir, file)
516 if file in (logfile
, tmpfile
,):
519 self
.upload_file(job
, file, "package")
521 except DependencyError
, e
:
522 message
= "%s: %s" % (e
.__class
__.__name
__, e
)
523 self
.update_state(job
, "dependency_error", message
)
526 except DownloadError
, e
:
527 message
= "%s: %s" % (e
.__class
__.__name
__, e
)
528 self
.update_state(job
, "download_error", message
)
535 # Upload the logfile in any case and if it exists.
536 if os
.path
.exists(logfile
):
537 self
.upload_file(job
, logfile
, "log")
539 # Cleanup the files we created.
540 pakfire
.util
.rm(tmpdir
)
542 except DependencyError
:
543 # This has already been reported.
546 except (DownloadError
,):
547 # Do not take any further action for these exceptions.
550 except (KeyboardInterrupt, SystemExit):
551 self
.update_state(job
, "aborted")
554 # Format the exception and send it to the server.
555 message
= "%s: %s" % (e
.__class
__.__name
__, e
)
557 self
.update_state(job
, "failed", message
)
561 self
.update_state(job
, "finished")
563 def update_state(self
, job
, state
, message
=None):
565 Update state of the build job on the hub.
568 "message" : message
or "",
571 self
.transport
.post("/builders/jobs/%s/state/%s" % (job
.id, state
),
574 def upload_file(self
, job
, filename
, type):
575 assert os
.path
.exists(filename
)
576 assert type in ("package", "log")
578 # First upload the file data and save the upload_id.
579 upload_id
= self
.transport
.upload_file(filename
)
585 # Add the file to the build.
586 self
.transport
.post("/builders/jobs/%s/addfile/%s" % (job
.id, upload_id
),
589 def upload_buildroot(self
, job
, installed_packages
):
591 for pkg
in installed_packages
:
592 pkgs
.append((pkg
.friendly_name
, pkg
.uuid
))
594 data
= { "buildroot" : json
.dumps(pkgs
) }
596 self
.transport
.post("/builders/jobs/%s/buildroot" % job
.id, data
=data
)