#!/usr/bin/python
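"""
	pakfire client builder daemon.

	This module implements the PakfireDaemon class, which polls the
	pakfire hub for build jobs and forks one child process per job,
	and the ClientBuilder class, which downloads a source package,
	builds it in a fresh build environment and uploads the results.
"""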

import hashlib
import multiprocessing
import os
import sys
import tempfile
import time

import pakfire.base
import pakfire.builder
import pakfire.config
import pakfire.downloader
import pakfire.system
import pakfire.util
from pakfire.system import system

import base

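# The following star import provides (directly or indirectly) the names
# used below that are not defined in this file, e.g. BUILD_ROOT,
# BUFFER_SIZE, DownloadError and DependencyError.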
from pakfire.constants import *
from pakfire.i18n import _

import logging
log = logging.getLogger("pakfire.client")

def fork_builder(*args, **kwargs):
	"""
		Wrapper that runs ClientBuilder in a new process and catches
		any exception to report it to the main process.
	"""
	try:
		# Create a new instance of the builder.
		cb = ClientBuilder(*args, **kwargs)

		# Run the build.
		cb.build()

	except Exception, e:
		# Log the exception and end the process with a non-zero exit
		# code, so the main process can detect that the build crashed.
		log.error("Build process failed: %s" % e)

		sys.exit(1)


class PakfireDaemon(object):
	"""
		The PakfireDaemon class creates a new process for each build
		job and also handles the keepalive/abort logic.
	"""
	def __init__(self):
		self.config = pakfire.config.ConfigDaemon()

		server = self.config.get("daemon", "server")
		hostname = self.config.get("daemon", "hostname")
		secret = self.config.get("daemon", "secret")

		self.client = base.PakfireBuilderClient(server, hostname, secret)
		self.conn = self.client.conn

		# Save the login data (to be able to create child processes).
		self.server = server
		self.hostname = hostname
		self.__secret = secret

		# A list of all running processes.
		self.processes = []
		self.pid2jobid = {}

		# Save when the last keepalive was sent.
		self._last_keepalive = 0

		# Send an initial keepalive message.
		self.send_keepalive(force=True)

	def run(self, heartbeat=1, max_processes=None):
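		"""
			The main loop of the daemon.

			Sends regular keepalive messages to the hub, reaps finished
			child processes, terminates jobs that have been aborted on
			the hub and requests a new job (at most once a minute) as
			long as fewer than max_processes builds are running.
		"""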
		# By default do not start more than two processes per CPU core.
		if max_processes is None:
			max_processes = system.cpu_count * 2
		log.debug("Maximum number of simultaneous processes is: %s" % max_processes)

		# Timestamps of the last job request and the last check for
		# aborted builds.
		last_job_request = 0
		last_abort_request = 0

		# Main loop.
		while True:
			# Send the keepalive regularly.
			self.send_keepalive()

			# Remove all finished builds.
			# "removed" indicates whether a process has actually finished.
			removed = self.remove_finished_builders()

			# If a build slot was freed, search immediately for a new job.
			if removed:
				last_job_request = 0

			# Kill aborted jobs.
			if time.time() - last_abort_request >= 60:
				aborted = self.kill_aborted_jobs()

				# If a build slot was freed, search immediately for a new job.
				if aborted:
					last_job_request = 0

				last_abort_request = time.time()

			# Check if the maximum number of processes has been reached.
			# The hub manages this as well, but this is a safeguard in
			# case anything goes wrong.
			if self.num_processes >= max_processes:
				log.debug("Reached maximum number of allowed processes (%s)." % max_processes)

				time.sleep(heartbeat)
				continue

			# Get a new job.
			if time.time() - last_job_request >= 60 and not self.has_overload():
				# If the last job request is more than a minute old and
				# the system is not under heavy load, check if there is
				# something to do for us.
				job = self.get_job()

				# If we got a job, we start a child process to work on it.
				if job:
					log.debug("Got a new job.")
					self.fork_builder(job)
				else:
					log.debug("No new job.")

				# Update the time when we last requested a job.
				last_job_request = time.time()

			# Wait a moment before starting over.
			time.sleep(heartbeat)

	def shutdown(self):
		"""
			Shuts down the daemon by killing all child processes.

			The method blocks until all processes have shut down.
		"""
		if not self.processes:
			log.info("No processes to kill. Shutting down immediately.")

		for process in self.processes:
			log.info("Sending SIGTERM to process %s..." % process)
			process.terminate()

		while self.processes:
			log.debug("Waiting for %s process(es) to terminate..." % len(self.processes))

			for process in self.processes[:]:
				if not process.is_alive():
					# The process has terminated.
					log.info("Process %s terminated with exit code: %s" % \
						(process, process.exitcode))

					self.processes.remove(process)

			# Do not busy-wait while the processes are shutting down.
			time.sleep(1)

	@property
	def num_processes(self):
		# Return the number of running child processes.
		return len(self.processes)

	@property
	def free_space(self):
		# Return the amount of free space (in bytes) on the filesystem
		# that holds the build root.
		mp = system.get_mountpoint(BUILD_ROOT)

		return mp.space_left

	def get_job(self):
		"""
			Get a build job from the hub.
		"""
		if self.free_space < 2 * 1024**3:
			log.warning(_("Less than 2GB of free space. Cannot request a new job."))
			return

		log.info("Requesting a new job from the server...")

		# Get some information about this system.
		s = pakfire.system.System()

		# Fetch a build job from the hub.
		return self.client.conn.build_get_job(s.supported_arches)

	def has_overload(self):
		"""
			Checks whether the load average of the system is too high.

			This is used to decide whether a new job should be requested.
		"""
		try:
			load1, load5, load15 = os.getloadavg()
		except OSError:
			# The current load average could not be determined. In that
			# case we assume that there is no overload.
			return False

		# If there are more than two processes in the run queue per CPU
		# core, we assume that the system is under heavy load and do
		# not request a new job.
		return load5 >= system.cpu_count * 2

	def send_keepalive(self, force=False):
		"""
			When triggered, this method sends a keepalive to the hub.
		"""
		# Do not send a keepalive more often than twice a minute,
		# unless it is forced.
		if not force and time.time() - self._last_keepalive < 30:
			return

		kwargs = {
			"force"      : force,
			"overload"   : self.has_overload(),
			"free_space" : self.free_space / 1024**2,  # megabytes
		}

		self.client.send_keepalive(**kwargs)
		self._last_keepalive = time.time()

	def remove_finished_builders(self):
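		"""
			Removes all finished child processes from the process list.

			Crashed builds are reported to the hub. Returns True if
			any process has been removed.
		"""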
		# Indicates whether any process has been removed.
		ret = False

		# Search for any finished processes.
		for process in self.processes[:]:
			# If the process is not alive anymore...
			if not process.is_alive():
				ret = True

				# ...check the exit code and log a message on errors.
				if process.exitcode == 0:
					log.debug("Process %s exited normally." % process)

				elif process.exitcode > 0:
					log.error("Process did not exit normally: %s code: %s" \
						% (process, process.exitcode))

				elif process.exitcode < 0:
					log.error("Process killed by signal: %s: code: %s" \
						% (process, process.exitcode))

				# If a process has crashed, we report that to the hub.
				job_id = self.pid2jobid.get(process.pid, None)
				if job_id:
					self.conn.build_job_crashed(job_id, process.exitcode)

				# Finally, remove the process from the process list.
				self.processes.remove(process)

		return ret

	def kill_aborted_jobs(self):
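		"""
			Asks the hub which of the currently running jobs have been
			aborted and terminates the processes working on them.

			Returns True if the hub reported any aborted jobs, else False.
		"""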
		log.debug("Requesting aborted jobs...")

		# Get a list of the IDs of all running jobs.
		running_jobs = self.pid2jobid.values()

		# If there are no running jobs, there is nothing to do.
		if not running_jobs:
			return False

		# Ask the hub which of these build jobs should be aborted.
		aborted_jobs = self.conn.build_jobs_aborted(running_jobs)

		# If no build jobs were aborted, there is nothing to do.
		if not aborted_jobs:
			return False

		for process in self.processes[:]:
			job_id = self.pid2jobid.get(process.pid, None)
			if job_id and job_id in aborted_jobs:
				# Kill the process.
				log.info("Killing process %s which was aborted by the user." \
					% process.pid)
				process.terminate()

				# Remove the process from the process list so that it
				# will not be cleaned up in the normal way.
				self.processes.remove(process)

		return True

	def fork_builder(self, job):
		"""
			Forks a new child process which creates a new, independent
			builder for the given job.
		"""
		# Create the Process object.
		process = multiprocessing.Process(target=fork_builder,
			args=(self.server, self.hostname, self.__secret, job))

		# The process runs in daemon mode, so it will be terminated
		# automatically when the daemon exits.
		process.daemon = True

		# Start the process.
		process.start()
		log.info("Started new process %s with PID %s." % (process, process.pid))

		# Save the PID and the job id to be able to track down
		# crashed builds.
		self.pid2jobid[process.pid] = job.get("id", None)

		# Append it to the process list.
		self.processes.append(process)


class ClientBuilder(object):
	def __init__(self, server, hostname, secret, job):
		self.client = base.PakfireBuilderClient(server, hostname, secret)
		self.conn = self.client.conn

		# Store the information sent by the server here.
		self.build_job = job

	def update_state(self, state, message=None):
		# Report the new state of this build job to the hub.
		self.conn.build_job_update_state(self.build_id, state, message)

	def upload_file(self, filename, type):
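		"""
			Uploads the given file to the hub and attaches it to this
			build job. type must be either "package" or "log".
		"""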
		assert os.path.exists(filename)
		assert type in ("package", "log")

		# First upload the file data and save the upload_id.
		upload_id = self.client._upload_file(filename)

		# Add the file to the build.
		return self.conn.build_job_add_file(self.build_id, upload_id, type)

	def upload_buildroot(self, installed_packages):
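		"""
			Reports the buildroot (friendly name and UUID of every
			installed package) to the hub.
		"""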
		pkgs = []

		for pkg in installed_packages:
			assert pkg.uuid, "%s has got no UUID" % pkg
			pkgs.append((pkg.friendly_name, pkg.uuid))

		return self.conn.build_upload_buildroot(self.build_id, pkgs)

	@property
	def build_id(self):
		if self.build_job:
			return self.build_job.get("id", None)

	@property
	def build_arch(self):
		if self.build_job:
			return self.build_job.get("arch", None)

	@property
	def build_source_url(self):
		if self.build_job:
			return self.build_job.get("source_url", None)

	@property
	def build_source_filename(self):
		if self.build_source_url:
			return os.path.basename(self.build_source_url)

	@property
	def build_source_hash512(self):
		if self.build_job:
			return self.build_job.get("source_hash512", None)

	@property
	def build_type(self):
		if self.build_job:
			return self.build_job.get("type", None)

	@property
	def build_config(self):
		if self.build_job:
			return self.build_job.get("config", None)

	def build(self):
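		"""
			Processes the build job: downloads the source package,
			verifies its checksum, builds it in a fresh build
			environment and uploads the results and the log file
			to the hub.
		"""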
		# Cannot go on if we got no build job.
		if not self.build_job:
			log.info("No job to work on...")
			return

		# Process the build and try to catch general exceptions to
		# report them to the server. If everything goes well, we
		# report that to the server, too.
		try:
			# Create a temporary directory for the resulting files and
			# derive the paths of the source file, the log file and the
			# job configuration file.
			tmpdir = tempfile.mkdtemp()
			tmpfile = os.path.join(tmpdir, self.build_source_filename)
			logfile = os.path.join(tmpdir, "build.log")
			cfgfile = os.path.join(tmpdir, "job-%s.conf" % self.build_id)

			# Create pakfire configuration instance.
			config = pakfire.config.ConfigDaemon()
			config.parse(self.build_config)

			# Create pakfire instance.
			p = None
			try:
				p = pakfire.base.PakfireBuilder(config=config, arch=self.build_arch)

				# Get a package grabber and download the source package.
				grabber = pakfire.downloader.PackageDownloader(p)
				grabber.urlgrab(self.build_source_url, filename=tmpfile)

				# Check if the download checksum matches (if provided).
				if self.build_source_hash512:
					h = hashlib.sha512()

					f = open(tmpfile, "rb")
					while True:
						buf = f.read(BUFFER_SIZE)
						if not buf:
							break

						h.update(buf)
					f.close()

					if self.build_source_hash512 != h.hexdigest():
						raise DownloadError, "Hash check did not succeed."

				# Create a new instance of a build environment.
				build = pakfire.builder.BuildEnviron(p, tmpfile,
					release_build=True, build_id=self.build_id, logfile=logfile)

				try:
					# Create the build environment.
					build.start()

					# Update the build status on the server.
					self.upload_buildroot(build.installed_packages)
					self.update_state("running")

					# Run the build (with install test).
					build.build(install_test=True)

					# Copy the created packages to the tempdir.
					build.copy_result(tmpdir)

				finally:
					# Clean up the build environment.
					build.stop()

				# The build has finished; upload the resulting files.
				self.update_state("uploading")

				# Walk through the result directory and upload all (binary)
				# files. Skip this for test builds.
				if self.build_type != "test":
					for dirpath, dirnames, filenames in os.walk(tmpdir):
						for filename in filenames:
							filename = os.path.join(dirpath, filename)
							if filename in (logfile, tmpfile,):
								continue

							self.upload_file(filename, "package")

			except DependencyError, e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state("dependency_error", message)
				raise

			except DownloadError, e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state("download_error", message)
				raise

			finally:
				if p:
					p.destroy()

				# Upload the log file in any case, if it exists.
				if os.path.exists(logfile):
					self.upload_file(logfile, "log")

				# Clean up the files we created.
				pakfire.util.rm(tmpdir)

		except DependencyError:
			# This has already been reported.
			raise

		except DownloadError:
			# Do not take any further action for this exception.
			pass

		except Exception, e:
			# Format the exception and send it to the server.
			message = "%s: %s" % (e.__class__.__name__, e)

			self.update_state("failed", message)
			raise

		else:
			self.update_state("finished")
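
# A minimal usage sketch (illustrative only -- the real entry point lives
# in the pakfire command line tools): run the daemon until it is
# interrupted, then shut down all child processes.
#
#	daemon = PakfireDaemon()
#	try:
#		daemon.run()
#	except KeyboardInterrupt:
#		daemon.shutdown()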