]> git.ipfire.org Git - people/stevee/pakfire.git/blame - python/pakfire/daemon.py
Update pakfire-daemon:
[people/stevee/pakfire.git] / python / pakfire / daemon.py
CommitLineData
aa14071d
MT
1#!/usr/bin/python
2
3import hashlib
4import json
5import multiprocessing
6import os
7import signal
8import sys
9import tempfile
10import time
11
12import pakfire.base
13import pakfire.builder
14import pakfire.config
15import pakfire.downloader
16import pakfire.system
17import pakfire.util
18from pakfire.system import system
19
20import base
21import transport
22
23from pakfire.constants import *
24from pakfire.i18n import _
25
26import logging
27log = logging.getLogger("pakfire.daemon")
28
class BuildJob(dict):
	"""
	Wrapper class for build jobs, that are received from the hub.

	This makes accessing attributes more easy: job["arch"] can be
	written as job.arch. Missing keys raise AttributeError (not
	KeyError), so hasattr() works as expected.
	"""
	def __getattr__(self, key):
		try:
			return self[key]
		except KeyError:
			# Use call syntax instead of the Python-2-only
			# "raise AttributeError, key" statement form.
			raise AttributeError(key)
41
class PakfireDaemon(object):
	"""
	The main pakfire build daemon.

	Keeps a connection to the hub, periodically sends keepalive and
	hardware information, and manages a pool of PakfireWorker
	processes which fetch and execute build jobs.
	"""
	def __init__(self, config):
		self.config = config

		# Indicates if this daemon is in running mode.
		self.__running = True

		# List of worker processes.
		self.__workers = []

		# Create connection to the hub.
		self.transport = transport.PakfireHubTransport(self.config)

		### Configuration
		# Number of workers in waiting state.
		self.max_waiting = 1

		# Number of running workers.
		self.max_running = system.cpu_count * 2

	def run(self, heartbeat=30):
		"""
		Main loop.

		Sends a keepalive message and respawns workers roughly every
		"heartbeat" seconds until shutdown() clears the running flag.
		"""
		# Register signal handlers.
		self.register_signal_handlers()

		# Send our profile to the hub.
		self.send_builder_info()

		while self.__running:
			time_started = time.time()

			# Send keepalive message.
			self.send_keepalive()

			# Spawn a sufficient number of worker processes.
			self.spawn_workers_if_needed()

			# Get runtime of this loop iteration.
			time_elapsed = time.time() - time_started

			# Wait if the heartbeat time has not been reached, yet.
			if time_elapsed < heartbeat:
				time.sleep(heartbeat - time_elapsed)

		# Main loop has ended, but we wait until all workers have finished.
		self.terminate_all_workers()

	def shutdown(self):
		"""
		Terminates all workers and exits the daemon.

		Only clears the running flag; the actual worker teardown
		happens at the end of run().
		"""
		if not self.__running:
			return

		log.info(_("Shutting down..."))
		self.__running = False

	def spawn_workers_if_needed(self, *args, **kwargs):
		"""
		Spawns more workers if needed.

		Extra arguments are passed through to the PakfireWorker
		constructor.
		"""
		# Do not create any more processes when the daemon is shutting down.
		if not self.__running:
			return

		# Cleanup all other workers.
		self.cleanup_workers()

		# Do not create more workers if there are already enough workers
		# active.
		if len(self.workers) >= self.max_running:
			log.warning("More workers running than allowed")
			return

		# Do nothing, if there are already enough workers waiting.
		wanted_waiting_workers = self.max_waiting - len(self.waiting_workers)
		if wanted_waiting_workers <= 0:
			return

		# Spawn a new worker for each missing waiting slot.
		for _i in range(wanted_waiting_workers):
			self.spawn_worker(*args, **kwargs)

	def spawn_worker(self, *args, **kwargs):
		"""
		Spawns a new worker process and tracks it.
		"""
		worker = PakfireWorker(config=self.config, *args, **kwargs)
		worker.start()

		log.debug("Spawned new worker process: %s" % worker)
		self.__workers.append(worker)

	def terminate_worker(self, worker):
		"""
		Terminates the given worker.
		"""
		log.warning(_("Terminating worker process: %s") % worker)

		worker.terminate()

	def terminate_all_workers(self):
		"""
		Terminates all workers and waits for each to finish.
		"""
		for worker in self.workers:
			self.terminate_worker(worker)

			# Wait until the worker has finished.
			worker.join()

	def remove_worker(self, worker):
		"""
		Removes a worker from the internal list of worker processes.
		"""
		assert not worker.is_alive(), "Remove alive worker?"

		log.debug("Removing worker: %s" % worker)
		try:
			self.__workers.remove(worker)
		except ValueError:
			# The worker was already removed (e.g. by a re-entrant
			# SIGCHLD handler call). Nothing left to do.
			pass

	def cleanup_workers(self):
		"""
		Remove workers that are not alive any more.
		"""
		for worker in self.workers:
			if worker.is_alive():
				continue

			self.remove_worker(worker)

	@property
	def workers(self):
		# Return a copy so callers may mutate the internal list
		# (e.g. remove_worker) while iterating over this one.
		return self.__workers[:]

	@property
	def running_workers(self):
		"""
		All workers that are currently processing a build job.
		"""
		return [w for w in self.workers if not w.waiting.is_set()]

	@property
	def waiting_workers(self):
		"""
		All workers that are idle and waiting for a build job.
		"""
		return [w for w in self.workers if w.waiting.is_set()]

	# Signal handling.

	def register_signal_handlers(self):
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.

		A child (worker) exited; clean it up and respawn if needed.
		"""
		# Spawn new workers if necessary.
		self.spawn_workers_if_needed()

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM (and SIGINT).
		"""
		# Just shut down.
		self.shutdown()

	# Talking to the hub.

	def send_builder_info(self):
		"""
		Sends the static hardware/software profile of this builder
		to the hub.
		"""
		log.info(_("Sending builder information to hub..."))

		data = {
			# CPU info
			"cpu_model"       : system.cpu_model,
			"cpu_count"       : system.cpu_count,
			"cpu_arch"        : system.native_arch,
			"cpu_bogomips"    : system.cpu_bogomips,

			# Memory + swap
			"mem_total"       : system.memory,
			"swap_total"      : system.swap_total,

			# Pakfire + OS
			"pakfire_version" : PAKFIRE_VERSION,
			"host_key"        : self.config.get("signatures", "host_key", None),
			"os_name"         : system.distro.pretty_name,

			# Supported arches
			"supported_arches" : ",".join(system.supported_arches),
		}
		self.transport.post("/builders/info", data=data)

	def send_keepalive(self):
		"""
		Sends the current load/memory/disk state to the hub.
		"""
		log.debug("Sending keepalive message to hub...")

		data = {
			# Load average.
			"loadavg1"   : system.loadavg1,
			"loadavg5"   : system.loadavg5,
			"loadavg15"  : system.loadavg15,

			# Memory
			"mem_total"  : system.memory_total,
			"mem_free"   : system.memory_free,

			# Swap
			"swap_total" : system.swap_total,
			"swap_free"  : system.swap_free,

			# Disk space
			"space_free" : self.free_space,
		}
		self.transport.post("/builders/keepalive", data=data)

	@property
	def free_space(self):
		"""
		Free space on the filesystem holding BUILD_ROOT.

		NOTE(review): unit depends on system.get_mountpoint's
		space_left attribute — presumably bytes; confirm there.
		"""
		mp = system.get_mountpoint(BUILD_ROOT)

		return mp.space_left
278
class PakfireWorker(multiprocessing.Process):
	"""
	Worker process that fetches one build job from the hub,
	executes it in a build environment and uploads the results.
	"""
	def __init__(self, config, waiting=None):
		multiprocessing.Process.__init__(self)

		# Save config.
		self.config = config

		# Waiting event. Clear if this worker is running a build.
		self.waiting = multiprocessing.Event()
		self.waiting.set()

		# Indicates if this worker is running.
		self.__running = True

	def run(self):
		"""
		Process entry point: poll for a job, then execute it.
		"""
		# Register signal handlers.
		self.register_signal_handlers()

		# Create connection to the hub.
		self.transport = transport.PakfireHubTransport(self.config)
		self.transport.fork()

		while self.__running:
			# Try to get a new build job.
			job = self.get_new_build_job()
			if not job:
				continue

			# If we got a job, we are not waiting anymore.
			self.waiting.clear()

			# Run the job and return (one job per worker process).
			return self.execute_job(job)

	def shutdown(self):
		self.__running = False

		# When we are just waiting, we can exit right away.
		if self.waiting.is_set():
			log.debug("Exiting immediately")
			sys.exit(1)

		# XXX figure out what to do, when a build is running.

	# Signal handling.

	def register_signal_handlers(self):
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# Must be here so that SIGCHLD won't be propagated to
		# PakfireDaemon.
		pass

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM (and SIGINT).
		"""
		self.shutdown()

	def get_new_build_job(self, timeout=600):
		"""
		Long-polls the hub for a new build job.

		Returns a BuildJob, or None when the poll timed out.
		"""
		log.debug("Requesting new job...")

		try:
			job = self.transport.get_json("/builders/jobs/queue",
				data={ "timeout" : timeout, }, timeout=timeout)

			if job:
				return BuildJob(job)

		# As this is a long poll request, it is okay to time out.
		except TransportMaxTriesExceededError:
			pass

	def execute_job(self, job):
		"""
		Runs the given build job and reports its state ("running",
		"uploading", "finished" or an error state) to the hub.
		"""
		log.debug("Executing job: %s" % job)

		# Call the function that processes the build and try to catch general
		# exceptions and report them to the server.
		# If everything goes okay, we tell this the server, too.
		try:
			# Create a temporary file and a directory for the resulting files.
			tmpdir = tempfile.mkdtemp()
			tmpfile = os.path.join(tmpdir, os.path.basename(job.source_url))
			logfile = os.path.join(tmpdir, "build.log")

			# Create pakfire configuration instance.
			config = pakfire.config.ConfigDaemon()
			config.parse(job.config)

			# Create pakfire instance.
			p = None
			try:
				p = pakfire.base.PakfireBuilder(config=config, arch=job.arch)

				# Download the source package.
				grabber = pakfire.downloader.PackageDownloader(p)
				grabber.urlgrab(job.source_url, filename=tmpfile)

				# Check if the download checksum matches (if provided).
				if job.source_hash_sha512:
					h = hashlib.new("sha512")

					# Read the file in chunks; "with" guarantees the
					# handle is closed even if reading fails.
					with open(tmpfile, "rb") as f:
						while True:
							buf = f.read(BUFFER_SIZE)
							if not buf:
								break

							h.update(buf)

					if not job.source_hash_sha512 == h.hexdigest():
						raise DownloadError("Hash check did not succeed.")

				# Create a new instance of a build environment.
				build = pakfire.builder.BuildEnviron(p, tmpfile,
					release_build=True, build_id=job.id, logfile=logfile)

				try:
					# Create the build environment.
					build.start()

					# Update the build status on the server.
					self.upload_buildroot(job, build.installed_packages)
					self.update_state(job, "running")

					# Run the build (without install test).
					build.build(install_test=False)

					# Copy the created packages to the tempdir.
					build.copy_result(tmpdir)

				finally:
					# Cleanup the build environment.
					build.stop()

				# Jippie, build is finished, we are going to upload the files.
				self.update_state(job, "uploading")

				# Walk through the result directory and upload all (binary) files.
				# Skip that for test builds.
				if job.type != "test":
					for dirpath, subdirs, filenames in os.walk(tmpdir):
						for name in filenames:
							path = os.path.join(dirpath, name)

							# The logfile and the source package are
							# handled separately.
							if path in (logfile, tmpfile,):
								continue

							self.upload_file(job, path, "package")

			except DependencyError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state(job, "dependency_error", message)
				raise

			except DownloadError as e:
				message = "%s: %s" % (e.__class__.__name__, e)
				self.update_state(job, "download_error", message)
				raise

			finally:
				if p:
					p.destroy()

				# Upload the logfile in any case and if it exists.
				if os.path.exists(logfile):
					self.upload_file(job, logfile, "log")

				# Cleanup the files we created.
				pakfire.util.rm(tmpdir)

		except DependencyError:
			# This has already been reported.
			raise

		except (DownloadError,):
			# Do not take any further action for these exceptions.
			pass

		except (KeyboardInterrupt, SystemExit):
			self.update_state(job, "aborted")

		except Exception as e:
			# Format the exception and send it to the server.
			message = "%s: %s" % (e.__class__.__name__, e)

			self.update_state(job, "failed", message)
			raise

		else:
			self.update_state(job, "finished")

	def update_state(self, job, state, message=None):
		"""
		Update state of the build job on the hub.
		"""
		data = {
			"message" : message or "",
		}

		self.transport.post("/builders/jobs/%s/state/%s" % (job.id, state),
			data=data)

	def upload_file(self, job, filename, type):
		"""
		Uploads a single file ("package" or "log") and attaches it
		to the given build job on the hub.
		"""
		assert os.path.exists(filename)
		assert type in ("package", "log")

		# First upload the file data and save the upload_id.
		upload_id = self.transport.upload_file(filename)

		data = {
			"type" : type,
		}

		# Add the file to the build.
		self.transport.post("/builders/jobs/%s/addfile/%s" % (job.id, upload_id),
			data=data)

	def upload_buildroot(self, job, installed_packages):
		"""
		Reports the packages installed in the build root (name and
		UUID pairs) to the hub.
		"""
		pkgs = []
		for pkg in installed_packages:
			pkgs.append((pkg.friendly_name, pkg.uuid))

		data = { "buildroot" : json.dumps(pkgs) }

		self.transport.post("/builders/jobs/%s/buildroot" % job.id, data=data)