]>
Commit | Line | Data |
---|---|---|
c62d93f1 MT |
1 | #!/usr/bin/python |
2 | ||
3 | import hashlib | |
4 | import multiprocessing | |
5 | import os | |
6 | import sys | |
7 | import tempfile | |
8 | import time | |
9 | ||
10 | import pakfire.api | |
11 | import pakfire.builder | |
12 | import pakfire.config | |
13 | import pakfire.downloader | |
14 | import pakfire.system | |
15 | import pakfire.util | |
16 | from pakfire.system import system | |
17 | ||
18 | import base | |
19 | ||
20 | from pakfire.constants import * | |
21 | ||
22 | import logging | |
23 | log = logging.getLogger("pakfire.client") | |
24 | ||
def fork_builder(*args, **kwargs):
	"""
	Wrapper that runs ClientBuilder in a new child process and
	catches any exception so the crash is reported to the daemon
	log and signalled to the parent via a non-zero exit code.
	"""
	try:
		# Create new instance of the builder.
		cb = ClientBuilder(*args, **kwargs)

		# Run the build.
		cb.build()

	except Exception:
		# Log the full traceback instead of printing the bare message
		# to stdout (the old code only did "print e"), so crashes end
		# up in the daemon log where they can actually be found.
		log.exception("Build process crashed:")

		# End the process with a non-zero exit code so the parent
		# detects the failure.
		sys.exit(1)
43 | ||
44 | ||
class PakfireDaemon(object):
	"""
	The pakfire build daemon.

	Creates a new child process per build job and handles the
	keepalive/abort communication with the hub.
	"""
	def __init__(self, server, hostname, secret):
		# Connection to the hub.
		self.client = base.PakfireBuilderClient(server, hostname, secret)
		self.conn = self.client.conn

		# Save login data (to create child processes).
		self.server = server
		self.hostname = hostname
		self.__secret = secret

		# A list with all running child processes.
		self.processes = []
		# Maps the PID of a child process to the job id it works on,
		# so crashes can be reported for the right job.
		self.pid2jobid = {}

		# Timestamp when the last keepalive was sent.
		self._last_keepalive = 0

	def run(self, heartbeat=1, max_processes=None):
		"""
		Main loop of the daemon.

		heartbeat     - seconds to sleep between two iterations.
		max_processes - maximum number of simultaneous build processes
		                (defaults to two per CPU core).
		"""
		# By default do not start more than two processes per CPU core.
		if max_processes is None:
			max_processes = system.cpu_count * 2
		log.debug("Maximum number of simultaneous processes is: %s" % max_processes)

		# Indicates when we last requested a new job or aborted builds.
		last_job_request = 0
		last_abort_request = 0

		# Main loop.
		while True:
			# Send the keepalive regularly.
			self.send_keepalive()

			# Remove all finished builds.
			# "removed" indicates, if a process has actually finished.
			removed = self.remove_finished_builders()

			# If a build slot was freed, search immediately for a new job.
			if removed:
				last_job_request = 0

			# Kill aborted jobs (ask the hub at most once a minute).
			if time.time() - last_abort_request >= 60:
				aborted = self.kill_aborted_jobs()

				# If a build slot was freed, search immediately for a new job.
				if aborted:
					last_job_request = 0

				last_abort_request = time.time()

			# Check if the maximum number of processes was reached.
			# Actually the hub does manage this but this is an emergency
			# condition if anything goes wrong.
			if self.num_processes >= max_processes:
				log.debug("Reached maximum number of allowed processes (%s)." % max_processes)

				time.sleep(heartbeat)
				continue

			# Get new job.
			if time.time() - last_job_request >= 60 and not self.has_overload():
				# If the last job request is older than a minute and we don't
				# have too much load, we go and check if there is something
				# to do for us.
				job = self.get_job()

				# If we got a job, we start a child process to work on it.
				if job:
					log.debug("Got a new job.")
					self.fork_builder(job)
				else:
					log.debug("No new job.")

				# Update the time when we requested a job.
				last_job_request = time.time()

			# Wait a moment before starting over.
			time.sleep(heartbeat)

	def shutdown(self):
		"""
		Shut down the daemon by terminating all child processes.

		The method blocks until all processes have exited.
		"""
		# NOTE: the original implementation used a for/else here; since
		# the loop never breaks, "No processes to kill" was logged on
		# every shutdown even while children were still alive.
		if not self.processes:
			log.info("No processes to kill. Shutting down immediately.")
			return

		for process in self.processes:
			log.info("Sending %s to terminate..." % process)

			process.terminate()

		while self.processes:
			log.debug("%s process(es) is/are still running..." % len(self.processes))

			for process in self.processes[:]:
				if not process.is_alive():
					# The process has terminated.
					log.info("Process %s terminated with exit code: %s" % \
						(process, process.exitcode))

					self.processes.remove(process)

			# Avoid busy-waiting while the remaining children shut down.
			if self.processes:
				time.sleep(0.5)

	@property
	def num_processes(self):
		# Number of currently tracked child processes.
		return len(self.processes)

	def get_job(self):
		"""
		Get a build job from the hub.
		"""
		log.info("Requesting a new job from the server...")

		# Get some information about this system.
		s = pakfire.system.System()

		# Fetch a build job from the hub (for any arch we support).
		return self.client.conn.build_get_job(s.supported_arches)

	def has_overload(self):
		"""
		Checks if the load average is too high.

		On this is to be decided if a new job is taken.
		"""
		try:
			load1, load5, load15 = os.getloadavg()
		except OSError:
			# Could not determine the current loadavg. In that case we
			# assume that we don't have overload.
			return False

		# If there are more than 2 processes in the process queue per CPU
		# core we will assume that the system has heavy load and do not
		# request a new job.
		return load5 >= system.cpu_count * 2

	def send_keepalive(self):
		"""
		When triggered, this method sends a keepalive to the hub.
		"""
		# Do not send a keepalive more often than twice a minute.
		if time.time() - self._last_keepalive < 30:
			return

		self.client.send_keepalive(overload=self.has_overload())
		self._last_keepalive = time.time()

	def remove_finished_builders(self):
		"""
		Remove all finished child processes from the process list.

		Returns True if at least one process was removed.
		"""
		ret = False

		# Search for any finished processes.
		for process in self.processes[:]:
			# If the process is not alive anymore...
			if not process.is_alive():
				ret = True

				# ... check the exit code and log a message on errors.
				if process.exitcode == 0:
					log.debug("Process %s exited normally." % process)

				elif process.exitcode > 0:
					log.error("Process did not exit normally: %s code: %s" \
						% (process, process.exitcode))

				elif process.exitcode < 0:
					log.error("Process killed by signal: %s: code: %s" \
						% (process, process.exitcode))

					# If a program has crashed, we send that to the hub.
					job_id = self.pid2jobid.get(process.pid, None)
					if job_id:
						self.conn.build_job_crashed(job_id, process.exitcode)

				# Finally, remove the process from the process list.
				self.processes.remove(process)

		return ret

	def kill_aborted_jobs(self):
		"""
		Ask the hub which of our running jobs were aborted and kill
		the corresponding child processes.

		Returns True if at least one job was killed.
		"""
		log.debug("Requesting aborted jobs...")

		# Get a list of running job ids:
		running_jobs = self.pid2jobid.values()

		# If there are no running jobs, there is nothing to do.
		if not running_jobs:
			return False

		# Ask the hub for any build jobs to abort.
		aborted_jobs = self.conn.build_jobs_aborted(running_jobs)

		# If no build jobs were aborted, there is nothing to do.
		if not aborted_jobs:
			return False

		for process in self.processes[:]:
			job_id = self.pid2jobid.get(process.pid, None)
			if job_id and job_id in aborted_jobs:

				# Kill the process.
				log.info("Killing process %s which was aborted by the user." \
					% process.pid)
				process.terminate()

				# Remove the process from the process list to avoid
				# that it will be cleaned up in the normal way.
				self.processes.remove(process)

		return True

	def fork_builder(self, job):
		"""
		Fork a new child process that runs an independent builder
		for the given job.
		"""
		# Create the Process object.
		process = multiprocessing.Process(target=fork_builder,
			args=(self.server, self.hostname, self.__secret, job))
		# The process is running in daemon mode so it will try to kill
		# all child processes when exiting.
		process.daemon = True

		# Start the process.
		process.start()
		log.info("Started new process %s with PID %s." % (process, process.pid))

		# Save the PID and the build id to track down
		# crashed builds.
		self.pid2jobid[process.pid] = job.get("id", None)

		# Append it to the process list.
		self.processes.append(process)
284 | ||
285 | ||
class ClientBuilder(object):
	"""
	Processes a single build job received from the hub: downloads the
	source, runs it through a pakfire build environment and uploads
	the results (packages and build log) back to the hub.
	"""
	def __init__(self, server, hostname, secret, job):
		self.client = base.PakfireBuilderClient(server, hostname, secret)
		self.conn = self.client.conn

		# Store the information sent by the server here.
		self.build_job = job

	def update_state(self, state, message=None):
		"""
		Report the new state of this build job to the hub.
		"""
		self.conn.build_job_update_state(self.build_id, state, message)

	def upload_file(self, filename, type):
		"""
		Upload a local file to the hub and attach it to this build.

		type must be either "package" or "log".
		"""
		assert os.path.exists(filename)
		assert type in ("package", "log")

		# First upload the file data and save the upload_id.
		upload_id = self.client._upload_file(filename)

		# Add the file to the build.
		return self.conn.build_job_add_file(self.build_id, upload_id, type)

	def upload_buildroot(self, installed_packages):
		"""
		Send the packages installed in the build environment to the
		hub as (friendly_name, uuid) pairs.
		"""
		pkgs = [(pkg.friendly_name, pkg.uuid) for pkg in installed_packages]

		return self.conn.build_upload_buildroot(self.build_id, pkgs)

	@property
	def build_id(self):
		# The job id assigned by the hub (None without a job).
		if self.build_job:
			return self.build_job.get("id", None)

	@property
	def build_arch(self):
		# The architecture this job is to be built for.
		if self.build_job:
			return self.build_job.get("arch", None)

	@property
	def build_source_url(self):
		# URL of the source package to build.
		if self.build_job:
			return self.build_job.get("source_url", None)

	@property
	def build_source_filename(self):
		# Basename of the source package, derived from the URL.
		if self.build_source_url:
			return os.path.basename(self.build_source_url)

	@property
	def build_source_hash512(self):
		# Expected SHA-512 digest of the source file (optional).
		if self.build_job:
			return self.build_job.get("source_hash512", None)

	@property
	def build_type(self):
		# The type of the build job (e.g. "test").
		if self.build_job:
			return self.build_job.get("type", None)

	def _verify_source(self, tmpfile):
		"""
		Verify the downloaded source file against the SHA-512 digest
		provided by the hub, if any.

		Raises DownloadError if the digest does not match.
		"""
		if not self.build_source_hash512:
			return

		h = hashlib.sha512()

		# Hash the file in chunks so large sources do not have to be
		# read into memory as a whole; the context manager guarantees
		# the handle is closed even if a read fails.
		with open(tmpfile, "rb") as f:
			while True:
				buf = f.read(BUFFER_SIZE)
				if not buf:
					break
				h.update(buf)

		if self.build_source_hash512 != h.hexdigest():
			raise DownloadError("Hash check did not succeed.")

	def build(self):
		"""
		Run the whole build job and report the outcome to the hub.
		"""
		# Cannot go on if I got no build job.
		if not self.build_job:
			# Use the module logger for consistency (the old code
			# called logging.info() on the root logger here).
			log.info("No job to work on...")
			return

		# Call the function that processes the build and try to catch general
		# exceptions and report them to the server.
		# If everything goes okay, we tell this the server, too.
		try:
			# Create a temporary directory holding the source file,
			# the log file and the resulting packages.
			tmpdir = tempfile.mkdtemp()
			tmpfile = os.path.join(tmpdir, self.build_source_filename)
			logfile = os.path.join(tmpdir, "build.log")

			# Get a package grabber and add mirror download capabilities to it.
			grabber = pakfire.downloader.PackageDownloader(pakfire.config.Config())

			try:
				# Download the source.
				grabber.urlgrab(self.build_source_url, filename=tmpfile)

				# Check if the download checksum matches (if provided).
				self._verify_source(tmpfile)

				# Arguments that are passed to the pakfire builder.
				kwargs = {
					# Of course this is a release build.
					# i.e. don't use local packages.
					"builder_mode" : "release",

					# Set the build_id we got from the build service.
					"build_id" : self.build_id,

					# Files and directories (should be self explaining).
					"logfile" : logfile,

					# Distro configuration.
					"distro_config" : {
						"arch" : self.build_arch,
					},
				}

				# Create a new instance of the builder.
				build = pakfire.builder.BuildEnviron(tmpfile, **kwargs)

				try:
					# Create the build environment.
					build.start()

					# Update the build status on the server.
					self.upload_buildroot(build.installed_packages)
					self.update_state("running")

					# Run the build (with install test).
					build.build(install_test=True)

					# Copy the created packages to the tempdir.
					build.copy_result(tmpdir)

				finally:
					# Cleanup the build environment in any case.
					build.stop()

				# Build is finished; we are going to upload the files.
				self.update_state("uploading")

				# Walk through the result directory and upload all (binary)
				# files. Skip that for test builds.
				if not self.build_type == "test":
					for directory, subdirs, files in os.walk(tmpdir):
						for filename in files:
							path = os.path.join(directory, filename)

							# The log and the source file are no results.
							if path in (logfile, tmpfile,):
								continue

							self.upload_file(path, "package")

			except DependencyError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state("dependency_error", message)
				raise

			except DownloadError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state("download_error", message)
				raise

			finally:
				# Upload the logfile in any case and if it exists.
				if os.path.exists(logfile):
					self.upload_file(logfile, "log")

				# Cleanup the files we created.
				pakfire.util.rm(tmpdir)

		except DependencyError:
			# This has already been reported to the hub.
			raise

		except (DownloadError,):
			# Do not take any further action for these exceptions.
			pass

		except Exception as e:
			# Format the exception and send it to the server.
			message = "%s: %s" % (e.__class__.__name__, e)

			self.update_state("failed", message)
			raise

		else:
			self.update_state("finished")