13 import pakfire
.builder
15 import pakfire
.downloader
18 from pakfire
.system
import system
23 from pakfire
.constants
import *
24 from pakfire
.i18n
import _
# Module-level logger; all daemon components log into the
# "pakfire.daemon" namespace.
log = logging.getLogger("pakfire.daemon")
31 Wrapper class for build jobs, that are received from the hub.
33 This makes accessing attributes easier.
35 def __getattr__(self
, key
):
39 raise AttributeError, key
class PakfireDaemon(object):
    """
    The main build daemon.

    Supervises one keepalive process (PakfireDaemonKeepalive) and a
    pool of PakfireWorker processes that fetch build jobs from the hub
    and execute them.
    """
    def __init__(self, config):
        # Pakfire daemon configuration, handed to every child process.
        self.config = config

        # Indicates if this daemon is in running mode.
        self.__running = True

        # Create daemon that sends keep-alive messages.
        self.keepalive = PakfireDaemonKeepalive(self.config)

        # List of worker processes. The keepalive process lives in here
        # as well, but is filtered out by the "workers" property.
        self.__workers = [self.keepalive]

        ### Configuration

        # Number of workers in waiting state.
        self.max_waiting = 1

        # Number of running workers.
        self.max_running = system.cpu_count * 2

    def run(self, heartbeat=30):
        """
        Main loop of the daemon.

        Spawns workers as needed once every "heartbeat" seconds until
        shutdown() clears the running flag, then tears everything down.
        """
        # Register signal handlers.
        self.register_signal_handlers()

        # Start keepalive process.
        self.keepalive.start()

        while self.__running:
            time_started = time.time()

            # Spawn a sufficient number of worker processes.
            self.spawn_workers_if_needed()

            # Get runtime of this loop iteration.
            time_elapsed = time.time() - time_started

            # Wait if the heartbeat time has not been reached, yet.
            if time_elapsed < heartbeat:
                time.sleep(heartbeat - time_elapsed)

        # Main loop has ended, but we wait until all workers have finished.
        self.terminate_all_workers()

    def shutdown(self):
        """
        Terminates all workers and exits the daemon.
        """
        # Ignore repeated shutdown requests.
        if not self.__running:
            return

        log.info(_("Shutting down..."))
        self.__running = False

    def spawn_workers_if_needed(self, *args, **kwargs):
        """
        Spawns more workers if needed.
        """
        # Do not create any more processes when the daemon is shutting down.
        if not self.__running:
            return

        # Cleanup all other workers.
        self.cleanup_workers()

        # Do not create more workers if there are already enough
        # workers active.
        if len(self.workers) >= self.max_running:
            log.warning("More workers running than allowed")
            return

        # Do nothing, if there are already enough workers waiting.
        wanted_waiting_workers = self.max_waiting - len(self.waiting_workers)
        if wanted_waiting_workers <= 0:
            return

        # Spawn a new worker for every missing waiting slot.
        for i in range(wanted_waiting_workers):
            self.spawn_worker(*args, **kwargs)

    def spawn_worker(self, *args, **kwargs):
        """
        Spawns a new worker process and adds it to the worker list.
        """
        worker = PakfireWorker(config=self.config, *args, **kwargs)
        worker.start()

        log.debug("Spawned new worker process: %s" % worker)
        self.__workers.append(worker)

    def terminate_worker(self, worker):
        """
        Terminates the given worker process (sends SIGTERM).
        """
        log.warning(_("Terminating worker process: %s") % worker)
        worker.terminate()

    def terminate_all_workers(self):
        """
        Terminates all workers (including the keepalive process) and
        waits until every one of them has finished.
        """
        # First send SIGTERM to all processes.
        self.terminate_worker(self.keepalive)
        for worker in self.workers:
            self.terminate_worker(worker)

        # Then wait until they all have finished.
        self.keepalive.join()
        for worker in self.workers:
            worker.join()

    def remove_worker(self, worker):
        """
        Removes a worker from the internal list of worker processes.
        """
        assert not worker.is_alive(), "Remove alive worker?"

        log.debug("Removing worker: %s" % worker)
        try:
            self.__workers.remove(worker)
        except ValueError:
            # Worker has already been removed (e.g. by a concurrent
            # SIGCHLD-triggered cleanup) - nothing to do.
            pass

    def cleanup_workers(self):
        """
        Removes workers that are not alive any more.
        """
        for worker in self.workers:
            if worker.is_alive():
                continue

            self.remove_worker(worker)

    @property
    def workers(self):
        # All build worker processes (the keepalive process is excluded
        # by the isinstance check).
        return [w for w in self.__workers if isinstance(w, PakfireWorker)]

    @property
    def running_workers(self):
        # Workers that are currently executing a build job
        # (their waiting event is cleared).
        workers = []

        for worker in self.workers:
            if worker.waiting.is_set():
                continue

            workers.append(worker)

        return workers

    @property
    def waiting_workers(self):
        # Workers that are idle and waiting for a new job.
        workers = []

        for worker in self.workers:
            if worker.waiting.is_set():
                workers.append(worker)

        return workers

    # Signal handling.

    def register_signal_handlers(self):
        signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
        signal.signal(signal.SIGINT, self.handle_SIGTERM)
        signal.signal(signal.SIGTERM, self.handle_SIGTERM)

    def handle_SIGCHLD(self, signum, frame):
        """
        Handle signal SIGCHLD.
        """
        # A child process has exited; spawn new workers if necessary.
        self.spawn_workers_if_needed()

    def handle_SIGTERM(self, signum, frame):
        """
        Handle signal SIGTERM.
        """
        # Just shut down.
        self.shutdown()
class PakfireDaemonKeepalive(multiprocessing.Process):
    """
    Child process that keeps the connection to the hub alive.

    Sends the builder's hardware/software profile once on startup and
    then a keepalive message (load, memory, swap, disk) on every
    heartbeat.
    """
    def __init__(self, config):
        multiprocessing.Process.__init__(self)

        # Pakfire daemon configuration (read for hub credentials etc.).
        self.config = config

    def run(self, heartbeat=30):
        """
        Process entry point: send the builder profile, then loop
        sending keepalive messages every "heartbeat" seconds until the
        process is terminated.
        """
        # Register signal handlers.
        self.register_signal_handlers()

        # Create connection to the hub.
        self.transport = transport.PakfireHubTransport(self.config)

        # Send our profile to the hub.
        self.send_builder_info()

        while True:
            time_started = time.time()

            # Send keepalive message.
            self.send_keepalive()

            # Get runtime of this loop iteration.
            time_elapsed = time.time() - time_started

            # Wait if the heartbeat time has not been reached, yet.
            if time_elapsed < heartbeat:
                time.sleep(heartbeat - time_elapsed)

    def shutdown(self):
        """
        Ends this process immediately.
        """
        # Equivalent to sys.exit(0); avoids needing the sys module here.
        raise SystemExit(0)

    # Signal handling.

    def register_signal_handlers(self):
        signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
        signal.signal(signal.SIGINT, self.handle_SIGTERM)
        signal.signal(signal.SIGTERM, self.handle_SIGTERM)

    def handle_SIGCHLD(self, signum, frame):
        """
        Handle signal SIGCHLD.
        """
        # Must be here so that SIGCHLD won't be propagated to
        # the parent daemon process.
        pass

    def handle_SIGTERM(self, signum, frame):
        """
        Handle signal SIGTERM.
        """
        # End this process right away.
        self.shutdown()

    # Talking to the hub.

    def send_builder_info(self):
        """
        Sends static information about this builder (CPU, memory,
        pakfire version, host key, supported architectures) to the hub.
        """
        log.info(_("Sending builder information to hub..."))

        data = {
            # CPU information.
            "cpu_model"       : system.cpu_model,
            "cpu_count"       : system.cpu_count,
            "cpu_arch"        : system.native_arch,
            "cpu_bogomips"    : system.cpu_bogomips,

            # Memory + swap.
            "mem_total"       : system.memory,
            "swap_total"      : system.swap_total,

            # Pakfire + OS information.
            "pakfire_version" : PAKFIRE_VERSION,
            "host_key"        : self.config.get("signatures", "host_key", None),
            "os_name"         : system.distro.pretty_name,

            # All architectures this builder can build for.
            "supported_arches" : ",".join(system.supported_arches),
        }

        self.transport.post("/builders/info", data=data)

    def send_keepalive(self):
        """
        Sends a keepalive message with the current load average,
        memory, swap and free disk space to the hub.
        """
        log.debug("Sending keepalive message to hub...")

        data = {
            # Load average.
            "loadavg1"   : system.loadavg1,
            "loadavg5"   : system.loadavg5,
            "loadavg15"  : system.loadavg15,

            # Memory.
            "mem_total"  : system.memory_total,
            "mem_free"   : system.memory_free,

            # Swap.
            "swap_total" : system.swap_total,
            "swap_free"  : system.swap_free,

            # Disk space.
            "space_free" : self.free_space,
        }

        self.transport.post("/builders/keepalive", data=data)

    @property
    def free_space(self):
        # Free space on the mountpoint that holds the build root.
        mp = system.get_mountpoint(BUILD_ROOT)

        # NOTE(review): attribute name reconstructed from the system
        # module's mountpoint API - confirm against pakfire.system.
        return mp.space_left
class PakfireWorker(multiprocessing.Process):
    """
    Worker process that fetches build jobs from the hub, executes them
    in a build environment and uploads the results.
    """
    def __init__(self, config, waiting=None):
        multiprocessing.Process.__init__(self)

        # Pakfire daemon configuration.
        self.config = config

        # Waiting event. Clear if this worker is running a build.
        self.waiting = multiprocessing.Event()
        self.waiting.set()

        # Indicates if this worker is running.
        self.__running = True

    def run(self):
        """
        Process entry point: poll the hub for a job, then execute it
        and exit. The process ends after one job so the daemon spawns
        a fresh worker for the next one.
        """
        # Register signal handlers.
        self.register_signal_handlers()

        # Create connection to the hub.
        self.transport = transport.PakfireHubTransport(self.config)

        while self.__running:
            # Try to get a new build job.
            job = self.get_new_build_job()
            if not job:
                continue

            # If we got a job, we are not waiting anymore.
            self.waiting.clear()

            # Run the job and return.
            return self.execute_job(job)

    def shutdown(self):
        """
        Stops this worker; exits immediately when no build is running.
        """
        self.__running = False

        # When we are just waiting, we can exit right away.
        if self.waiting.is_set():
            log.debug("Exiting immediately")
            # Equivalent to sys.exit(); avoids needing the sys module here.
            raise SystemExit(0)

        # XXX figure out what to do, when a build is running.

    # Signal handling.

    def register_signal_handlers(self):
        signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
        signal.signal(signal.SIGINT, self.handle_SIGTERM)
        signal.signal(signal.SIGTERM, self.handle_SIGTERM)

    def handle_SIGCHLD(self, signum, frame):
        """
        Handle signal SIGCHLD.
        """
        # Must be here so that SIGCHLD won't be propagated to
        # the parent daemon process.
        pass

    def handle_SIGTERM(self, signum, frame):
        """
        Handle signal SIGTERM.
        """
        self.shutdown()

    def get_new_build_job(self, timeout=600):
        """
        Long-polls the hub for a new build job.

        Returns a BuildJob wrapper when a job was assigned, or None
        when the request timed out without a job.
        """
        log.debug("Requesting new job...")

        try:
            job = self.transport.get_json("/builders/jobs/queue",
                data={ "timeout" : timeout, }, timeout=timeout)

            if job:
                return BuildJob(job)

        # As this is a long poll request, it is okay to time out.
        except TransportMaxTriesExceededError:
            pass

    def execute_job(self, job):
        """
        Downloads the source package of the given job, runs the build
        and uploads all resulting files (and the log) to the hub. Any
        outcome - success, dependency error, download error, abort or
        crash - is reported back to the hub as the job's final state.
        """
        log.debug("Executing job: %s" % job)

        # Create a temporary file and a directory for the resulting files.
        # These are created before the try block below so that the cleanup
        # handlers can always reference them.
        tmpdir = tempfile.mkdtemp()
        tmpfile = os.path.join(tmpdir, os.path.basename(job.source_url))
        logfile = os.path.join(tmpdir, "build.log")

        # Call the function that processes the build and try to catch general
        # exceptions and report them to the server.
        # If everything goes okay, we tell this the server, too.
        try:
            try:
                # Create pakfire configuration instance.
                config = pakfire.config.ConfigDaemon()
                config.parse(job.config)

                # Create pakfire instance.
                p = pakfire.base.PakfireBuilder(config=config, arch=job.arch)

                # Download the source package.
                grabber = pakfire.downloader.PackageDownloader(p)
                grabber.urlgrab(job.source_url, filename=tmpfile)

                # Check if the download checksum matches (if provided).
                if job.source_hash_sha512:
                    h = hashlib.new("sha512")
                    with open(tmpfile, "rb") as f:
                        while True:
                            buf = f.read(BUFFER_SIZE)
                            if not buf:
                                break
                            h.update(buf)

                    if not job.source_hash_sha512 == h.hexdigest():
                        raise DownloadError("Hash check did not succeed.")

                # Create a new instance of a build environment.
                build = pakfire.builder.BuildEnviron(p, tmpfile,
                    release_build=True, build_id=job.id, logfile=logfile)

                try:
                    # Create the build environment.
                    build.start()

                    # Update the build status on the server.
                    self.upload_buildroot(job, build.installed_packages)
                    self.update_state(job, "running")

                    # Run the build (without install test).
                    build.build(install_test=False)

                    # Copy the created packages to the tempdir.
                    build.copy_result(tmpdir)

                finally:
                    # Cleanup the build environment.
                    build.stop()

                # Jippie, build is finished, we are going to upload the files.
                self.update_state(job, "uploading")

                # Walk through the result directory and upload all (binary) files.
                # Skip that for test builds.
                if not job.type == "test":
                    for dirpath, subdirs, filenames in os.walk(tmpdir):
                        for filename in filenames:
                            path = os.path.join(dirpath, filename)

                            # Skip files that we created ourselves.
                            if path in (logfile, tmpfile,):
                                continue

                            self.upload_file(job, path, "package")

            except DependencyError as e:
                # Report the unresolvable dependency to the hub and
                # re-raise so the outer handler can finish up.
                message = "%s: %s" % (e.__class__.__name__, e)
                self.update_state(job, "dependency_error", message)
                raise

            except DownloadError as e:
                # Report the failed download to the hub and re-raise.
                message = "%s: %s" % (e.__class__.__name__, e)
                self.update_state(job, "download_error", message)
                raise

            finally:
                # Upload the logfile in any case and if it exists.
                if os.path.exists(logfile):
                    self.upload_file(job, logfile, "log")

                # Cleanup the files we created.
                pakfire.util.rm(tmpdir)

        except DependencyError:
            # This has already been reported.
            pass

        except (DownloadError,):
            # Do not take any further action for these exceptions.
            pass

        except (KeyboardInterrupt, SystemExit):
            self.update_state(job, "aborted")

        except Exception as e:
            # Format the exception and send it to the server.
            message = "%s: %s" % (e.__class__.__name__, e)

            self.update_state(job, "failed", message)

        else:
            self.update_state(job, "finished")

    def update_state(self, job, state, message=None):
        """
        Update state of the build job on the hub.
        """
        data = {
            "message" : message or "",
        }

        self.transport.post("/builders/jobs/%s/state/%s" % (job.id, state),
            data=data)

    def upload_file(self, job, filename, type):
        """
        Uploads a single file (a built package or the build log) and
        attaches it to the given job on the hub.
        """
        assert os.path.exists(filename)
        assert type in ("package", "log")

        # First upload the file data and save the upload_id.
        upload_id = self.transport.upload_file(filename)

        # Add the file to the build.
        # NOTE(review): payload reconstructed - confirm the hub expects
        # the file type in the request body.
        self.transport.post("/builders/jobs/%s/addfile/%s" % (job.id, upload_id),
            data={ "type" : type, })

    def upload_buildroot(self, job, installed_packages):
        """
        Reports the list of packages installed in the build root
        (name + UUID pairs, JSON-encoded) to the hub.
        """
        pkgs = []
        for pkg in installed_packages:
            pkgs.append((pkg.friendly_name, pkg.uuid))

        data = { "buildroot" : json.dumps(pkgs) }

        self.transport.post("/builders/jobs/%s/buildroot" % job.id, data=data)