#!/usr/bin/python

import hashlib
import multiprocessing
import os
import sys
import tempfile
import time

import pakfire.api
import pakfire.builder
import pakfire.config
import pakfire.downloader
import pakfire.system
import pakfire.util
from pakfire.system import system

import base

from pakfire.constants import *
# _() is used below for a translatable log message; make the import
# explicit instead of relying on the star import above.
from pakfire.i18n import _

import logging
log = logging.getLogger("pakfire.client")

def fork_builder(*args, **kwargs):
    """
    Wrapper that runs ClientBuilder in a new process and catches
    any exception to report it to the main process.
    """
    try:
        # Create a new instance of the builder.
        cb = ClientBuilder(*args, **kwargs)

        # Run the build.
        cb.build()

    except Exception, e:
        # XXX catch the exception and log it.
        print e

        # End the process with a non-zero exit code.
        sys.exit(1)


class PakfireDaemon(object):
    """
    The PakfireDaemon class creates a new process per build job
    and also handles the keepalive/abort logic.
    """
    def __init__(self, server, hostname, secret):
        self.client = base.PakfireBuilderClient(server, hostname, secret)
        self.conn = self.client.conn

        # Save login data (to create child processes).
        self.server = server
        self.hostname = hostname
        self.__secret = secret

        # A list of all running processes.
        self.processes = []
        self.pid2jobid = {}

        # Save when the last keepalive was sent.
        self._last_keepalive = 0

        # Send an initial keepalive message.
        self.send_keepalive()

    def run(self, heartbeat=1, max_processes=None):
        # By default, do not start more than two processes per CPU core.
        if max_processes is None:
            max_processes = system.cpu_count * 2
        log.debug("Maximum number of simultaneous processes is: %s" % max_processes)

        # Track when we last requested a new job and when we last checked
        # for aborted builds.
        last_job_request = 0
        last_abort_request = 0

        # Main loop.
        while True:
            # Send the keepalive regularly.
            self.send_keepalive()

            # Remove all finished builds.
            # "removed" indicates whether a process has actually finished.
            removed = self.remove_finished_builders()

            # If a build slot was freed, search immediately for a new job.
            if removed:
                last_job_request = 0

            # Kill aborted jobs.
            if time.time() - last_abort_request >= 60:
                aborted = self.kill_aborted_jobs()

                # If a build slot was freed, search immediately for a new job.
                if aborted:
                    last_job_request = 0

                last_abort_request = time.time()

            # Check if the maximum number of processes has been reached.
            # The hub actually manages this, but we check here as a
            # safeguard in case anything goes wrong.
            if self.num_processes >= max_processes:
                log.debug("Reached maximum number of allowed processes (%s)." % max_processes)

                time.sleep(heartbeat)
                continue

            # Get a new job.
            if time.time() - last_job_request >= 60 and not self.has_overload():
                # If the last job request is older than a minute and we are
                # not under too much load, check if there is something to
                # do for us.
                job = self.get_job()

                # If we got a job, we start a child process to work on it.
                if job:
                    log.debug("Got a new job.")
                    self.fork_builder(job)
                else:
                    log.debug("No new job.")

                # Update the time when we last requested a job.
                last_job_request = time.time()

            # Wait a moment before starting over.
            time.sleep(heartbeat)

    def shutdown(self):
        """
        Shut down the daemon.
        This means killing all child processes.

        The method blocks until all processes have shut down.
        """
        if not self.processes:
            log.info("No processes to kill. Shutting down immediately.")
            return

        for process in self.processes:
            log.info("Asking %s to terminate..." % process)
            process.terminate()

        while self.processes:
            log.debug("%s process(es) is/are still running..." % len(self.processes))

            for process in self.processes[:]:
                if not process.is_alive():
                    # The process has terminated.
                    log.info("Process %s terminated with exit code: %s" % \
                        (process, process.exitcode))

                    self.processes.remove(process)

            # Do not busy-wait; give the children a moment to exit.
            time.sleep(1)

    @property
    def num_processes(self):
        # Return the number of running child processes.
        return len(self.processes)

    @property
    def free_space(self):
        # Free space (in bytes) on the filesystem that holds BUILD_ROOT.
        mp = system.get_mountpoint(BUILD_ROOT)

        return mp.space_left

    def get_job(self):
        """
        Get a build job from the hub.
        """
        if self.free_space < 2 * 1024**3:
            log.warning(_("Less than 2GB of free space. Cannot request a new job."))
            return

        log.info("Requesting a new job from the server...")

        # Get some information about this system.
        s = pakfire.system.System()

        # Fetch a build job from the hub.
        return self.client.conn.build_get_job(s.supported_arches)

    def has_overload(self):
        """
        Checks whether the load average is too high.

        The result is used to decide whether a new job should be taken.
        """
        try:
            load1, load5, load15 = os.getloadavg()
        except OSError:
            # Could not determine the current load average. In that
            # case we assume that we don't have overload.
            return False

        # If there are more than two processes in the run queue per CPU
        # core, we assume that the system is under heavy load and do not
        # request a new job.
        return load5 >= system.cpu_count * 2

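    # Worked example of the threshold above: with system.cpu_count == 4,
    # a five-minute load average of 8.0 or more counts as overload, so
    # no new job will be requested.
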
    def send_keepalive(self):
        """
        When triggered, this method sends a keepalive to the hub.
        """
        # Do not send a keepalive more often than twice a minute.
        if time.time() - self._last_keepalive < 30:
            return

        # Convert the free space from bytes to megabytes.
        free_space = self.free_space / 1024**2

        self.client.send_keepalive(
            overload=self.has_overload(),
            free_space=free_space,
        )
        self._last_keepalive = time.time()

    def remove_finished_builders(self):
        # Indicates whether any process has been removed.
        ret = False

        # Search for any finished processes.
        for process in self.processes[:]:
            # If the process is not alive anymore...
            if not process.is_alive():
                ret = True

                # ... check the exit code and log a message on errors.
                if process.exitcode == 0:
                    log.debug("Process %s exited normally." % process)

                elif process.exitcode > 0:
                    log.error("Process did not exit normally: %s code: %s" \
                        % (process, process.exitcode))

                elif process.exitcode < 0:
                    # multiprocessing reports -N when the child was
                    # killed by signal N.
                    log.error("Process killed by signal: %s: code: %s" \
                        % (process, process.exitcode))

                # If the program has crashed (non-zero exit code), report
                # that to the hub.
                if process.exitcode:
                    job_id = self.pid2jobid.get(process.pid, None)
                    if job_id:
                        self.conn.build_job_crashed(job_id, process.exitcode)

                # Finally, remove the process from the process list.
                self.processes.remove(process)

        return ret

    def kill_aborted_jobs(self):
        log.debug("Requesting aborted jobs...")

        # Get a list of running job ids.
        running_jobs = self.pid2jobid.values()

        # If there are no running jobs, there is nothing to do.
        if not running_jobs:
            return False

        # Ask the hub for any build jobs to abort.
        aborted_jobs = self.conn.build_jobs_aborted(running_jobs)

        # If no build jobs were aborted, there is nothing to do.
        if not aborted_jobs:
            return False

        for process in self.processes[:]:
            job_id = self.pid2jobid.get(process.pid, None)
            if job_id and job_id in aborted_jobs:
                # Kill the process.
                log.info("Killing process %s which was aborted by the user." \
                    % process.pid)
                process.terminate()

                # Remove the process from the process list so that it
                # will not be cleaned up through the normal code path.
                self.processes.remove(process)

        return True

    def fork_builder(self, job):
        """
        Fork a new child process that creates a new, independent builder.
        """
        # Create the Process object.
        process = multiprocessing.Process(target=fork_builder,
            args=(self.server, self.hostname, self.__secret, job))

        # Mark the process as a daemon so that it gets terminated when
        # the parent process exits.
        process.daemon = True

        # Start the process.
        process.start()
        log.info("Started new process %s with PID %s." % (process, process.pid))

        # Save the PID and the build id to track down crashed builds.
        self.pid2jobid[process.pid] = job.get("id", None)

        # Append it to the process list.
        self.processes.append(process)


class ClientBuilder(object):
    def __init__(self, server, hostname, secret, job):
        self.client = base.PakfireBuilderClient(server, hostname, secret)
        self.conn = self.client.conn

        # Store the information sent by the server here.
        self.build_job = job

    def update_state(self, state, message=None):
        self.conn.build_job_update_state(self.build_id, state, message)

    def upload_file(self, filename, type):
        assert os.path.exists(filename)
        assert type in ("package", "log")

        # First upload the file data and save the upload_id.
        upload_id = self.client._upload_file(filename)

        # Add the file to the build.
        return self.conn.build_job_add_file(self.build_id, upload_id, type)

    def upload_buildroot(self, installed_packages):
        pkgs = []

        for pkg in installed_packages:
            pkgs.append((pkg.friendly_name, pkg.uuid))

        return self.conn.build_upload_buildroot(self.build_id, pkgs)

    @property
    def build_id(self):
        if self.build_job:
            return self.build_job.get("id", None)

    @property
    def build_arch(self):
        if self.build_job:
            return self.build_job.get("arch", None)

    @property
    def build_source_url(self):
        if self.build_job:
            return self.build_job.get("source_url", None)

    @property
    def build_source_filename(self):
        if self.build_source_url:
            return os.path.basename(self.build_source_url)

    @property
    def build_source_hash512(self):
        if self.build_job:
            return self.build_job.get("source_hash512", None)

    @property
    def build_type(self):
        if self.build_job:
            return self.build_job.get("type", None)

    @property
    def build_config(self):
        if self.build_job:
            return self.build_job.get("config", None)

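    # For reference, the job dictionary sent by the hub is expected to
    # carry at least these keys, as read by the properties above:
    #   { "id", "arch", "source_url", "source_hash512", "type", "config" }
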
    def build(self):
        # We cannot proceed without a build job.
        if not self.build_job:
            log.info("No job to work on...")
            return

        # Process the build, try to catch general exceptions and report
        # them to the server. If everything goes well, we report that to
        # the server, too.
        try:
            # Create a temporary file and a directory for the resulting files.
            tmpdir = tempfile.mkdtemp()
            tmpfile = os.path.join(tmpdir, self.build_source_filename)
            logfile = os.path.join(tmpdir, "build.log")
            cfgfile = os.path.join(tmpdir, "job-%s.conf" % self.build_id)

            # Get a package grabber and add mirror download capabilities to it.
            grabber = pakfire.downloader.PackageDownloader(pakfire.config.Config())

            try:
                # Download the source.
                grabber.urlgrab(self.build_source_url, filename=tmpfile)

                # Check if the download checksum matches (if provided).
                if self.build_source_hash512:
                    h = hashlib.sha512()
                    f = open(tmpfile, "rb")
                    while True:
                        buf = f.read(BUFFER_SIZE)
                        if not buf:
                            break

                        h.update(buf)
                    f.close()

                    if self.build_source_hash512 != h.hexdigest():
                        raise DownloadError, "Hash check did not succeed."

                # Build configuration.
                config = pakfire.config.Config(files=["general.conf"])

                # Parse the configuration received from the build service.
                config.parse(self.build_config)

                # Create a dict with arguments that are passed to the
                # pakfire builder.
                kwargs = {
                    "config" : config,

                    # Of course this is a release build,
                    # i.e. don't use local packages.
                    "builder_mode" : "release",

                    # Set the build_id we got from the build service.
                    "build_id" : self.build_id,

                    # Files and directories (should be self-explanatory).
                    "logfile" : logfile,

                    # Perform the build for this architecture.
                    "arch" : self.build_arch,
                }

                # Create a new instance of the builder.
                build = pakfire.builder.BuildEnviron(tmpfile, **kwargs)

                try:
                    # Create the build environment.
                    build.start()

                    # Update the build status on the server.
                    self.upload_buildroot(build.installed_packages)
                    self.update_state("running")

                    # Run the build (with install test).
                    build.build(install_test=True)

                    # Copy the created packages to the tempdir.
                    build.copy_result(tmpdir)

                finally:
                    # Clean up the build environment.
                    build.stop()

                # The build is finished; we are going to upload the files.
                self.update_state("uploading")

                # Walk through the result directory and upload all (binary) files.
                # Skip that for test builds.
                if self.build_type != "test":
                    for dir, subdirs, files in os.walk(tmpdir):
                        for file in files:
                            file = os.path.join(dir, file)
                            if file in (logfile, tmpfile,):
                                continue

                            self.upload_file(file, "package")

            except DependencyError, e:
                message = "%s: %s" % (e.__class__.__name__, e)
                self.update_state("dependency_error", message)
                raise

            except DownloadError, e:
                message = "%s: %s" % (e.__class__.__name__, e)
                self.update_state("download_error", message)
                raise

            finally:
                # Upload the logfile in any case, if it exists.
                if os.path.exists(logfile):
                    self.upload_file(logfile, "log")

                # Clean up the files we created.
                pakfire.util.rm(tmpdir)

        except DependencyError:
            # This has already been reported.
            raise

        except (DownloadError,):
            # Do not take any further action for these exceptions.
            pass

        except Exception, e:
            # Format the exception and send it to the server.
            message = "%s: %s" % (e.__class__.__name__, e)

            self.update_state("failed", message)
            raise

        else:
            self.update_state("finished")
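
# A minimal usage sketch (not part of the original module): how a builder
# host might start the daemon. The real pakfire entry point lives elsewhere;
# the server URL, hostname and secret below are hypothetical placeholders
# that would normally come from the builder configuration.
if __name__ == "__main__":
    daemon = PakfireDaemon(
        server="https://pakfire.example.org",  # hypothetical hub URL
        hostname="builder01.example.org",      # hypothetical builder name
        secret="s3cr3t",                       # hypothetical host secret
    )

    try:
        # Run the main loop, polling roughly once per second.
        daemon.run(heartbeat=1)
    except KeyboardInterrupt:
        # Terminate all child processes before exiting.
        daemon.shutdown()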