]>
git.ipfire.org Git - pakfire.git/blob - python/pakfire/client/builder.py
5381efe08bf7c82a8e51bbe93aae0e0cb837952b
11 import pakfire
.builder
13 import pakfire
.downloader
16 from pakfire
.system
import system
20 from pakfire
.constants
import *
23 log
= logging
.getLogger("pakfire.client")
25 def fork_builder(*args
, **kwargs
):
27 Wrapper that runs ClientBuilder in a new process and catches
28 any exception to report it to the main process.
31 # Create new instance of the builder.
32 cb
= ClientBuilder(*args
, **kwargs
)
38 # XXX catch the exception and log it.
41 # End the process with an exit code.
45 class PakfireDaemon(object):
47 The PakfireDaemon class that creates a new process per build
48 job and also handles the keepalive/abort stuff.
50 def __init__(self
, server
, hostname
, secret
):
51 self
.client
= base
.PakfireBuilderClient(server
, hostname
, secret
)
52 self
.conn
= self
.client
.conn
54 # Save login data (to create child processes).
56 self
.hostname
= hostname
57 self
.__secret
= secret
59 # A list with all running processes.
63 # Save when last keepalive was sent.
64 self
._last
_keepalive
= 0
66 # Send an initial keepalive message.
69 def run(self
, heartbeat
=1, max_processes
=None):
70 # By default do not start more than two processes per CPU core.
71 if max_processes
is None:
72 max_processes
= system
.cpu_count
* 2
73 log
.debug("Maximum number of simultaneous processes is: %s" % max_processes
)
75 # Indicates when to try to request a new job or aborted builds.
77 last_abort_request
= 0
81 # Send the keepalive regularly.
84 # Remove all finished builds.
85 # "removed" indicates, if a process has actually finished.
86 removed
= self
.remove_finished_builders()
88 # If a build slot was freed, search immediately for a new job.
93 if time
.time() - last_abort_request
>= 60:
94 aborted
= self
.kill_aborted_jobs()
96 # If a build slot was freed, search immediately for a new job.
100 last_abort_request
= time
.time()
102 # Check if the maximum number of processes was reached.
103 # Actually the hub does manage this but this is an emergency
104 # condition if anything goes wrong.
105 if self
.num_processes
>= max_processes
:
106 log
.debug("Reached maximum number of allowed processes (%s)." % max_processes
)
108 time
.sleep(heartbeat
)
112 if time
.time() - last_job_request
>= 60 and not self
.has_overload():
113 # If the last job request is older than a minute and we don't
114 # have too much load, we go and check if there is something
118 # If we got a job, we start a child process to work on it.
120 log
.debug("Got a new job.")
121 self
.fork_builder(job
)
123 log
.debug("No new job.")
125 # Update the time when we requested a job.
126 last_job_request
= time
.time()
128 # Wait a moment before starting over.
129 time
.sleep(heartbeat
)
133 Shut down the daemon.
134 This means to kill all child processes.
136 The method blocks until all processes are shut down.
138 for process
in self
.processes
:
139 log
.info("Sending %s to terminate..." % process
)
143 log
.info("No processes to kill. Shutting down immediately.")
145 while self
.processes
:
146 log
.debug("%s process(es) is/are still running..." % len(self
.processes
))
148 for process
in self
.processes
[:]:
149 if not process
.is_alive():
150 # The process has terminated.
151 log
.info("Process %s terminated with exit code: %s" % \
152 (process
, process
.exitcode
))
154 self
.processes
.remove(process
)
@property
def num_processes(self):
    """
    Return the number of processes currently tracked in self.processes.

    Exposed as a property because the daemon loop compares
    ``self.num_processes`` directly against ``max_processes``.
    """
    return len(self.processes)
162 def free_space(self
):
163 mp
= system
.get_mountpoint(BUILD_ROOT
)
169 Get a build job from the hub.
171 if not self
.free_space
>= 2 * 1024**3:
172 log
.warning(_("Less than 2GB of free space. Cannot request a new job."))
175 log
.info("Requesting a new job from the server...")
177 # Get some information about this system.
178 s
= pakfire
.system
.System()
180 # Fetch a build job from the hub.
181 return self
.client
.conn
.build_get_job(s
.supported_arches
)
def has_overload(self):
    """
    Check whether the load average is too high.

    The result is used to decide whether a new job is taken.

    Returns True when the 5-minute load average is at least two per
    CPU core (``system.cpu_count * 2``), otherwise False. Also returns
    False when the load average cannot be determined.
    """
    try:
        load5 = os.getloadavg()[1]
    except OSError:
        # Could not determine the current loadavg. In that case we
        # assume that we don't have overload.
        return False

    # If there are more than 2 processes in the process queue per CPU
    # core we will assume that the system has heavy load and do not
    # request new jobs.
    return load5 >= system.cpu_count * 2
def send_keepalive(self):
    """
    When triggered, this method sends a keepalive to the hub.

    The message carries the current overload flag and the free space
    (``self.free_space`` divided by 1024**2). Nothing is sent if the
    previous keepalive is less than 30 seconds old.
    """
    # Do not send a keepalive more often than twice a minute.
    if time.time() - self._last_keepalive < 30:
        return

    free_space = self.free_space / 1024**2

    self.client.send_keepalive(
        overload=self.has_overload(),
        free_space=free_space,
    )

    # Remember when the last keepalive went out for the rate limit above.
    self._last_keepalive = time.time()
217 def remove_finished_builders(self
):
218 # Return if any processes have been removed.
221 # Search for any finished processes.
222 for process
in self
.processes
[:]:
223 # If the process is not alive anymore...
224 if not process
.is_alive():
227 # ... check the exit code and log a message on errors.
228 if process
.exitcode
== 0:
229 log
.debug("Process %s exited normally." % process
)
231 elif process
.exitcode
> 0:
232 log
.error("Process did not exit normally: %s code: %s" \
233 % (process
, process
.exitcode
))
235 elif process
.exitcode
< 0:
236 log
.error("Process killed by signal: %s: code: %s" \
237 % (process
, process
.exitcode
))
239 # If a program has crashed, we send that to the hub.
240 job_id
= self
.pid2jobid
.get(process
.pid
, None)
242 self
.conn
.build_job_crashed(job_id
, process
.exitcode
)
244 # Finally, remove the process from the process list.
245 self
.processes
.remove(process
)
249 def kill_aborted_jobs(self
):
250 log
.debug("Requesting aborted jobs...")
252 # Get a list of running job ids:
253 running_jobs
= self
.pid2jobid
.values()
255 # If there are no running jobs, there is nothing to do.
259 # Ask the hub for any build jobs to abort.
260 aborted_jobs
= self
.conn
.build_jobs_aborted(running_jobs
)
262 # If no build jobs were aborted, there is nothing to do.
266 for process
in self
.processes
[:]:
267 job_id
= self
.pid2jobid
.get(process
.pid
, None)
268 if job_id
and job_id
in aborted_jobs
:
271 log
.info("Killing process %s which was aborted by the user." \
275 # Remove the process from the process list to avoid
276 # that it will be cleaned up in the normal way.
277 self
.processes
.remove(process
)
281 def fork_builder(self
, job
):
283 Fork a new child process to create a new independent builder.
285 # Create the Process object.
286 process
= multiprocessing
.Process(target
=fork_builder
,
287 args
=(self
.server
, self
.hostname
, self
.__secret
, job
))
288 # The process is running in daemon mode so it will try to kill
289 # all child processes when exiting.
290 process
.daemon
= True
294 log
.info("Started new process %s with PID %s." % (process
, process
.pid
))
296 # Save the PID and the build id to track down
298 self
.pid2jobid
[process
.pid
] = job
.get("id", None)
300 # Append it to the process list.
301 self
.processes
.append(process
)
304 class ClientBuilder(object):
305 def __init__(self
, server
, hostname
, secret
, job
):
306 self
.client
= base
.PakfireBuilderClient(server
, hostname
, secret
)
307 self
.conn
= self
.client
.conn
309 # Store the information sent by the server here.
def update_state(self, state, message=None):
    """
    Report a new state for this build job.

    state   -- state name passed through unchanged (the build code uses
               values such as "running", "uploading", "failed").
    message -- optional text sent along with the state (default None).

    Calls self.conn.build_job_update_state with self.build_id.
    """
    self.conn.build_job_update_state(self.build_id, state, message)
def upload_file(self, filename, type):
    """
    Upload a file and attach it to the current build job.

    filename -- path of an existing file.
    type     -- either "package" or "log". The parameter name shadows
                the builtin but is kept for caller compatibility.

    Returns whatever self.conn.build_job_add_file returns.
    Raises AssertionError if the file does not exist or the type is
    not one of the two accepted values.
    """
    assert os.path.exists(filename)
    assert type in ("package", "log")

    # First upload the file data and save the upload_id.
    upload_id = self.client._upload_file(filename)

    # Add the file to the build.
    return self.conn.build_job_add_file(self.build_id, upload_id, type)
def upload_buildroot(self, installed_packages):
    """
    Send the list of installed packages for this build job.

    installed_packages -- iterable of objects with ``friendly_name``
                          and ``uuid`` attributes.

    Each package is reduced to a (friendly_name, uuid) tuple. Returns
    whatever self.conn.build_upload_buildroot returns.
    """
    pkgs = [(pkg.friendly_name, pkg.uuid) for pkg in installed_packages]

    return self.conn.build_upload_buildroot(self.build_id, pkgs)
336 return self
.build_job
.get("id", None)
339 def build_arch(self
):
341 return self
.build_job
.get("arch", None)
344 def build_source_url(self
):
346 return self
.build_job
.get("source_url", None)
@property
def build_source_filename(self):
    """
    Return the base name of the source URL of this build job.

    Returns None when self.build_source_url is empty or unset.
    Exposed as a property because the build code reads it as the
    attribute ``self.build_source_filename``.
    """
    if self.build_source_url:
        return os.path.basename(self.build_source_url)

    return None
354 def build_source_hash512(self
):
356 return self
.build_job
.get("source_hash512", None)
359 def build_type(self
):
361 return self
.build_job
.get("type", None)
364 def build_config(self
):
366 return self
.build_job
.get("config", None)
369 # Cannot go on if I got no build job.
370 if not self
.build_job
:
371 logging
.info("No job to work on...")
374 # Call the function that processes the build and try to catch general
375 # exceptions and report them to the server.
376 # If everything goes okay, we tell this the server, too.
378 # Create a temporary file and a directory for the resulting files.
379 tmpdir
= tempfile
.mkdtemp()
380 tmpfile
= os
.path
.join(tmpdir
, self
.build_source_filename
)
381 logfile
= os
.path
.join(tmpdir
, "build.log")
382 cfgfile
= os
.path
.join(tmpdir
, "job-%s.conf" % self
.build_id
)
384 # Get a package grabber and add mirror download capabilities to it.
385 grabber
= pakfire
.downloader
.PackageDownloader(pakfire
.config
.Config())
388 ## Download the source.
389 grabber
.urlgrab(self
.build_source_url
, filename
=tmpfile
)
391 # Check if the download checksum matches (if provided).
392 if self
.build_source_hash512
:
394 f
= open(tmpfile
, "rb")
396 buf
= f
.read(BUFFER_SIZE
)
403 if not self
.build_source_hash512
== h
.hexdigest():
404 raise DownloadError
, "Hash check did not succeed."
406 # Build configuration.
407 config
= pakfire
.config
.Config(files
=["general.conf"])
409 # Parse the configuration received from the build service.
410 config
.parse(self
.build_config
)
412 # Create dict with arguments that are passed to the pakfire
417 # Of course this is a release build.
418 # i.e. don't use local packages.
419 "builder_mode" : "release",
421 # Set the build_id we got from the build service.
422 "build_id" : self
.build_id
,
424 # Files and directories (should be self explaining).
427 # Perform the build for this architecture.
428 "arch" : self
.build_arch
,
431 # Create a new instance of the builder.
432 build
= pakfire
.builder
.BuildEnviron(tmpfile
, **kwargs
)
435 # Create the build environment.
438 # Update the build status on the server.
439 self
.upload_buildroot(build
.installed_packages
)
440 self
.update_state("running")
442 # Run the build (with install test).
443 build
.build(install_test
=True)
445 # Copy the created packages to the tempdir.
446 build
.copy_result(tmpdir
)
449 # Cleanup the build environment.
452 # Jippie, build is finished, we are going to upload the files.
453 self
.update_state("uploading")
455 # Walk through the result directory and upload all (binary) files.
456 # Skip that for test builds.
457 if not self
.build_type
== "test":
458 for dir, subdirs
, files
in os
.walk(tmpdir
):
460 file = os
.path
.join(dir, file)
461 if file in (logfile
, tmpfile
,):
464 self
.upload_file(file, "package")
466 except DependencyError
, e
:
467 message
= "%s: %s" % (e
.__class
__.__name
__, e
)
468 self
.update_state("dependency_error", message
)
471 except DownloadError
, e
:
472 message
= "%s: %s" % (e
.__class
__.__name
__, e
)
473 self
.update_state("download_error", message
)
477 # Upload the logfile in any case and if it exists.
478 if os
.path
.exists(logfile
):
479 self
.upload_file(logfile
, "log")
481 # Cleanup the files we created.
482 pakfire
.util
.rm(tmpdir
)
484 except DependencyError
:
485 # This has already been reported.
488 except (DownloadError
,):
489 # Do not take any further action for these exceptions.
493 # Format the exception and send it to the server.
494 message
= "%s: %s" % (e
.__class
__.__name
__, e
)
496 self
.update_state("failed", message
)
500 self
.update_state("finished")