]>
Commit | Line | Data |
---|---|---|
c62d93f1 MT |
1 | #!/usr/bin/python |
2 | ||
3 | import hashlib | |
4 | import multiprocessing | |
5 | import os | |
6 | import sys | |
7 | import tempfile | |
8 | import time | |
9 | ||
7f2b6c7e | 10 | import pakfire.base |
c62d93f1 MT |
11 | import pakfire.builder |
12 | import pakfire.config | |
13 | import pakfire.downloader | |
14 | import pakfire.system | |
15 | import pakfire.util | |
16 | from pakfire.system import system | |
17 | ||
18 | import base | |
19 | ||
20 | from pakfire.constants import * | |
26cdef8f | 21 | from pakfire.i18n import _ |
c62d93f1 MT |
22 | |
23 | import logging | |
24 | log = logging.getLogger("pakfire.client") | |
25 | ||
def fork_builder(*args, **kwargs):
	"""
	Wrapper that runs ClientBuilder in a new process and catches
	any exception to report it to the main process.

	Arguments are passed through unchanged to ClientBuilder
	(server, hostname, secret, job).
	"""
	try:
		# Create new instance of the builder.
		cb = ClientBuilder(*args, **kwargs)

		# Run the build.
		cb.build()

	except Exception as e:
		# Log the exception (including traceback) so failures in the
		# child process end up in the daemon log instead of stdout.
		log.exception("Build process failed: %s" % e)

		# End the process with a non-zero exit code so the parent
		# can detect and report the crash.
		sys.exit(1)
44 | ||
45 | ||
class PakfireDaemon(object):
	"""
	The PakfireDaemon class that creates a new process per build
	job and also handles the keepalive/abort stuff.
	"""
	def __init__(self):
		# Read the daemon configuration.
		self.config = pakfire.config.ConfigDaemon()

		server   = self.config.get("daemon", "server")
		hostname = self.config.get("daemon", "hostname")
		secret   = self.config.get("daemon", "secret")

		# Connect to the hub.
		self.client = base.PakfireBuilderClient(server, hostname, secret)
		self.conn = self.client.conn

		# Save login data (to create child processes).
		self.server = server
		self.hostname = hostname
		self.__secret = secret

		# A list with all running child processes.
		self.processes = []

		# Maps child PIDs to the job IDs they work on, so crashed
		# builds can be reported to the hub.
		self.pid2jobid = {}

		# Save when last keepalive was sent.
		self._last_keepalive = 0

		# Send an initial keepalive message.
		self.send_keepalive(force=True)

	def run(self, heartbeat=1, max_processes=None):
		"""
		Main loop of the daemon.

		heartbeat     -- seconds to sleep between iterations.
		max_processes -- hard cap on concurrent build processes;
		                 defaults to two per CPU core.

		Never returns; runs until the process is terminated.
		"""
		# By default do not start more than two processes per CPU core.
		if max_processes is None:
			max_processes = system.cpu_count * 2
		log.debug("Maximum number of simultaneous processes is: %s" % max_processes)

		# Indicates when to try to request a new job or aborted builds.
		last_job_request = 0
		last_abort_request = 0

		# Main loop.
		while True:
			# Send the keepalive regularly.
			self.send_keepalive()

			# Remove all finished builds.
			# "removed" indicates, if a process has actually finished.
			removed = self.remove_finished_builders()

			# If a build slot was freed, search immediately for a new job.
			if removed:
				last_job_request = 0

			# Kill aborted jobs (ask the hub at most once a minute).
			if time.time() - last_abort_request >= 60:
				aborted = self.kill_aborted_jobs()

				# If a build slot was freed, search immediately for a new job.
				if aborted:
					last_job_request = 0

				last_abort_request = time.time()

			# Check if the maximum number of processes was reached.
			# Actually the hub does manage this but this is an emergency
			# condition if anything goes wrong.
			if self.num_processes >= max_processes:
				log.debug("Reached maximum number of allowed processes (%s)." % max_processes)

				time.sleep(heartbeat)
				continue

			# Get new job.
			if time.time() - last_job_request >= 60 and not self.has_overload():
				# If the last job request is older than a minute and we don't
				# have too much load, we go and check if there is something
				# to do for us.
				job = self.get_job()

				# If we got a job, we start a child process to work on it.
				if job:
					log.debug("Got a new job.")
					self.fork_builder(job)
				else:
					log.debug("No new job.")

				# Update the time when we requested a job.
				last_job_request = time.time()

			# Wait a moment before starting over.
			time.sleep(heartbeat)

	def shutdown(self):
		"""
		Shut down the daemon.
		This means to kill all child processes.

		The method blocks until all processes are shut down.
		"""
		# BUGFIX: the previous for/else logged "No processes to kill"
		# unconditionally, because a for loop's else clause runs whenever
		# the loop finishes without break - i.e. always here.
		if self.processes:
			for process in self.processes:
				log.info("Sending %s to terminate..." % process)

				process.terminate()
		else:
			log.info("No processes to kill. Shutting down immediately.")

		while self.processes:
			log.debug("%s process(es) is/are still running..." % len(self.processes))

			for process in self.processes[:]:
				if not process.is_alive():
					# The process has terminated.
					log.info("Process %s terminated with exit code: %s" % \
						(process, process.exitcode))

					self.processes.remove(process)

			# Avoid busy-waiting while the children shut down.
			time.sleep(1)

	@property
	def num_processes(self):
		# Return the number of running child processes.
		return len(self.processes)

	@property
	def free_space(self):
		# Free space (in bytes) on the filesystem holding BUILD_ROOT.
		mp = system.get_mountpoint(BUILD_ROOT)

		return mp.space_left

	def get_job(self):
		"""
		Get a build job from the hub.

		Returns the job data or None if no job should/could be taken.
		"""
		# Refuse to take a job with less than 2GB of free disk space.
		if self.free_space < 2 * 1024**3:
			log.warning(_("Less than 2GB of free space. Cannot request a new job."))
			return

		log.info("Requesting a new job from the server...")

		# Get some information about this system.
		s = pakfire.system.System()

		# Fetch a build job from the hub.
		return self.client.conn.build_get_job(s.supported_arches)

	def has_overload(self):
		"""
		Checks, if the load average is not too high.

		On this is to be decided if a new job is taken.
		"""
		try:
			load1, load5, load15 = os.getloadavg()
		except OSError:
			# Could not determine the current loadavg. In that case we
			# assume that we don't have overload.
			return False

		# If there are more than 2 processes in the process queue per CPU
		# core we will assume that the system has heavy load and do not
		# request a new job.
		return load5 >= system.cpu_count * 2

	def send_keepalive(self, force=False):
		"""
		When triggered, this method sends a keepalive to the hub.
		"""
		# Do not send a keepalive more often than twice a minute.
		if time.time() - self._last_keepalive < 30:
			return

		kwargs = {
			"force"      : force,
			"overload"   : self.has_overload(),
			"free_space" : self.free_space / 1024**2,
		}

		self.client.send_keepalive(**kwargs)
		self._last_keepalive = time.time()

	def remove_finished_builders(self):
		"""
		Reap finished child processes.

		Returns True if at least one process has been removed.
		"""
		ret = False

		# Search for any finished processes.
		for process in self.processes[:]:
			# If the process is not alive anymore...
			if not process.is_alive():
				ret = True

				# ... check the exit code and log a message on errors.
				if process.exitcode == 0:
					log.debug("Process %s exited normally." % process)

				elif process.exitcode > 0:
					log.error("Process did not exit normally: %s code: %s" \
						% (process, process.exitcode))

				elif process.exitcode < 0:
					log.error("Process killed by signal: %s: code: %s" \
						% (process, process.exitcode))

					# If a program has crashed, we send that to the hub.
					job_id = self.pid2jobid.get(process.pid, None)
					if job_id:
						self.conn.build_job_crashed(job_id, process.exitcode)

				# Finally, remove the process from the process list.
				self.processes.remove(process)

		return ret

	def kill_aborted_jobs(self):
		"""
		Ask the hub which of our running jobs were aborted and kill
		the corresponding build processes.

		Returns True if at least one job has been killed.
		"""
		log.debug("Requesting aborted jobs...")

		# Get a list of running job ids:
		running_jobs = self.pid2jobid.values()

		# If there are no running jobs, there is nothing to do.
		if not running_jobs:
			return False

		# Ask the hub for any build jobs to abort.
		aborted_jobs = self.conn.build_jobs_aborted(running_jobs)

		# If no build jobs were aborted, there is nothing to do.
		if not aborted_jobs:
			return False

		for process in self.processes[:]:
			job_id = self.pid2jobid.get(process.pid, None)
			if job_id and job_id in aborted_jobs:
				# Kill the process.
				log.info("Killing process %s which was aborted by the user." \
					% process.pid)
				process.terminate()

				# Remove the process from the process list to avoid
				# that is will be cleaned up in the normal way.
				self.processes.remove(process)

		return True

	def fork_builder(self, job):
		"""
		Fork a new child process to create a new independent builder
		for the given job.
		"""
		# Create the Process object.
		process = multiprocessing.Process(target=fork_builder,
			args=(self.server, self.hostname, self.__secret, job))
		# The process is running in daemon mode so it will try to kill
		# all child processes when exiting.
		process.daemon = True

		# Start the process.
		process.start()
		log.info("Started new process %s with PID %s." % (process, process.pid))

		# Save the PID and the build id to track down
		# crashed builds.
		self.pid2jobid[process.pid] = job.get("id", None)

		# Append it to the process list.
		self.processes.append(process)
310 | ||
311 | ||
class ClientBuilder(object):
	"""
	Works on exactly one build job: downloads the source package,
	runs the build in a BuildEnviron and uploads the results and
	log file back to the hub.
	"""
	def __init__(self, server, hostname, secret, job):
		self.client = base.PakfireBuilderClient(server, hostname, secret)
		self.conn = self.client.conn

		# Store the information sent by the server here.
		self.build_job = job

	def update_state(self, state, message=None):
		# Report the current state of this job to the hub.
		self.conn.build_job_update_state(self.build_id, state, message)

	def upload_file(self, filename, type):
		"""
		Upload a local file and attach it to this build job.

		type must be either "package" or "log".
		"""
		assert os.path.exists(filename)
		assert type in ("package", "log")

		# First upload the file data and save the upload_id.
		upload_id = self.client._upload_file(filename)

		# Add the file to the build.
		return self.conn.build_job_add_file(self.build_id, upload_id, type)

	def upload_buildroot(self, installed_packages):
		"""
		Send the list of packages installed in the build root
		(friendly name and UUID pairs) to the hub.
		"""
		pkgs = []

		for pkg in installed_packages:
			# BUGFIX: the assert message was missing its % argument and
			# would always print the literal "%s has got no UUID".
			assert pkg.uuid, "%s has got no UUID" % pkg
			pkgs.append((pkg.friendly_name, pkg.uuid))

		return self.conn.build_upload_buildroot(self.build_id, pkgs)

	@property
	def build_id(self):
		if self.build_job:
			return self.build_job.get("id", None)

	@property
	def build_arch(self):
		if self.build_job:
			return self.build_job.get("arch", None)

	@property
	def build_source_url(self):
		if self.build_job:
			return self.build_job.get("source_url", None)

	@property
	def build_source_filename(self):
		if self.build_source_url:
			return os.path.basename(self.build_source_url)

	@property
	def build_source_hash512(self):
		if self.build_job:
			return self.build_job.get("source_hash512", None)

	@property
	def build_type(self):
		if self.build_job:
			return self.build_job.get("type", None)

	@property
	def build_config(self):
		if self.build_job:
			return self.build_job.get("config", None)

	def build(self):
		"""
		Process the build job: download, verify, build, upload.

		Errors are reported to the hub as job states; exceptions
		(except DownloadError) are re-raised to the caller.
		"""
		# Cannot go on if I got no build job.
		if not self.build_job:
			logging.info("No job to work on...")
			return

		# Call the function that processes the build and try to catch general
		# exceptions and report them to the server.
		# If everything goes okay, we tell this the server, too.
		try:
			# Create a temporary file and a directory for the resulting files.
			tmpdir = tempfile.mkdtemp()
			tmpfile = os.path.join(tmpdir, self.build_source_filename)
			logfile = os.path.join(tmpdir, "build.log")

			# Get a package grabber and add mirror download capabilities to it.
			grabber = pakfire.downloader.PackageDownloader(pakfire.config.Config())

			# Create pakfire configuration instance.
			config = pakfire.config.ConfigDaemon()
			config.parse(self.build_config)

			# Create pakfire instance.
			p = None
			try:
				p = pakfire.base.PakfireBuilder(config=config, arch=self.build_arch)

				# Download the source package.
				grabber = pakfire.downloader.PackageDownloader(p)
				grabber.urlgrab(self.build_source_url, filename=tmpfile)

				# Check if the download checksum matches (if provided).
				if self.build_source_hash512:
					h = hashlib.sha512()
					# Hash the file in chunks; the context manager makes
					# sure the file is closed even if reading fails.
					with open(tmpfile, "rb") as f:
						while True:
							buf = f.read(BUFFER_SIZE)
							if not buf:
								break

							h.update(buf)

					if self.build_source_hash512 != h.hexdigest():
						raise DownloadError("Hash check did not succeed.")

				# Create a new instance of a build environment.
				build = pakfire.builder.BuildEnviron(p, tmpfile,
					release_build=True, build_id=self.build_id, logfile=logfile)

				try:
					# Create the build environment.
					build.start()

					# Update the build status on the server.
					self.upload_buildroot(build.installed_packages)
					self.update_state("running")

					# Run the build (with install test).
					build.build(install_test=True)

					# Copy the created packages to the tempdir.
					build.copy_result(tmpdir)

				finally:
					# Cleanup the build environment.
					build.stop()

				# Jippie, build is finished, we are going to upload the files.
				self.update_state("uploading")

				# Walk through the result directory and upload all (binary) files.
				# Skip that for test builds.
				if not self.build_type == "test":
					for dirname, subdirs, files in os.walk(tmpdir):
						for fn in files:
							fn = os.path.join(dirname, fn)
							if fn in (logfile, tmpfile,):
								continue

							self.upload_file(fn, "package")

			except DependencyError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state("dependency_error", message)
				raise

			except DownloadError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state("download_error", message)
				raise

			finally:
				if p:
					p.destroy()

				# Upload the logfile in any case and if it exists.
				if os.path.exists(logfile):
					self.upload_file(logfile, "log")

				# Cleanup the files we created.
				pakfire.util.rm(tmpdir)

		except DependencyError:
			# This has already been reported.
			raise

		except (DownloadError,):
			# Do not take any further action for these exceptions.
			pass

		except Exception as e:
			# Format the exception and send it to the server.
			message = "%s: %s" % (e.__class__.__name__, e)

			self.update_state("failed", message)
			raise

		else:
			self.update_state("finished")