From 36953fd09e134331a9e1aa7dd5619c628362ff4d Mon Sep 17 00:00:00 2001 From: Richard Purdie Date: Tue, 19 Jan 2010 13:22:00 +0000 Subject: [PATCH] runqueue.py: Improve IPC between worker threads and the server allowing proper event handling Signed-off-by: Richard Purdie --- lib/bb/event.py | 17 ++++++++++++ lib/bb/runqueue.py | 66 +++++++++++++++++++++++++++++++++++++++------- 2 files changed, 74 insertions(+), 9 deletions(-) diff --git a/lib/bb/event.py b/lib/bb/event.py index 8b4222bfc69..3062dc51be7 100644 --- a/lib/bb/event.py +++ b/lib/bb/event.py @@ -29,6 +29,7 @@ import pickle # This is the pid for which we should generate the event. This is set when # the runqueue forks off. worker_pid = 0 +worker_pipe = None class Event: """Base class for events""" @@ -50,6 +51,10 @@ _ui_handler_seq = 0 def fire(event, d): """Fire off an Event""" + if worker_pid != 0: + worker_fire(event, d) + return + for handler in _handlers: h = _handlers[handler] event.data = d @@ -73,6 +78,18 @@ def fire(event, d): for h in errors: del _ui_handlers[h] +def worker_fire(event, d): + data = "" + pickle.dumps(event) + "" + if os.write(worker_pipe, data) != len (data): + print "Error sending event to server (short write)" + +def fire_from_worker(event, d): + if not event.startswith("") or not event.endswith(""): + print "Error, not an event" + return + event = pickle.loads(event[7:-8]) + bb.event.fire(event, d) + def register(name, handler): """Register an Event handler""" diff --git a/lib/bb/runqueue.py b/lib/bb/runqueue.py index 35732c2db86..c3ad442e478 100644 --- a/lib/bb/runqueue.py +++ b/lib/bb/runqueue.py @@ -857,6 +857,7 @@ class RunQueue: self.runq_running = [] self.runq_complete = [] self.build_pids = {} + self.build_pipes = {} self.failed_fnids = [] # Mark initial buildable tasks @@ -935,14 +936,24 @@ class RunQueue: sys.stdout.flush() sys.stderr.flush() - try: + try: + pipein, pipeout = os.pipe() pid = os.fork() except OSError, e: bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror)) if pid == 0: + os.close(pipein) # Save out the PID so that the event can include it the # events bb.event.worker_pid = os.getpid() + bb.event.worker_pipe = pipeout + + self.state = runQueueChildProcess + # Make the child the process group leader + os.setpgid(0, 0) + # No stdin + newsi = os.open('/dev/null', os.O_RDWR) + os.dup2(newsi, sys.stdin.fileno()) bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData) bb.msg.note(1, bb.msg.domain.RunQueue, @@ -950,32 +961,36 @@ class RunQueue: self.stats.total, task, self.get_user_idstring(task))) - self.state = runQueueChildProcess - # Make the child the process group leader - os.setpgid(0, 0) - newsi = os.open('/dev/null', os.O_RDWR) - os.dup2(newsi, sys.stdin.fileno()) + bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data) try: self.cooker.tryBuild(fn, taskname[3:]) except bb.build.EventException: bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") - sys.exit(1) + os._exit(1) except: bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed") - raise - sys.exit(0) + os._exit(1) + os._exit(0) + self.build_pids[pid] = task + self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData) self.runq_running[task] = 1 self.stats.taskActive() if self.stats.active < self.number_tasks: continue + + for pipe in self.build_pipes: + self.build_pipes[pipe].read() + if self.stats.active > 0: result = os.waitpid(-1, os.WNOHANG) if result[0] is 0 and result[1] is 0: return task = self.build_pids[result[0]] del self.build_pids[result[0]] + self.build_pipes[result[0]].close() + del self.build_pipes[result[0]] if result[1] != 0: self.task_fail(task, result[1]) return @@ -1006,6 +1021,8 @@ class RunQueue: os.kill(-k, signal.SIGINT) except: pass + for pipe in self.build_pipes: + self.build_pipes[pipe].read() def finish_runqueue(self, now = False): self.state = runQueueCleanUp @@ -1024,6 +1041,8 @@ class RunQueue: return task = self.build_pids[result[0]] del self.build_pids[result[0]] + self.build_pipes[result[0]].close() + del self.build_pipes[result[0]] if result[1] != 0: self.task_fail(task, result[1]) else: @@ -1124,3 +1143,32 @@ def check_stamp_fn(fn, taskname, d): if taskid is not None: return rq.check_stamp_task(taskid) return None + +class runQueuePipe(): + """ + Abstraction for a pipe between a worker thread and the server + """ + def __init__(self, pipein, pipeout, d): + self.fd = pipein + os.close(pipeout) + self.queue = "" + self.d = d + + def read(self): + start = len(self.queue) + self.queue = self.queue + os.read(self.fd, 1024) + end = len(self.queue) + index = self.queue.find("") + while index != -1: + bb.event.fire_from_worker(self.queue[:index+8], self.d) + self.queue = self.queue[index+8:] + index = self.queue.find("") + return (end > start) + + def close(self): + while self.read(): + continue + if len(self.queue) > 0: + print "Warning, worker left partial message" + os.close(self.fd) + -- 2.47.3