]> git.ipfire.org Git - thirdparty/openembedded/openembedded-core-contrib.git/commitdiff
runqueue.py: Improve IPC between worker threads and the server allowing proper event...
authorRichard Purdie <rpurdie@linux.intel.com>
Tue, 19 Jan 2010 13:22:00 +0000 (13:22 +0000)
committerRichard Purdie <rpurdie@linux.intel.com>
Tue, 19 Jan 2010 13:22:00 +0000 (13:22 +0000)
Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
lib/bb/event.py
lib/bb/runqueue.py

index 8b4222bfc6934224cafaa46dc910d423a08787ae..3062dc51be7dd9fe8b51d83455bdfb3ca7819d48 100644 (file)
@@ -29,6 +29,7 @@ import pickle
 # This is the pid for which we should generate the event. This is set when
 # the runqueue forks off.
 worker_pid = 0
+worker_pipe = None
 
 class Event:
     """Base class for events"""
@@ -50,6 +51,10 @@ _ui_handler_seq = 0
 def fire(event, d):
     """Fire off an Event"""
 
+    if worker_pid != 0:
+        worker_fire(event, d)
+        return
+
     for handler in _handlers:
         h = _handlers[handler]
         event.data = d
@@ -73,6 +78,18 @@ def fire(event, d):
     for h in errors:
         del _ui_handlers[h]
 
+def worker_fire(event, d):
+    data = "<event>" + pickle.dumps(event) + "</event>"
+    if os.write(worker_pipe, data) != len (data):
+        print "Error sending event to server (short write)"
+
+def fire_from_worker(event, d):
+    if not event.startswith("<event>") or not event.endswith("</event>"):
+        print "Error, not an event"
+        return
+    event = pickle.loads(event[7:-8])
+    bb.event.fire(event, d)
+
 def register(name, handler):
     """Register an Event handler"""
 
index 35732c2db8608f445442b9b13b7b492c43db4ca0..c3ad442e478478b3ba7699a5685a5e488dbd7213 100644 (file)
@@ -857,6 +857,7 @@ class RunQueue:
         self.runq_running = []
         self.runq_complete = []
         self.build_pids = {}
+        self.build_pipes = {}
         self.failed_fnids = []
 
         # Mark initial buildable tasks
@@ -935,14 +936,24 @@ class RunQueue:
 
                 sys.stdout.flush()
                 sys.stderr.flush()
-                try: 
+                try:
+                    pipein, pipeout = os.pipe()
                     pid = os.fork() 
                 except OSError, e: 
                     bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
                 if pid == 0:
+                    os.close(pipein)
                     # Save out the PID so that the event can include it the
                     # events
                     bb.event.worker_pid = os.getpid()
+                    bb.event.worker_pipe = pipeout
+
+                    self.state = runQueueChildProcess
+                    # Make the child the process group leader
+                    os.setpgid(0, 0)
+                    # No stdin
+                    newsi = os.open('/dev/null', os.O_RDWR)
+                    os.dup2(newsi, sys.stdin.fileno())
 
                     bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData)
                     bb.msg.note(1, bb.msg.domain.RunQueue,
@@ -950,32 +961,36 @@ class RunQueue:
                                                                         self.stats.total,
                                                                         task,
                                                                         self.get_user_idstring(task)))
-                    self.state = runQueueChildProcess
-                    # Make the child the process group leader
-                    os.setpgid(0, 0)
-                    newsi = os.open('/dev/null', os.O_RDWR)
-                    os.dup2(newsi, sys.stdin.fileno())
+
                     bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
                     try:
                         self.cooker.tryBuild(fn, taskname[3:])
                     except bb.build.EventException:
                         bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
-                        sys.exit(1)
+                        os._exit(1)
                     except:
                         bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
-                        raise
-                    sys.exit(0)
+                        os._exit(1)
+                    os._exit(0)
+
                 self.build_pids[pid] = task
+                self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
                 self.runq_running[task] = 1
                 self.stats.taskActive()
                 if self.stats.active < self.number_tasks:
                     continue
+
+            for pipe in self.build_pipes:
+                self.build_pipes[pipe].read()
+
             if self.stats.active > 0:
                 result = os.waitpid(-1, os.WNOHANG)
                 if result[0] is 0 and result[1] is 0:
                     return
                 task = self.build_pids[result[0]]
                 del self.build_pids[result[0]]
+                self.build_pipes[result[0]].close()
+                del self.build_pipes[result[0]]
                 if result[1] != 0:
                     self.task_fail(task, result[1])
                     return
@@ -1006,6 +1021,8 @@ class RunQueue:
                  os.kill(-k, signal.SIGINT)
              except:
                  pass
+        for pipe in self.build_pipes:
+            self.build_pipes[pipe].read()
 
     def finish_runqueue(self, now = False):
         self.state = runQueueCleanUp
@@ -1024,6 +1041,8 @@ class RunQueue:
                     return
                 task = self.build_pids[result[0]]
                 del self.build_pids[result[0]]
+                self.build_pipes[result[0]].close()
+                del self.build_pipes[result[0]]
                 if result[1] != 0:
                     self.task_fail(task, result[1])
                 else:
@@ -1124,3 +1143,32 @@ def check_stamp_fn(fn, taskname, d):
     if taskid is not None:
         return rq.check_stamp_task(taskid)
     return None
+
+class runQueuePipe():
+    """
+    Abstraction for a pipe between a worker thread and the server
+    """
+    def __init__(self, pipein, pipeout, d):
+        self.fd = pipein
+        os.close(pipeout)
+        self.queue = ""
+        self.d = d
+
+    def read(self):
+        start = len(self.queue)
+        self.queue = self.queue + os.read(self.fd, 1024)
+        end = len(self.queue)
+        index = self.queue.find("</event>")
+        while index != -1:
+            bb.event.fire_from_worker(self.queue[:index+8], self.d)
+            self.queue = self.queue[index+8:]
+            index = self.queue.find("</event>")
+        return (end > start)
+
+    def close(self):
+        while self.read():
+            continue
+        if len(self.queue) > 0:
+            print "Warning, worker left partial message"
+        os.close(self.fd)
+