# Source: git.ipfire.org — pakfire.git — python/pakfire/client/builder.py
import hashlib
import multiprocessing
import os
import sys
import tempfile
import time

import pakfire.builder
import pakfire.config
import pakfire.downloader
import pakfire.system
import pakfire.util
from pakfire.system import system

from pakfire.constants import *

import base

import logging
log = logging.getLogger("pakfire.client")
25 def fork_builder(*args
, **kwargs
):
27 Wrapper that runs ClientBuilder in a new process and catches
28 any exception to report it to the main process.
31 # Create new instance of the builder.
32 cb
= ClientBuilder(*args
, **kwargs
)
38 # XXX catch the exception and log it.
41 # End the process with an exit code.
45 class PakfireDaemon(object):
47 The PakfireDaemon class that creates a a new process per build
48 job and also handles the keepalive/abort stuff.
50 def __init__(self
, server
, hostname
, secret
):
51 self
.client
= base
.PakfireBuilderClient(server
, hostname
, secret
)
52 self
.conn
= self
.client
.conn
54 # Save login data (to create child processes).
56 self
.hostname
= hostname
57 self
.__secret
= secret
59 # A list with all running processes.
63 # Save when last keepalive was sent.
64 self
._last
_keepalive
= 0
66 def run(self
, heartbeat
=1, max_processes
=None):
67 # By default do not start more than two processes per CPU core.
68 if max_processes
is None:
69 max_processes
= system
.cpu_count
* 2
70 log
.debug("Maximum number of simultaneous processes is: %s" % max_processes
)
72 # Indicates when to try to request a new job or aborted builds.
74 last_abort_request
= 0
78 # Send the keepalive regularly.
81 # Remove all finished builds.
82 # "removed" indicates, if a process has actually finished.
83 removed
= self
.remove_finished_builders()
85 # If a build slot was freed, search immediately for a new job.
90 if time
.time() - last_abort_request
>= 60:
91 aborted
= self
.kill_aborted_jobs()
93 # If a build slot was freed, search immediately for a new job.
97 last_abort_request
= time
.time()
99 # Check if the maximum number of processes was reached.
100 # Actually the hub does manage this but this is an emergency
101 # condition if anything goes wrong.
102 if self
.num_processes
>= max_processes
:
103 log
.debug("Reached maximum number of allowed processes (%s)." % max_processes
)
105 time
.sleep(heartbeat
)
109 if time
.time() - last_job_request
>= 60 and not self
.has_overload():
110 # If the last job request is older than a minute and we don't
111 # have too much load, we go and check if there is something
115 # If we got a job, we start a child process to work on it.
117 log
.debug("Got a new job.")
118 self
.fork_builder(job
)
120 log
.debug("No new job.")
122 # Update the time when we requested a job.
123 last_job_request
= time
.time()
125 # Wait a moment before starting over.
126 time
.sleep(heartbeat
)
130 Shut down the daemon.
131 This means to kill all child processes.
133 The method blocks until all processes are shut down.
135 for process
in self
.processes
:
136 log
.info("Sending %s to terminate..." % process
)
140 log
.info("No processes to kill. Shutting down immediately.")
142 while self
.processes
:
143 log
.debug("%s process(es) is/are still running..." % len(self
.processes
))
145 for process
in self
.processes
[:]:
146 if not process
.is_alive():
147 # The process has terminated.
148 log
.info("Process %s terminated with exit code: %s" % \
149 (process
, process
.exitcode
))
151 self
.processes
.remove(process
)
154 def num_processes(self
):
155 # Return the number of processes.
156 return len(self
.processes
)
160 Get a build job from the hub.
162 log
.info("Requesting a new job from the server...")
164 # Get some information about this system.
165 s
= pakfire
.system
.System()
167 # Fetch a build job from the hub.
168 return self
.client
.conn
.build_get_job(s
.supported_arches
)
170 def has_overload(self
):
172 Checks, if the load average is not too high.
174 On this is to be decided if a new job is taken.
177 load1
, load5
, load15
= os
.getloadavg()
179 # Could not determine the current loadavg. In that case we
180 # assume that we don't have overload.
183 # If there are more than 2 processes in the process queue per CPU
184 # core we will assume that the system has heavy load and to not request
186 return load5
>= system
.cpu_count
* 2
188 def send_keepalive(self
):
190 When triggered, this method sends a keepalive to the hub.
192 # Do not send a keepalive more often than twice a minute.
193 if time
.time() - self
._last
_keepalive
< 30:
196 self
.client
.send_keepalive(overload
=self
.has_overload())
197 self
._last
_keepalive
= time
.time()
199 def remove_finished_builders(self
):
200 # Return if any processes have been removed.
203 # Search for any finished processes.
204 for process
in self
.processes
[:]:
205 # If the process is not alive anymore...
206 if not process
.is_alive():
209 # ... check the exit code and log a message on errors.
210 if process
.exitcode
== 0:
211 log
.debug("Process %s exited normally." % process
)
213 elif process
.exitcode
> 0:
214 log
.error("Process did not exit normally: %s code: %s" \
215 % (process
, process
.exitcode
))
217 elif process
.exitcode
< 0:
218 log
.error("Process killed by signal: %s: code: %s" \
219 % (process
, process
.exitcode
))
221 # If a program has crashed, we send that to the hub.
222 job_id
= self
.pid2jobid
.get(process
.pid
, None)
224 self
.conn
.build_job_crashed(job_id
, process
.exitcode
)
226 # Finally, remove the process from the process list.
227 self
.processes
.remove(process
)
231 def kill_aborted_jobs(self
):
232 log
.debug("Requesting aborted jobs...")
234 # Get a list of running job ids:
235 running_jobs
= self
.pid2jobid
.values()
237 # If there are no running jobs, there is nothing to do.
241 # Ask the hub for any build jobs to abort.
242 aborted_jobs
= self
.conn
.build_jobs_aborted(running_jobs
)
244 # If no build jobs were aborted, there is nothing to do.
248 for process
in self
.processes
[:]:
249 job_id
= self
.pid2jobid
.get(process
.pid
, None)
250 if job_id
and job_id
in aborted_jobs
:
253 log
.info("Killing process %s which was aborted by the user." \
257 # Remove the process from the process list to avoid
258 # that is will be cleaned up in the normal way.
259 self
.processes
.remove(process
)
263 def fork_builder(self
, job
):
265 For a new child process to create a new independent builder.
267 # Create the Process object.
268 process
= multiprocessing
.Process(target
=fork_builder
,
269 args
=(self
.server
, self
.hostname
, self
.__secret
, job
))
270 # The process is running in daemon mode so it will try to kill
271 # all child processes when exiting.
272 process
.daemon
= True
276 log
.info("Started new process %s with PID %s." % (process
, process
.pid
))
278 # Save the PID and the build id to track down
280 self
.pid2jobid
[process
.pid
] = job
.get("id", None)
282 # Append it to the process list.
283 self
.processes
.append(process
)
286 class ClientBuilder(object):
287 def __init__(self
, server
, hostname
, secret
, job
):
288 self
.client
= base
.PakfireBuilderClient(server
, hostname
, secret
)
289 self
.conn
= self
.client
.conn
291 # Store the information sent by the server here.
294 def update_state(self
, state
, message
=None):
295 self
.conn
.build_job_update_state(self
.build_id
, state
, message
)
297 def upload_file(self
, filename
, type):
298 assert os
.path
.exists(filename
)
299 assert type in ("package", "log")
301 # First upload the file data and save the upload_id.
302 upload_id
= self
.client
._upload
_file
(filename
)
304 # Add the file to the build.
305 return self
.conn
.build_job_add_file(self
.build_id
, upload_id
, type)
307 def upload_buildroot(self
, installed_packages
):
310 for pkg
in installed_packages
:
311 pkgs
.append((pkg
.friendly_name
, pkg
.uuid
))
313 return self
.conn
.build_upload_buildroot(self
.build_id
, pkgs
)
318 return self
.build_job
.get("id", None)
321 def build_arch(self
):
323 return self
.build_job
.get("arch", None)
326 def build_source_url(self
):
328 return self
.build_job
.get("source_url", None)
331 def build_source_filename(self
):
332 if self
.build_source_url
:
333 return os
.path
.basename(self
.build_source_url
)
336 def build_source_hash512(self
):
338 return self
.build_job
.get("source_hash512", None)
341 def build_type(self
):
343 return self
.build_job
.get("type", None)
346 # Cannot go on if I got no build job.
347 if not self
.build_job
:
348 logging
.info("No job to work on...")
351 # Call the function that processes the build and try to catch general
352 # exceptions and report them to the server.
353 # If everything goes okay, we tell this the server, too.
355 # Create a temporary file and a directory for the resulting files.
356 tmpdir
= tempfile
.mkdtemp()
357 tmpfile
= os
.path
.join(tmpdir
, self
.build_source_filename
)
358 logfile
= os
.path
.join(tmpdir
, "build.log")
360 # Get a package grabber and add mirror download capabilities to it.
361 grabber
= pakfire
.downloader
.PackageDownloader(pakfire
.config
.Config())
364 ## Download the source.
365 grabber
.urlgrab(self
.build_source_url
, filename
=tmpfile
)
367 # Check if the download checksum matches (if provided).
368 if self
.build_source_hash512
:
370 f
= open(tmpfile
, "rb")
372 buf
= f
.read(BUFFER_SIZE
)
379 if not self
.build_source_hash512
== h
.hexdigest():
380 raise DownloadError
, "Hash check did not succeed."
382 # Create dist with arguments that are passed to the pakfire
385 # Of course this is a release build.
386 # i.e. don't use local packages.
387 "builder_mode" : "release",
389 # Set the build_id we got from the build service.
390 "build_id" : self
.build_id
,
392 # Files and directories (should be self explaining).
395 # Distro configuration.
397 "arch" : self
.build_arch
,
401 # Create a new instance of the builder.
402 build
= pakfire
.builder
.BuildEnviron(tmpfile
, **kwargs
)
405 # Create the build environment.
408 # Update the build status on the server.
409 self
.upload_buildroot(build
.installed_packages
)
410 self
.update_state("running")
412 # Run the build (with install test).
413 build
.build(install_test
=True)
415 # Copy the created packages to the tempdir.
416 build
.copy_result(tmpdir
)
419 # Cleanup the build environment.
422 # Jippie, build is finished, we are going to upload the files.
423 self
.update_state("uploading")
425 # Walk through the result directory and upload all (binary) files.
426 # Skip that for test builds.
427 if not self
.build_type
== "test":
428 for dir, subdirs
, files
in os
.walk(tmpdir
):
430 file = os
.path
.join(dir, file)
431 if file in (logfile
, tmpfile
,):
434 self
.upload_file(file, "package")
436 except DependencyError
, e
:
437 message
= "%s: %s" % (e
.__class
__.__name
__, e
)
438 self
.update_state("dependency_error", message
)
441 except DownloadError
, e
:
442 message
= "%s: %s" % (e
.__class
__.__name
__, e
)
443 self
.update_state("download_error", message
)
447 # Upload the logfile in any case and if it exists.
448 if os
.path
.exists(logfile
):
449 self
.upload_file(logfile
, "log")
451 # Cleanup the files we created.
452 pakfire
.util
.rm(tmpdir
)
454 except DependencyError
:
455 # This has already been reported.
458 except (DownloadError
,):
459 # Do not take any further action for these exceptions.
463 # Format the exception and send it to the server.
464 message
= "%s: %s" % (e
.__class
__.__name
__, e
)
466 self
.update_state("failed", message
)
470 self
.update_state("finished")