#!/usr/bin/python
# python/pakfire/client/builder.py — pakfire build client/daemon.
11 import pakfire
.builder
13 import pakfire
.downloader
16 from pakfire
.system
import system
20 from pakfire
.constants
import *
21 from pakfire
.i18n
import _
24 log
= logging
.getLogger("pakfire.client")
26 def fork_builder(*args
, **kwargs
):
28 Wrapper that runs ClientBuilder in a new process and catches
29 any exception to report it to the main process.
32 # Create new instance of the builder.
33 cb
= ClientBuilder(*args
, **kwargs
)
39 # XXX catch the exception and log it.
42 # End the process with an exit code.
46 class PakfireDaemon(object):
48 The PakfireDaemon class that creates a a new process per build
49 job and also handles the keepalive/abort stuff.
52 self
.config
= pakfire
.config
.ConfigDaemon()
54 server
= self
.config
.get("daemon", "server")
55 hostname
= self
.config
.get("daemon", "hostname")
56 secret
= self
.config
.get("daemon", "secret")
58 self
.client
= base
.PakfireBuilderClient(server
, hostname
, secret
)
59 self
.conn
= self
.client
.conn
61 # Save login data (to create child processes).
63 self
.hostname
= hostname
64 self
.__secret
= secret
66 # A list with all running processes.
70 # Save when last keepalive was sent.
71 self
._last
_keepalive
= 0
73 # Send an initial keepalive message.
74 self
.send_keepalive(force
=True)
76 def run(self
, heartbeat
=1, max_processes
=None):
77 # By default do not start more than two processes per CPU core.
78 if max_processes
is None:
79 max_processes
= system
.cpu_count
* 2
80 log
.debug("Maximum number of simultaneous processes is: %s" % max_processes
)
82 # Indicates when to try to request a new job or aborted builds.
84 last_abort_request
= 0
88 # Send the keepalive regularly.
91 # Remove all finished builds.
92 # "removed" indicates, if a process has actually finished.
93 removed
= self
.remove_finished_builders()
95 # If a build slot was freed, search immediately for a new job.
100 if time
.time() - last_abort_request
>= 60:
101 aborted
= self
.kill_aborted_jobs()
103 # If a build slot was freed, search immediately for a new job.
107 last_abort_request
= time
.time()
109 # Check if the maximum number of processes was reached.
110 # Actually the hub does manage this but this is an emergency
111 # condition if anything goes wrong.
112 if self
.num_processes
>= max_processes
:
113 log
.debug("Reached maximum number of allowed processes (%s)." % max_processes
)
115 time
.sleep(heartbeat
)
119 if time
.time() - last_job_request
>= 60 and not self
.has_overload():
120 # If the last job request is older than a minute and we don't
121 # have too much load, we go and check if there is something
125 # If we got a job, we start a child process to work on it.
127 log
.debug("Got a new job.")
128 self
.fork_builder(job
)
130 log
.debug("No new job.")
132 # Update the time when we requested a job.
133 last_job_request
= time
.time()
135 # Wait a moment before starting over.
136 time
.sleep(heartbeat
)
140 Shut down the daemon.
141 This means to kill all child processes.
143 The method blocks until all processes are shut down.
145 for process
in self
.processes
:
146 log
.info("Sending %s to terminate..." % process
)
150 log
.info("No processes to kill. Shutting down immediately.")
152 while self
.processes
:
153 log
.debug("%s process(es) is/are still running..." % len(self
.processes
))
155 for process
in self
.processes
[:]:
156 if not process
.is_alive():
157 # The process has terminated.
158 log
.info("Process %s terminated with exit code: %s" % \
159 (process
, process
.exitcode
))
161 self
.processes
.remove(process
)
164 def num_processes(self
):
165 # Return the number of processes.
166 return len(self
.processes
)
169 def free_space(self
):
170 mp
= system
.get_mountpoint(BUILD_ROOT
)
176 Get a build job from the hub.
178 if not self
.free_space
>= 2 * 1024**3:
179 log
.warning(_("Less than 2GB of free space. Cannot request a new job."))
182 log
.info("Requesting a new job from the server...")
184 # Get some information about this system.
185 s
= pakfire
.system
.System()
187 # Fetch a build job from the hub.
188 return self
.client
.conn
.build_get_job(s
.supported_arches
)
190 def has_overload(self
):
192 Checks, if the load average is not too high.
194 On this is to be decided if a new job is taken.
197 load1
, load5
, load15
= os
.getloadavg()
199 # Could not determine the current loadavg. In that case we
200 # assume that we don't have overload.
203 # If there are more than 2 processes in the process queue per CPU
204 # core we will assume that the system has heavy load and to not request
206 return load5
>= system
.cpu_count
* 2
208 def send_keepalive(self
, force
=False):
210 When triggered, this method sends a keepalive to the hub.
212 # Do not send a keepalive more often than twice a minute.
213 if time
.time() - self
._last
_keepalive
< 30:
218 "overload" : self
.has_overload(),
219 "free_space" : self
.free_space
/ 1024**2,
222 self
.client
.send_keepalive(**kwargs
)
223 self
._last
_keepalive
= time
.time()
225 def remove_finished_builders(self
):
226 # Return if any processes have been removed.
229 # Search for any finished processes.
230 for process
in self
.processes
[:]:
231 # If the process is not alive anymore...
232 if not process
.is_alive():
235 # ... check the exit code and log a message on errors.
236 if process
.exitcode
== 0:
237 log
.debug("Process %s exited normally." % process
)
239 elif process
.exitcode
> 0:
240 log
.error("Process did not exit normally: %s code: %s" \
241 % (process
, process
.exitcode
))
243 elif process
.exitcode
< 0:
244 log
.error("Process killed by signal: %s: code: %s" \
245 % (process
, process
.exitcode
))
247 # If a program has crashed, we send that to the hub.
248 job_id
= self
.pid2jobid
.get(process
.pid
, None)
250 self
.conn
.build_job_crashed(job_id
, process
.exitcode
)
252 # Finally, remove the process from the process list.
253 self
.processes
.remove(process
)
257 def kill_aborted_jobs(self
):
258 log
.debug("Requesting aborted jobs...")
260 # Get a list of running job ids:
261 running_jobs
= self
.pid2jobid
.values()
263 # If there are no running jobs, there is nothing to do.
267 # Ask the hub for any build jobs to abort.
268 aborted_jobs
= self
.conn
.build_jobs_aborted(running_jobs
)
270 # If no build jobs were aborted, there is nothing to do.
274 for process
in self
.processes
[:]:
275 job_id
= self
.pid2jobid
.get(process
.pid
, None)
276 if job_id
and job_id
in aborted_jobs
:
279 log
.info("Killing process %s which was aborted by the user." \
283 # Remove the process from the process list to avoid
284 # that is will be cleaned up in the normal way.
285 self
.processes
.remove(process
)
289 def fork_builder(self
, job
):
291 For a new child process to create a new independent builder.
293 # Create the Process object.
294 process
= multiprocessing
.Process(target
=fork_builder
,
295 args
=(self
.server
, self
.hostname
, self
.__secret
, job
))
296 # The process is running in daemon mode so it will try to kill
297 # all child processes when exiting.
298 process
.daemon
= True
302 log
.info("Started new process %s with PID %s." % (process
, process
.pid
))
304 # Save the PID and the build id to track down
306 self
.pid2jobid
[process
.pid
] = job
.get("id", None)
308 # Append it to the process list.
309 self
.processes
.append(process
)
312 class ClientBuilder(object):
313 def __init__(self
, server
, hostname
, secret
, job
):
314 self
.client
= base
.PakfireBuilderClient(server
, hostname
, secret
)
315 self
.conn
= self
.client
.conn
317 # Store the information sent by the server here.
320 def update_state(self
, state
, message
=None):
321 self
.conn
.build_job_update_state(self
.build_id
, state
, message
)
323 def upload_file(self
, filename
, type):
324 assert os
.path
.exists(filename
)
325 assert type in ("package", "log")
327 # First upload the file data and save the upload_id.
328 upload_id
= self
.client
._upload
_file
(filename
)
330 # Add the file to the build.
331 return self
.conn
.build_job_add_file(self
.build_id
, upload_id
, type)
333 def upload_buildroot(self
, installed_packages
):
336 for pkg
in installed_packages
:
337 assert pkg
.uuid
, "%s has got no UUID"
338 pkgs
.append((pkg
.friendly_name
, pkg
.uuid
))
340 return self
.conn
.build_upload_buildroot(self
.build_id
, pkgs
)
345 return self
.build_job
.get("id", None)
348 def build_arch(self
):
350 return self
.build_job
.get("arch", None)
353 def build_source_url(self
):
355 return self
.build_job
.get("source_url", None)
358 def build_source_filename(self
):
359 if self
.build_source_url
:
360 return os
.path
.basename(self
.build_source_url
)
363 def build_source_hash512(self
):
365 return self
.build_job
.get("source_hash512", None)
368 def build_type(self
):
370 return self
.build_job
.get("type", None)
373 def build_config(self
):
375 return self
.build_job
.get("config", None)
378 # Cannot go on if I got no build job.
379 if not self
.build_job
:
380 logging
.info("No job to work on...")
383 # Call the function that processes the build and try to catch general
384 # exceptions and report them to the server.
385 # If everything goes okay, we tell this the server, too.
387 # Create a temporary file and a directory for the resulting files.
388 tmpdir
= tempfile
.mkdtemp()
389 tmpfile
= os
.path
.join(tmpdir
, self
.build_source_filename
)
390 logfile
= os
.path
.join(tmpdir
, "build.log")
391 cfgfile
= os
.path
.join(tmpdir
, "job-%s.conf" % self
.build_id
)
393 # Get a package grabber and add mirror download capabilities to it.
394 grabber
= pakfire
.downloader
.PackageDownloader(pakfire
.config
.Config())
396 # Create pakfire configuration instance.
397 config
= pakfire
.config
.ConfigDaemon()
398 config
.parse(self
.build_config
)
400 # Create pakfire instance.
403 p
= pakfire
.base
.PakfireBuilder(config
=config
, arch
=self
.build_arch
)
405 # Download the source package.
406 grabber
= pakfire
.downloader
.PackageDownloader(p
)
407 grabber
.urlgrab(self
.build_source_url
, filename
=tmpfile
)
409 # Check if the download checksum matches (if provided).
410 if self
.build_source_hash512
:
411 h
= hashlib
.new("sha512")
412 f
= open(tmpfile
, "rb")
414 buf
= f
.read(BUFFER_SIZE
)
421 if not self
.build_source_hash512
== h
.hexdigest():
422 raise DownloadError
, "Hash check did not succeed."
424 # Create a new instance of a build environment.
425 build
= pakfire
.builder
.BuildEnviron(p
, tmpfile
,
426 release_build
=True, build_id
=self
.build_id
, logfile
=logfile
)
429 # Create the build environment.
432 # Update the build status on the server.
433 self
.upload_buildroot(build
.installed_packages
)
434 self
.update_state("running")
436 # Run the build (with install test).
437 build
.build(install_test
=True)
439 # Copy the created packages to the tempdir.
440 build
.copy_result(tmpdir
)
443 # Cleanup the build environment.
446 # Jippie, build is finished, we are going to upload the files.
447 self
.update_state("uploading")
449 # Walk through the result directory and upload all (binary) files.
450 # Skip that for test builds.
451 if not self
.build_type
== "test":
452 for dir, subdirs
, files
in os
.walk(tmpdir
):
454 file = os
.path
.join(dir, file)
455 if file in (logfile
, tmpfile
,):
458 self
.upload_file(file, "package")
460 except DependencyError
, e
:
461 message
= "%s: %s" % (e
.__class
__.__name
__, e
)
462 self
.update_state("dependency_error", message
)
465 except DownloadError
, e
:
466 message
= "%s: %s" % (e
.__class
__.__name
__, e
)
467 self
.update_state("download_error", message
)
474 # Upload the logfile in any case and if it exists.
475 if os
.path
.exists(logfile
):
476 self
.upload_file(logfile
, "log")
478 # Cleanup the files we created.
479 pakfire
.util
.rm(tmpdir
)
481 except DependencyError
:
482 # This has already been reported.
485 except (DownloadError
,):
486 # Do not take any further action for these exceptions.
490 # Format the exception and send it to the server.
491 message
= "%s: %s" % (e
.__class
__.__name
__, e
)
493 self
.update_state("failed", message
)
497 self
.update_state("finished")