# This is the pid for which we should generate the event. This is set when
# the runqueue forks off.
worker_pid = 0
+worker_pipe = None
class Event:
"""Base class for events"""
def fire(event, d):
"""Fire off an Event"""
+ if worker_pid != 0:
+ worker_fire(event, d)
+ return
+
for handler in _handlers:
h = _handlers[handler]
event.data = d
for h in errors:
del _ui_handlers[h]
+def worker_fire(event, d):
+ data = "<event>" + pickle.dumps(event) + "</event>"
+ if os.write(worker_pipe, data) != len (data):
+ print "Error sending event to server (short write)"
+
+def fire_from_worker(event, d):
+ if not event.startswith("<event>") or not event.endswith("</event>"):
+ print "Error, not an event"
+ return
+ event = pickle.loads(event[7:-8])
+ bb.event.fire(event, d)
+
def register(name, handler):
"""Register an Event handler"""
self.runq_running = []
self.runq_complete = []
self.build_pids = {}
+ self.build_pipes = {}
self.failed_fnids = []
# Mark initial buildable tasks
sys.stdout.flush()
sys.stderr.flush()
- try:
+ try:
+ pipein, pipeout = os.pipe()
pid = os.fork()
except OSError, e:
bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
if pid == 0:
+ os.close(pipein)
# Save out the PID so that the event can include it the
# events
bb.event.worker_pid = os.getpid()
+ bb.event.worker_pipe = pipeout
+
+ self.state = runQueueChildProcess
+ # Make the child the process group leader
+ os.setpgid(0, 0)
+ # No stdin
+ newsi = os.open('/dev/null', os.O_RDWR)
+ os.dup2(newsi, sys.stdin.fileno())
bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData)
bb.msg.note(1, bb.msg.domain.RunQueue,
self.stats.total,
task,
self.get_user_idstring(task)))
- self.state = runQueueChildProcess
- # Make the child the process group leader
- os.setpgid(0, 0)
- newsi = os.open('/dev/null', os.O_RDWR)
- os.dup2(newsi, sys.stdin.fileno())
+
bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
try:
self.cooker.tryBuild(fn, taskname[3:])
except bb.build.EventException:
bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
- sys.exit(1)
+ os._exit(1)
except:
bb.msg.error(bb.msg.domain.Build, "Build of " + fn + " " + taskname + " failed")
- raise
- sys.exit(0)
+ os._exit(1)
+ os._exit(0)
+
self.build_pids[pid] = task
+ self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
self.runq_running[task] = 1
self.stats.taskActive()
if self.stats.active < self.number_tasks:
continue
+
+ for pipe in self.build_pipes:
+ self.build_pipes[pipe].read()
+
if self.stats.active > 0:
result = os.waitpid(-1, os.WNOHANG)
if result[0] is 0 and result[1] is 0:
return
task = self.build_pids[result[0]]
del self.build_pids[result[0]]
+ self.build_pipes[result[0]].close()
+ del self.build_pipes[result[0]]
if result[1] != 0:
self.task_fail(task, result[1])
return
os.kill(-k, signal.SIGINT)
except:
pass
+ for pipe in self.build_pipes:
+ self.build_pipes[pipe].read()
def finish_runqueue(self, now = False):
self.state = runQueueCleanUp
return
task = self.build_pids[result[0]]
del self.build_pids[result[0]]
+ self.build_pipes[result[0]].close()
+ del self.build_pipes[result[0]]
if result[1] != 0:
self.task_fail(task, result[1])
else:
if taskid is not None:
return rq.check_stamp_task(taskid)
return None
+
+class runQueuePipe():
+ """
+ Abstraction for a pipe between a worker thread and the server
+ """
+ def __init__(self, pipein, pipeout, d):
+ self.fd = pipein
+ os.close(pipeout)
+ self.queue = ""
+ self.d = d
+
+ def read(self):
+ start = len(self.queue)
+ self.queue = self.queue + os.read(self.fd, 1024)
+ end = len(self.queue)
+ index = self.queue.find("</event>")
+ while index != -1:
+ bb.event.fire_from_worker(self.queue[:index+8], self.d)
+ self.queue = self.queue[index+8:]
+ index = self.queue.find("</event>")
+ return (end > start)
+
+ def close(self):
+ while self.read():
+ continue
+ if len(self.queue) > 0:
+ print "Warning, worker left partial message"
+ os.close(self.fd)
+