pakfire.git: python/pakfire/client/builder.py
Huge change: Introduce pakfire-client and -daemon.
#!/usr/bin/python

import hashlib
import multiprocessing
import os
import sys
import tempfile
import time

import pakfire.api
import pakfire.builder
import pakfire.config
import pakfire.downloader
import pakfire.system
import pakfire.util
from pakfire.system import system

import base

from pakfire.constants import *

import logging
log = logging.getLogger("pakfire.client")

def fork_builder(*args, **kwargs):
	"""
		Wrapper that runs ClientBuilder in a new process and catches
		any exception to report it to the main process.
	"""
	try:
		# Create a new instance of the builder.
		cb = ClientBuilder(*args, **kwargs)

		# Run the build.
		cb.build()

	except Exception, e:
		# Catch the exception and log it.
		log.error("Exception in build process: %s" % e)

		# End the process with a non-zero exit code.
		sys.exit(1)


class PakfireDaemon(object):
	"""
		The PakfireDaemon class creates a new process per build job
		and also handles the keepalive messages and aborted jobs.
	"""
	def __init__(self, server, hostname, secret):
		self.client = base.PakfireBuilderClient(server, hostname, secret)
		self.conn = self.client.conn

		# Save login data (to create child processes).
		self.server = server
		self.hostname = hostname
		self.__secret = secret

		# A list with all running processes and a map from their PIDs
		# to the ids of the jobs they are working on.
		self.processes = []
		self.pid2jobid = {}

		# Save when the last keepalive was sent.
		self._last_keepalive = 0

	def run(self, heartbeat=1, max_processes=None):
		# By default do not start more than two processes per CPU core.
		if max_processes is None:
			max_processes = system.cpu_count * 2
		log.debug("Maximum number of simultaneous processes is: %s" % max_processes)

		# Timestamps of the last job request and the last check for
		# aborted builds.
		last_job_request = 0
		last_abort_request = 0

		# Main loop.
		while True:
			# Send the keepalive regularly.
			self.send_keepalive()

			# Remove all finished build processes.
			# "removed" indicates whether a process has actually finished.
			removed = self.remove_finished_builders()

			# If a build slot was freed, search immediately for a new job.
			if removed:
				last_job_request = 0

			# Kill aborted jobs.
			if time.time() - last_abort_request >= 60:
				aborted = self.kill_aborted_jobs()

				# If a build slot was freed, search immediately for a new job.
				if aborted:
					last_job_request = 0

				last_abort_request = time.time()

			# Check if the maximum number of processes has been reached.
			# The hub manages this as well, but this is an emergency check
			# in case anything goes wrong.
			if self.num_processes >= max_processes:
				log.debug("Reached maximum number of allowed processes (%s)." % max_processes)

				time.sleep(heartbeat)
				continue

			# Get a new job.
			if time.time() - last_job_request >= 60 and not self.has_overload():
				# If the last job request is older than a minute and we don't
				# have too much load, we go and check if there is something
				# to do for us.
				job = self.get_job()

				# If we got a job, we start a child process to work on it.
				if job:
					log.debug("Got a new job.")
					self.fork_builder(job)
				else:
					log.debug("No new job.")

				# Update the time when we requested a job.
				last_job_request = time.time()

			# Wait a moment before starting over.
			time.sleep(heartbeat)

	def shutdown(self):
		"""
			Shut down the daemon.
			This terminates all child processes.

			The method blocks until all processes are shut down.
		"""
		if not self.processes:
			log.info("No processes to kill. Shutting down immediately.")

		for process in self.processes:
			log.info("Telling process %s to terminate..." % process)
			process.terminate()

		while self.processes:
			log.debug("%s process(es) is/are still running..." % len(self.processes))

			for process in self.processes[:]:
				if not process.is_alive():
					# The process has terminated.
					log.info("Process %s terminated with exit code: %s" % \
						(process, process.exitcode))

					self.processes.remove(process)

			# Do not busy-wait for the processes to terminate.
			time.sleep(1)

	@property
	def num_processes(self):
		# Return the number of processes.
		return len(self.processes)

	def get_job(self):
		"""
			Get a build job from the hub.
		"""
		log.info("Requesting a new job from the server...")

		# Get some information about this system.
		s = pakfire.system.System()

		# Fetch a build job from the hub.
		return self.client.conn.build_get_job(s.supported_arches)

	def has_overload(self):
		"""
			Checks if the load average is too high.

			This is used to decide whether a new job should be taken.
		"""
		try:
			load1, load5, load15 = os.getloadavg()
		except OSError:
			# Could not determine the current load average. In that case
			# we assume that we don't have overload.
			return False

		# If there are more than two processes in the run queue per CPU
		# core, we assume that the system is under heavy load and do not
		# request a new job.
		return load5 >= system.cpu_count * 2

	def send_keepalive(self):
		"""
			When triggered, this method sends a keepalive to the hub.
		"""
		# Do not send a keepalive more often than twice a minute.
		if time.time() - self._last_keepalive < 30:
			return

		self.client.send_keepalive(overload=self.has_overload())
		self._last_keepalive = time.time()

	def remove_finished_builders(self):
		# Returns whether any processes have been removed.
		ret = False

		# Search for any finished processes.
		for process in self.processes[:]:
			# If the process is not alive anymore...
			if not process.is_alive():
				ret = True

				# ... check the exit code and log a message on errors.
				if process.exitcode == 0:
					log.debug("Process %s exited normally." % process)

				elif process.exitcode > 0:
					log.error("Process did not exit normally: %s code: %s" \
						% (process, process.exitcode))

				elif process.exitcode < 0:
					log.error("Process killed by signal: %s: code: %s" \
						% (process, process.exitcode))

				# If a process has crashed, we report that to the hub.
				job_id = self.pid2jobid.get(process.pid, None)
				if job_id:
					self.conn.build_job_crashed(job_id, process.exitcode)

				# Finally, remove the process from the process list.
				self.processes.remove(process)

		return ret

	def kill_aborted_jobs(self):
		log.debug("Requesting aborted jobs...")

		# Get a list of the ids of all running jobs.
		running_jobs = self.pid2jobid.values()

		# If there are no running jobs, there is nothing to do.
		if not running_jobs:
			return False

		# Ask the hub for any build jobs to abort.
		aborted_jobs = self.conn.build_jobs_aborted(running_jobs)

		# If no build jobs were aborted, there is nothing to do.
		if not aborted_jobs:
			return False

		for process in self.processes[:]:
			job_id = self.pid2jobid.get(process.pid, None)
			if job_id and job_id in aborted_jobs:
				# Kill the process.
				log.info("Killing process %s which was aborted by the user." \
					% process.pid)
				process.terminate()

				# Remove the process from the process list to avoid
				# that it is cleaned up in the normal way.
				self.processes.remove(process)

		return True

	def fork_builder(self, job):
		"""
			Fork a new child process that creates a new, independent builder.
		"""
		# Create the Process object.
		process = multiprocessing.Process(target=fork_builder,
			args=(self.server, self.hostname, self.__secret, job))
		# The process runs in daemon mode, so the parent will try to
		# terminate it when exiting.
		process.daemon = True

		# Start the process.
		process.start()
		log.info("Started new process %s with PID %s." % (process, process.pid))

		# Save the PID and the job id to track down crashed builds.
		self.pid2jobid[process.pid] = job.get("id", None)

		# Append it to the process list.
		self.processes.append(process)


class ClientBuilder(object):
	def __init__(self, server, hostname, secret, job):
		self.client = base.PakfireBuilderClient(server, hostname, secret)
		self.conn = self.client.conn

		# Store the information sent by the server here.
		self.build_job = job

	def update_state(self, state, message=None):
		"""
			Sends the new state of this build job to the hub.
		"""
		self.conn.build_job_update_state(self.build_id, state, message)

	def upload_file(self, filename, type):
		"""
			Uploads the given file to the hub and attaches it to this
			build job, either as a package or as a log file.
		"""
		assert os.path.exists(filename)
		assert type in ("package", "log")

		# First upload the file data and save the upload_id.
		upload_id = self.client._upload_file(filename)

		# Add the file to the build.
		return self.conn.build_job_add_file(self.build_id, upload_id, type)

	def upload_buildroot(self, installed_packages):
		"""
			Sends the list of packages installed in the build root
			(name and UUID) to the hub.
		"""
		pkgs = []
		for pkg in installed_packages:
			pkgs.append((pkg.friendly_name, pkg.uuid))

		return self.conn.build_upload_buildroot(self.build_id, pkgs)

	# Shortcut properties for the fields of the job dict sent by the hub.

	@property
	def build_id(self):
		if self.build_job:
			return self.build_job.get("id", None)

	@property
	def build_arch(self):
		if self.build_job:
			return self.build_job.get("arch", None)

	@property
	def build_source_url(self):
		if self.build_job:
			return self.build_job.get("source_url", None)

	@property
	def build_source_filename(self):
		if self.build_source_url:
			return os.path.basename(self.build_source_url)

	@property
	def build_source_hash512(self):
		if self.build_job:
			return self.build_job.get("source_hash512", None)

	@property
	def build_type(self):
		if self.build_job:
			return self.build_job.get("type", None)

	def build(self):
		# Cannot go on if we got no build job.
		if not self.build_job:
			log.info("No job to work on...")
			return

		# Call the function that processes the build and try to catch general
		# exceptions and report them to the server.
		# If everything goes okay, we tell the server that, too.
		try:
			# Create a temporary file and a directory for the resulting files.
			tmpdir = tempfile.mkdtemp()
			tmpfile = os.path.join(tmpdir, self.build_source_filename)
			logfile = os.path.join(tmpdir, "build.log")

			# Get a package grabber and add mirror download capabilities to it.
			grabber = pakfire.downloader.PackageDownloader(pakfire.config.Config())

			try:
				# Download the source.
				grabber.urlgrab(self.build_source_url, filename=tmpfile)

				# Check if the download checksum matches (if provided).
				if self.build_source_hash512:
					h = hashlib.sha512()
					f = open(tmpfile, "rb")
					while True:
						buf = f.read(BUFFER_SIZE)
						if not buf:
							break

						h.update(buf)
					f.close()

					if self.build_source_hash512 != h.hexdigest():
						raise DownloadError, "Hash check did not succeed."

				# Create a dict with the arguments that are passed to the
				# pakfire builder.
				kwargs = {
					# Of course this is a release build,
					# i.e. don't use local packages.
					"builder_mode" : "release",

					# Set the build_id we got from the build service.
					"build_id" : self.build_id,

					# Files and directories (should be self-explanatory).
					"logfile" : logfile,

					# Distro configuration.
					"distro_config" : {
						"arch" : self.build_arch,
					},
				}

				# Create a new instance of the builder.
				build = pakfire.builder.BuildEnviron(tmpfile, **kwargs)

				try:
					# Create the build environment.
					build.start()

					# Update the build status on the server.
					self.upload_buildroot(build.installed_packages)
					self.update_state("running")

					# Run the build (with install test).
					build.build(install_test=True)

					# Copy the created packages to the tempdir.
					build.copy_result(tmpdir)

				finally:
					# Clean up the build environment.
					build.stop()

				# The build is finished; upload the resulting files.
				self.update_state("uploading")

				# Walk through the result directory and upload all (binary) files.
				# Skip that for test builds.
				if self.build_type != "test":
					for dir, subdirs, files in os.walk(tmpdir):
						for file in files:
							file = os.path.join(dir, file)
							if file in (logfile, tmpfile,):
								continue

							self.upload_file(file, "package")

			except DependencyError, e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state("dependency_error", message)
				raise

			except DownloadError, e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state("download_error", message)
				raise

			finally:
				# Upload the logfile in any case, if it exists.
				if os.path.exists(logfile):
					self.upload_file(logfile, "log")

				# Clean up the files we created.
				pakfire.util.rm(tmpdir)

		except DependencyError:
			# This has already been reported.
			raise

		except (DownloadError,):
			# Do not take any further action for these exceptions.
			pass

		except Exception, e:
			# Format the exception and send it to the server.
			message = "%s: %s" % (e.__class__.__name__, e)

			self.update_state("failed", message)
			raise

		else:
			self.update_state("finished")
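

# A minimal usage sketch (not part of the original module): how the daemon
# side of this file might be driven. In the pakfire tree this setup is done
# elsewhere (by the daemon entry point); the server URL, hostname, and
# secret below are placeholder values only.
if __name__ == "__main__":
	daemon = PakfireDaemon("https://pakfire.example.org",
		"builder01.example.org", "secret")

	try:
		# Poll the hub for jobs and fork a ClientBuilder for each one.
		daemon.run()
	except KeyboardInterrupt:
		# Terminate all child processes before exiting.
		daemon.shutdown()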