13 import pakfire
.builder
15 import pakfire
.downloader
18 from pakfire
.system
import system
23 from pakfire
.constants
import *
24 from pakfire
.i18n
import _
27 log
= logging
.getLogger("pakfire.daemon")
31 Wrapper class for build jobs, that are received from the hub.
33 This makes accessing attributes more easy.
35 def __getattr__(self
, key
):
39 raise AttributeError, key
class PakfireDaemon(object):
	# Build daemon: keeps a pool of PakfireWorker processes running and
	# reports builder status to the hub.
	#
	# NOTE(review): this view of the file is truncated in many places (the
	# original line numbers jump); statements that appear to be missing are
	# flagged below rather than guessed at. The code uses Python 2 syntax
	# (``except E, e:`` / ``raise E, msg``) throughout.

	def __init__(self, config):
		# Set up daemon state and the connection to the hub.
		# NOTE(review): an assignment such as ``self.config = config`` is
		# not visible in this view, although self.config is read below --
		# TODO confirm against the full file.

		# Indicates if this daemon is in running mode.
		# NOTE(review): the flag assignment (``self.__running``, read in
		# spawn_workers_if_needed) is not visible in this view.

		# List of worker processes.
		# NOTE(review): initialisation of the list behind ``self.__workers``
		# is not visible in this view.

		# Create connection to the hub.
		self.transport = transport.PakfireHubTransport(self.config)

		# Number of workers in waiting state.
		# NOTE(review): assignment of ``self.max_waiting`` (read in
		# spawn_workers_if_needed) is not visible in this view.

		# Number of running workers.
		self.max_running = system.cpu_count * 2

	def run(self, heartbeat=30):
		# Main entry point: register signal handlers, announce this builder
		# to the hub, then repeatedly spawn workers, pacing each pass so it
		# takes at least ``heartbeat`` seconds.

		# Register signal handlers.
		self.register_signal_handlers()

		# Send our profile to the hub.
		self.send_builder_info()

		# NOTE(review): the enclosing main loop (``while self.__running:``?)
		# is not visible in this view; the statements below read like one
		# loop iteration.
		time_started = time.time()

		# Send keepalive message.
		# NOTE(review): the keepalive call itself is not visible here.

		# Spawn a sufficient number of worker processes.
		self.spawn_workers_if_needed()

		# Get runtime of this loop iteration.
		time_elapsed = time.time() - time_started

		# Wait if the heartbeat time has not been reached, yet.
		if time_elapsed < heartbeat:
			time.sleep(heartbeat - time_elapsed)

		# Main loop has ended, but we wait until all workers have finished.
		self.terminate_all_workers()

	# NOTE(review): a method header (likely ``def shutdown(self):``) appears
	# truncated before the following statements.
	"""
	Terminates all workers and exists the daemon.
	"""
	if not self.__running:
		# NOTE(review): suite truncated in this view (``return``?).
	log.info(_("Shutting down..."))
	self.__running = False

	def spawn_workers_if_needed(self, *args, **kwargs):
		"""
		Spawns more workers if needed.
		"""
		# Do not create any more processes when the daemon is shutting down.
		if not self.__running:
			# NOTE(review): suite truncated in this view (``return``?).

		# Cleanup all other workers.
		self.cleanup_workers()

		# Do not create more workers if there are already enough workers
		if len(self.workers) >= self.max_running:
			log.warning("More workers running than allowed")
			# NOTE(review): a ``return`` appears truncated here.

		# Do nothing, if there is are already enough workers waiting.
		wanted_waiting_workers = self.max_waiting - len(self.waiting_workers)
		if wanted_waiting_workers <= 0:
			# NOTE(review): suite truncated in this view (``return``?).

		# Spawn a new worker.
		for i in range(wanted_waiting_workers):
			self.spawn_worker(*args, **kwargs)

	def spawn_worker(self, *args, **kwargs):
		"""
		Spawns a new worker process.
		"""
		worker = PakfireWorker(config=self.config, *args, **kwargs)
		# NOTE(review): a statement appears truncated here (presumably
		# ``worker.start()`` -- PakfireWorker is a multiprocessing.Process).
		log.debug("Spawned new worker process: %s" % worker)
		self.__workers.append(worker)

	def terminate_worker(self, worker):
		"""
		Terminates the given worker.
		"""
		log.warning(_("Terminating worker process: %s") % worker)
		# NOTE(review): the terminating call itself is not visible in this
		# view (``worker.terminate()``?).

	def terminate_all_workers(self):
		"""
		Terminates all workers.
		"""
		for worker in self.workers:
			self.terminate_worker(worker)
			# Wait until the worker has finished.
			# NOTE(review): the wait itself (``worker.join()``?) is not
			# visible in this view.

	def remove_worker(self, worker):
		"""
		Removes a worker from the internal list of worker processes.
		"""
		assert not worker.is_alive(), "Remove alive worker?"
		log.debug("Removing worker: %s" % worker)
		self.__workers.remove(worker)

	def cleanup_workers(self):
		"""
		Remove workers that are not alive any more.
		"""
		for worker in self.workers:
			if worker.is_alive():
				# NOTE(review): suite truncated in this view
				# (``continue``?).
			self.remove_worker(worker)

	# NOTE(review): a property header (``@property`` / ``def workers(self):``)
	# appears truncated before this return; ``self.workers`` is read all over
	# this class as a snapshot copy of the internal worker list.
	return self.__workers[:]

	def running_workers(self):
		# Workers that are currently building.
		# NOTE(review): a possible ``@property`` decorator, the list
		# initialisation (``workers = []``) and the final return are not
		# visible in this view. The append sits OUTSIDE the ``if`` (the
		# gutter numbers jump 186 -> 189), i.e. workers whose waiting flag
		# is set are presumably skipped via a truncated ``continue``.
		for worker in self.workers:
			if worker.waiting.is_set():
				# NOTE(review): suite truncated in this view
				# (``continue``?).
			workers.append(worker)

	def waiting_workers(self):
		# Workers that are idle (waiting flag set).
		# NOTE(review): a possible ``@property`` decorator, the list
		# initialisation and the final return are not visible in this view.
		for worker in self.workers:
			if worker.waiting.is_set():
				workers.append(worker)

	def register_signal_handlers(self):
		# Route child and termination signals to this daemon's handlers.
		# SIGINT is deliberately mapped onto the SIGTERM handler.
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# Spawn new workers if necessary.
		self.spawn_workers_if_needed()

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		# NOTE(review): body not visible in this view (presumably
		# ``self.shutdown()``).

	# Talking to the hub.

	def send_builder_info(self):
		# Collect hardware/software facts about this builder and POST them
		# to the hub.
		log.info(_("Sending builder information to hub..."))

		# NOTE(review): the opening of the ``data`` dict literal is not
		# visible in this view; only the entries below are.
		"cpu_model" : system.cpu_model,
		"cpu_count" : system.cpu_count,
		"cpu_arch" : system.native_arch,
		"cpu_bogomips" : system.cpu_bogomips,
		"mem_total" : system.memory,
		"swap_total" : system.swap_total,
		"pakfire_version" : PAKFIRE_VERSION,
		"host_key" : self.config.get("signatures", "host_key", None),
		"os_name" : system.distro.pretty_name,
		"supported_arches" : ",".join(system.supported_arches),

		self.transport.post("/builders/info", data=data)

	def send_keepalive(self):
		# Report current load, memory/swap and disk status to the hub.
		log.debug("Sending keepalive message to hub...")

		# NOTE(review): the opening of the ``data`` dict literal is not
		# visible in this view; only the entries below are.
		"loadavg1" : system.loadavg1,
		"loadavg5" : system.loadavg5,
		"loadavg15" : system.loadavg15,
		"mem_total" : system.memory_total,
		"mem_free" : system.memory_free,
		"swap_total" : system.swap_total,
		"swap_free" : system.swap_free,
		"space_free" : self.free_space,

		self.transport.post("/builders/keepalive", data=data)

	def free_space(self):
		# Free space at the build root's mountpoint. Referenced as plain
		# ``self.free_space`` in send_keepalive, so presumably decorated
		# with ``@property`` -- the decorator is not visible in this view.
		mp = system.get_mountpoint(BUILD_ROOT)
		# NOTE(review): the return statement (the mountpoint's free space)
		# is not visible in this view.
class PakfireWorker(multiprocessing.Process):
	# Worker process: polls the hub for a build job, executes it, and
	# uploads results/logs back to the hub.
	#
	# NOTE(review): this view of the file is truncated in many places (the
	# original line numbers jump); statements that appear to be missing are
	# flagged below rather than guessed at. The code uses Python 2 syntax
	# (``except E, e:`` / ``raise E, msg``) throughout.

	def __init__(self, config, waiting=None):
		multiprocessing.Process.__init__(self)
		# NOTE(review): assignments such as ``self.config = config`` are
		# not visible in this view although self.config is read below --
		# TODO confirm against the full file. The ``waiting`` parameter is
		# likewise not visibly used here.

		# Waiting event. Clear if this worker is running a build.
		self.waiting = multiprocessing.Event()

		# Indicates if this worker is running.
		self.__running = True

	# NOTE(review): a method header (likely ``def run(self):`` -- the
	# multiprocessing.Process entry point) appears truncated before the
	# following statements.
	# Register signal handlers.
	self.register_signal_handlers()

	# Create connection to the hub.
	self.transport = transport.PakfireHubTransport(self.config)
	self.transport.fork()

	while self.__running:
		# Try to get a new build job.
		job = self.get_new_build_job()

		# If we got a job, we are not waiting anymore.
		# NOTE(review): lines truncated here (presumably an ``if job:``
		# guard and ``self.waiting.clear()``).

		# Run the job and return.
		return self.execute_job(job)

	# NOTE(review): another method header (likely ``def shutdown(self):``)
	# appears truncated before the following statements.
	self.__running = False

	# When we are just waiting, we can edit right away.
	if self.waiting.is_set():
		log.debug("Exiting immediately")
		# NOTE(review): suite truncated in this view (an exit call?).

	# XXX figure out what to do, when a build is running.

	def register_signal_handlers(self):
		# Route child and termination signals to this worker's handlers.
		# SIGINT is deliberately mapped onto the SIGTERM handler.
		signal.signal(signal.SIGCHLD, self.handle_SIGCHLD)
		signal.signal(signal.SIGINT, self.handle_SIGTERM)
		signal.signal(signal.SIGTERM, self.handle_SIGTERM)

	def handle_SIGCHLD(self, signum, frame):
		"""
		Handle signal SIGCHLD.
		"""
		# Must be here so that SIGCHLD won't be propagated to
		# NOTE(review): the rest of this comment and the method body
		# (``pass``?) are not visible in this view.

	def handle_SIGTERM(self, signum, frame):
		"""
		Handle signal SIGTERM.
		"""
		# NOTE(review): body not visible in this view (presumably
		# ``self.shutdown()``).

	def get_new_build_job(self, timeout=600):
		# Long-poll the hub's job queue for the next build job.
		log.debug("Requesting new job...")
		# NOTE(review): the ``try:`` opening this handler is not visible in
		# this view.
		job = self.transport.get_json("/builders/jobs/queue",
			data={ "timeout" : timeout, }, timeout=timeout)
		# NOTE(review): the success path (wrapping/returning ``job``) is
		# not visible in this view.

		# As this is a long poll request, it is okay to time out.
		except TransportMaxTriesExceededError:
			# NOTE(review): suite not visible in this view (``pass``?).

	def execute_job(self, job):
		# Run one build job end-to-end: download the source, build it,
		# upload results, and report state transitions to the hub.
		log.debug("Executing job: %s" % job)

		# Call the function that processes the build and try to catch general
		# exceptions and report them to the server.
		# If everything goes okay, we tell this the server, too.
		# NOTE(review): the outer ``try:`` for the except clauses further
		# down is not visible in this view.

		# Create a temporary file and a directory for the resulting files.
		tmpdir = tempfile.mkdtemp()
		tmpfile = os.path.join(tmpdir, os.path.basename(job.source_url))
		logfile = os.path.join(tmpdir, "build.log")

		# Create pakfire configuration instance.
		config = pakfire.config.ConfigDaemon()
		config.parse(job.config)

		# Create pakfire instance.
		p = pakfire.base.PakfireBuilder(config=config, arch=job.arch)

		# Download the source package.
		grabber = pakfire.downloader.PackageDownloader(p)
		grabber.urlgrab(job.source_url, filename=tmpfile)

		# Check if the download checksum matches (if provided).
		if job.source_hash_sha512:
			h = hashlib.new("sha512")
			f = open(tmpfile, "rb")
			# NOTE(review): the read loop wrapper is not visible in this
			# view (presumably ``while True:`` around the read below, with
			# a break on empty buffer, ``h.update(buf)`` and ``f.close()``).
			buf = f.read(BUFFER_SIZE)

			if not job.source_hash_sha512 == h.hexdigest():
				raise DownloadError, "Hash check did not succeed."

		# Create a new instance of a build environment.
		build = pakfire.builder.BuildEnviron(p, tmpfile,
			release_build=True, build_id=job.id, logfile=logfile)

		# NOTE(review): lines truncated here (presumably a nested ``try:``
		# and the environment start call).
		# Create the build environment.

		# Update the build status on the server.
		self.upload_buildroot(job, build.installed_packages)
		self.update_state(job, "running")

		# Run the build (without install test).
		build.build(install_test=False)

		# Copy the created packages to the tempdir.
		build.copy_result(tmpdir)

		# Cleanup the build environment.
		# NOTE(review): the cleanup call itself (``build.stop()``? in a
		# ``finally:``?) is not visible in this view.

		# Jippie, build is finished, we are going to upload the files.
		self.update_state(job, "uploading")

		# Walk through the result directory and upload all (binary) files.
		# Skip that for test builds.
		if not job.type == "test":
			for dir, subdirs, files in os.walk(tmpdir):
				# NOTE(review): an inner loop header (``for file in
				# files:``) appears truncated here -- ``file`` is used
				# unbound below otherwise.
				file = os.path.join(dir, file)
				if file in (logfile, tmpfile,):
					# NOTE(review): suite truncated in this view
					# (``continue``?).
				self.upload_file(job, file, "package")

	except DependencyError, e:
		message = "%s: %s" % (e.__class__.__name__, e)
		self.update_state(job, "dependency_error", message)

	except DownloadError, e:
		message = "%s: %s" % (e.__class__.__name__, e)
		self.update_state(job, "download_error", message)

	# NOTE(review): the clause holding the following cleanup (probably a
	# ``finally:``) is not visible in this view.
	# Upload the logfile in any case and if it exists.
	if os.path.exists(logfile):
		self.upload_file(job, logfile, "log")

	# Cleanup the files we created.
	pakfire.util.rm(tmpdir)

	except DependencyError:
		# This has already been reported.
		# NOTE(review): suite not visible in this view (``pass``?).

	except (DownloadError,):
		# Do not take any further action for these exceptions.
		# NOTE(review): suite not visible in this view (``pass``?).

	except (KeyboardInterrupt, SystemExit):
		self.update_state(job, "aborted")

	# NOTE(review): a catch-all clause (``except Exception, e:``?) appears
	# truncated before the following statements -- ``e`` is otherwise
	# unbound here.
	# Format the exception and send it to the server.
	message = "%s: %s" % (e.__class__.__name__, e)
	self.update_state(job, "failed", message)

	# NOTE(review): the clause holding the following success report
	# (probably an ``else:``) is not visible in this view.
	self.update_state(job, "finished")

	def update_state(self, job, state, message=None):
		"""
		Update state of the build job on the hub.
		"""
		# NOTE(review): the opening of the ``data`` dict literal is not
		# visible in this view; only this entry is.
		"message" : message or "",

		self.transport.post("/builders/jobs/%s/state/%s" % (job.id, state),
			# NOTE(review): the continuation (``data=data)``?) is not
			# visible in this view.

	def upload_file(self, job, filename, type):
		# Upload a single result file to the hub and attach it to the job.
		# ``type`` is either "package" or "log".
		assert os.path.exists(filename)
		assert type in ("package", "log")

		# First upload the file data and save the upload_id.
		upload_id = self.transport.upload_file(filename)

		# Add the file to the build.
		self.transport.post("/builders/jobs/%s/addfile/%s" % (job.id, upload_id),
			# NOTE(review): the continuation (``data={ "type" : type, })``?)
			# is not visible in this view.

	def upload_buildroot(self, job, installed_packages):
		# Report the package set installed in the build environment to the
		# hub as the job's buildroot.
		# NOTE(review): the list initialisation (``pkgs = []``) is not
		# visible in this view.
		for pkg in installed_packages:
			pkgs.append((pkg.friendly_name, pkg.uuid))

		data = { "buildroot" : json.dumps(pkgs) }
		self.transport.post("/builders/jobs/%s/buildroot" % job.id, data=data)