git.ipfire.org Git - thirdparty/openembedded/openembedded-core-contrib.git/commitdiff
bitbake/runqueue.py: Create RunQueueExecute and RunQueueExecuteTasks classes, further...
author    Richard Purdie <rpurdie@linux.intel.com>
          Wed, 18 Aug 2010 16:13:06 +0000 (17:13 +0100)
committer Chris Larson <chris_larson@mentor.com>
          Mon, 13 Dec 2010 19:34:36 +0000 (12:34 -0700)
Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
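
In short, the patch pulls the task-execution machinery out of RunQueue into a new RunQueueExecute base class plus a RunQueueExecuteTasks subclass; RunQueue keeps only the outer state machine and delegates through a self.rqexe instance. A minimal sketch of that delegation shape (simplified: the state constants below are placeholders and the scheduling, forking and event code are omitted; names mirror the patch, not the full BitBake API):

    # Simplified sketch of the structure this patch introduces; not the real BitBake code.
    runQueueRunInit, runQueueRunning, runQueueCleanUp, runQueueComplete = range(4)

    class RunQueueExecute(object):
        """Owns the per-run execution state (worker pids, pipes, failures)."""
        def __init__(self, rq):
            self.rq = rq              # back-reference used to drive rq.state
            self.failed_fnids = []

        def finish(self):
            self.rq.state = runQueueComplete   # in the real code, only after draining workers

    class RunQueueExecuteTasks(RunQueueExecute):
        def execute(self):
            # ... run buildable tasks, then hand control back via rq.state ...
            self.rq.state = runQueueCleanUp

    class RunQueue(object):
        def __init__(self):
            self.state = runQueueRunInit

        def execute_runqueue(self):
            if self.state is runQueueRunInit:
                self.rqexe = RunQueueExecuteTasks(self)   # executor created lazily, as in the patch
                self.state = runQueueRunning
            if self.state is runQueueRunning:
                self.rqexe.execute()
            if self.state is runQueueCleanUp:
                self.rqexe.finish()
            return self.state is not runQueueComplete

    rq = RunQueue()
    while rq.execute_runqueue():
        pass                           # the caller keeps iterating until this returns False
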
lib/bb/runqueue.py

index 7efb77736a87703d55cc180486b76eae3eeb49ad..00a63cdd225bca375353f77e00b74bcd172f843e 100644
@@ -92,7 +92,7 @@ class RunQueueScheduler(object):
         """
         Return the id of the first task we find that is buildable
         """
-        for tasknum in xrange(len(self.rq.runq_fnid)):
+        for tasknum in xrange(len(self.rqdata.runq_fnid)):
             taskid = self.prio_map[tasknum]
             if self.rq.runq_running[taskid] == 1:
                 continue
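
This first hunk follows from the scheduler now being instantiated as scheduler(self, self.rqdata) further down in the patch: the static task list (runq_fnid) lives on rqdata, while the runtime flags (runq_running, runq_buildable) stay on the executor passed in as self.rq. The scheduler's __init__ is not part of this diff, but presumably it now stores both references, roughly like this (attribute names taken from the hunks in this patch; the priority map is just an identity placeholder):

    class RunQueueScheduler(object):
        name = "basic"                # sketch: each scheduler advertises a name matched against BB_SCHEDULER

        def __init__(self, runqueue, rqdata):
            self.rq = runqueue        # runtime state: runq_running, runq_buildable
            self.rqdata = rqdata      # static data: runq_fnid and friends
            # Identity priority map as a placeholder; real schedulers reorder this.
            self.prio_map = list(range(len(self.rqdata.runq_fnid)))
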
@@ -709,7 +709,6 @@ class RunQueueData:
                            self.rqdata.runq_depends[task],
                            self.rqdata.runq_revdeps[task])
 
-
 class RunQueue:
     def __init__(self, cooker, cfgData, dataCache, taskData, targets):
 
@@ -717,34 +716,10 @@ class RunQueue:
         self.cfgData = cfgData
         self.rqdata = RunQueueData(self, cooker, cfgData, dataCache, taskData, targets)
 
-        self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", cfgData, 1) or 1)
-        self.scheduler = bb.data.getVar("BB_SCHEDULER", cfgData, 1) or "speed"
         self.stamppolicy = bb.data.getVar("BB_STAMP_POLICY", cfgData, 1) or "perfile"
 
         self.state = runQueuePrepare
 
-    def get_schedulers(self):
-        schedulers = set(obj for obj in globals().values()
-                             if type(obj) is type and
-                                issubclass(obj, RunQueueScheduler))
-
-        user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True)
-        if user_schedulers:
-            for sched in user_schedulers.split():
-                if not "." in sched:
-                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
-                    continue
-
-                modname, name = sched.rsplit(".", 1)
-                try:
-                    module = __import__(modname, fromlist=(name,))
-                except ImportError, exc:
-                    logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
-                    raise SystemExit(1)
-                else:
-                    schedulers.add(getattr(module, name))
-        return schedulers
-
     def check_stamps(self):
         unchecked = {}
         current = []
@@ -897,24 +872,25 @@ class RunQueue:
 
         if self.state is runQueueRunInit:
             logger.info("Executing runqueue")
-            self.execute_runqueue_initVars()
+            self.rqexe = RunQueueExecuteTasks(self)
+            self.state = runQueueRunning
 
         if self.state is runQueueRunning:
-            self.execute_runqueue_internal()
+            self.rqexe.execute()
 
         if self.state is runQueueCleanUp:
-            self.finish_runqueue()
+            self.rqexe.finish()
 
         if self.state is runQueueFailed:
             if not self.rqdata.taskData.tryaltconfigs:
-                raise bb.runqueue.TaskFailure(self.failed_fnids)
-            for fnid in self.failed_fnids:
+                raise bb.runqueue.TaskFailure(self.rqexe.failed_fnids)
+            for fnid in self.rqexe.failed_fnids:
                 self.rqdata.taskData.fail_fnid(fnid)
             self.rqdata.reset()
 
         if self.state is runQueueComplete:
             # All done
-            logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.stats.completed, self.stats.skipped, self.stats.failed)
+            logger.info("Tasks Summary: Attempted %d tasks of which %d didn't need to be rerun and %d failed.", self.rqexe.stats.completed, self.rqexe.stats.skipped, self.rqexe.stats.failed)
             return False
 
         if self.state is runQueueChildProcess:
@@ -924,9 +900,23 @@ class RunQueue:
         # Loop
         return retval
 
-    def execute_runqueue_initVars(self):
+    def finish_runqueue(self, now = False):
+        if now:
+            self.rqexe.finish_now()
+        else:
+            self.rqexe.finish()
 
-        self.stats = RunQueueStats(len(self.rqdata.runq_fnid))
+
+class RunQueueExecute:
+
+    def __init__(self, rq):
+        self.rq = rq
+        self.cooker = rq.cooker
+        self.cfgData = rq.cfgData
+        self.rqdata = rq.rqdata
+
+        self.number_tasks = int(bb.data.getVar("BB_NUMBER_THREADS", self.cfgData, 1) or 1)
+        self.scheduler = bb.data.getVar("BB_SCHEDULER", self.cfgData, 1) or "speed"
 
         self.runq_buildable = []
         self.runq_running = []
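
finish_runqueue() on RunQueue is now just a thin forwarder: the executor's finish() drains the worker pipes and keeps reaping until stats.active reaches zero, while finish_now() (added in the following hunk) sends SIGTERM to each worker's whole process group, which is why the pid is negated. A minimal standalone sketch of that group-kill idiom, assuming the workers made themselves group leaders with os.setpgid(0, 0) as fork_off_task() below does:

    import os
    import signal

    def terminate_workers(build_pids):
        """Send SIGTERM to every worker's process group, ignoring ones already gone."""
        for pid in build_pids:
            try:
                os.kill(-pid, signal.SIGTERM)   # negative pid addresses the whole process group
            except OSError:
                pass                            # that group has already exited
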
@@ -935,6 +925,115 @@ class RunQueue:
         self.build_pipes = {}
         self.failed_fnids = []
 
+    def runqueue_process_waitpid(self):
+        """
+        Return None if there are no processes awaiting result collection, otherwise
+        collect the process exit codes and close the information pipe.
+        """
+        result = os.waitpid(-1, os.WNOHANG)
+        if result[0] is 0 and result[1] is 0:
+            return None
+        task = self.build_pids[result[0]]
+        del self.build_pids[result[0]]
+        self.build_pipes[result[0]].close()
+        del self.build_pipes[result[0]]
+        if result[1] != 0:
+            self.task_fail(task, result[1])
+        else:
+            self.task_complete(task)
+            self.stats.taskCompleted()
+            bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
+
+    def finish_now(self):
+        if self.stats.active:
+            logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active)
+            for k, v in self.build_pids.iteritems():
+                try:
+                    os.kill(-k, signal.SIGTERM)
+                except:
+                    pass
+        for pipe in self.build_pipes:
+            self.build_pipes[pipe].read()
+
+    def finish(self):
+        self.rq.state = runQueueCleanUp
+
+        for pipe in self.build_pipes:
+            self.build_pipes[pipe].read()
+
+        try:
+            while self.stats.active > 0:
+                bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
+                if self.runqueue_process_waitpid() is None:
+                    return
+        except:
+            self.finish_now()
+            raise
+
+        if len(self.failed_fnids) != 0:
+            self.rq.state = runQueueFailed
+            return
+
+        self.rq.state = runQueueComplete
+        return
+
+
+    def notify_task_started(self, task):
+        bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData)
+        logger.info("Running task %d of %d (ID: %s, %s)", self.stats.completed + self.stats.active + self.stats.failed + 1,
+                                                          self.stats.total,
+                                                          task,
+                                                          self.rqdata.get_user_idstring(task))
+
+    def notify_task_completed(self, task):
+        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
+
+    def fork_off_task(self, fn, task, taskname):
+        sys.stdout.flush()
+        sys.stderr.flush()
+        try:
+            pipein, pipeout = os.pipe()
+            pid = os.fork()
+        except OSError as e:
+            bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
+        if pid == 0:
+            os.close(pipein)
+            # Save out the PID so that the event can include it in the
+            # events
+            bb.event.worker_pid = os.getpid()
+            bb.event.worker_pipe = pipeout
+
+            # Child processes should send their messages to the UI
+            # process via the server process, not print them
+            # themselves
+            bblogger.handlers = [bb.event.LogHandler()]
+
+            self.rq.state = runQueueChildProcess
+            # Make the child the process group leader
+            os.setpgid(0, 0)
+            # No stdin
+            newsi = os.open('/dev/null', os.O_RDWR)
+            os.dup2(newsi, sys.stdin.fileno())
+
+            self.notify_task_started(task)
+
+            bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
+            bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
+            try:
+                the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
+                bb.build.exec_task(fn, taskname, the_data)
+            except Exception as exc:
+                logger.critical(str(exc))
+                os._exit(1)
+            os._exit(0)
+        return pid, pipein, pipeout
+
+class RunQueueExecuteTasks(RunQueueExecute):
+    def __init__(self, rq):
+        RunQueueExecute.__init__(self, rq)
+
+        self.stats = RunQueueStats(len(self.rqdata.runq_fnid))
+
         # Mark initial buildable tasks
         for task in xrange(self.stats.total):
             self.runq_running.append(0)
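
The worker model itself is unchanged by the move into RunQueueExecute: fork_off_task() forks a child that becomes its own process-group leader and runs the task, and runqueue_process_waitpid() polls for finished children with a non-blocking waitpid. A self-contained sketch of that fork-and-reap pattern (the sleep stands in for bb.build.exec_task(); pipes, events and BitBake data are left out):

    import os
    import time

    def fork_off_task(seconds):
        pid = os.fork()
        if pid == 0:                   # child
            os.setpgid(0, 0)           # become a process group leader so the group can be signalled
            time.sleep(seconds)        # placeholder for the real task body
            os._exit(0)                # exit without unwinding the parent's state
        return pid                     # parent

    def process_waitpid(active):
        """Return None if no child has exited yet, otherwise reap one and return its pid."""
        pid, status = os.waitpid(-1, os.WNOHANG)
        if pid == 0 and status == 0:   # nothing to collect yet
            return None
        active.discard(pid)
        return pid

    active = {fork_off_task(1), fork_off_task(2)}
    while active:
        if process_waitpid(active) is None:
            time.sleep(0.1)            # poll again, much as the run queue's main loop does
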
@@ -944,19 +1043,39 @@ class RunQueue:
             else:
                 self.runq_buildable.append(0)
 
-        self.state = runQueueRunning
-
         event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
 
         for scheduler in self.get_schedulers():
             if self.scheduler == scheduler.name:
-                self.sched = scheduler(self)
+                self.sched = scheduler(self, self.rqdata)
                 logger.debug(1, "Using runqueue scheduler '%s'", scheduler.name)
                 break
         else:
             bb.fatal("Invalid scheduler '%s'.  Available schedulers: %s" %
                      (self.scheduler, ", ".join(obj.name for obj in self.schedulers)))
 
+    def get_schedulers(self):
+        schedulers = set(obj for obj in globals().values()
+                             if type(obj) is type and
+                                issubclass(obj, RunQueueScheduler))
+
+        user_schedulers = bb.data.getVar("BB_SCHEDULERS", self.cfgData, True)
+        if user_schedulers:
+            for sched in user_schedulers.split():
+                if not "." in sched:
+                    bb.note("Ignoring scheduler '%s' from BB_SCHEDULERS: not an import" % sched)
+                    continue
+
+                modname, name = sched.rsplit(".", 1)
+                try:
+                    module = __import__(modname, fromlist=(name,))
+                except ImportError, exc:
+                    logger.critical("Unable to import scheduler '%s' from '%s': %s" % (name, modname, exc))
+                    raise SystemExit(1)
+                else:
+                    schedulers.add(getattr(module, name))
+        return schedulers
+
     def task_complete(self, task):
         """
         Mark a task as completed
@@ -989,25 +1108,25 @@ class RunQueue:
         self.stats.taskFailed()
         fnid = self.rqdata.runq_fnid[task]
         self.failed_fnids.append(fnid)
-        bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData)
+        bb.event.fire(runQueueTaskFailed(task, self.stats, self.rq), self.cfgData)
         if self.rqdata.taskData.abort:
-            self.state = runQueueCleanUp
+            self.rq.state = runQueueCleanUp
 
-    def execute_runqueue_internal(self):
+    def execute(self):
         """
         Run the tasks in a queue prepared by rqdata.prepare()
         """
 
         if self.stats.total == 0:
             # nothing to do
-            self.state = runQueueCleanUp
+            self.rq.state = runQueueCleanUp
 
         while True:
             for task in iter(self.sched.next, None):
                 fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[task]]
 
                 taskname = self.rqdata.runq_task[task]
-                if self.check_stamp_task(task, taskname):
+                if self.rq.check_stamp_task(task, taskname):
                     logger.debug(2, "Stamp current task %s (%s)", task,
                                  self.rqdata.get_user_idstring(task))
                     self.runq_running[task] = 1
@@ -1037,12 +1156,12 @@ class RunQueue:
                 self.build_pipes[pipe].read()
 
             if self.stats.active > 0:
-                if self.runqueue_process_waitpid(self.task_complete, self.task_fail) is None:
+                if self.runqueue_process_waitpid() is None:
                     return
                 continue
 
             if len(self.failed_fnids) != 0:
-                self.state = runQueueFailed
+                self.rq.state = runQueueFailed
                 return
 
             # Sanity Checks
@@ -1053,113 +1172,9 @@ class RunQueue:
                     logger.error("Task %s never ran!", task)
                 if self.runq_complete[task] == 0:
                     logger.error("Task %s never completed!", task)
-            self.state = runQueueComplete
+            self.rq.state = runQueueComplete
             return
 
-    def runqueue_process_waitpid(self, success, failure):
-        """
-        Return none is there are no processes awaiting result collection, otherwise
-        collect the process exit codes and close the information pipe.
-        """
-        result = os.waitpid(-1, os.WNOHANG)
-        if result[0] is 0 and result[1] is 0:
-            return None
-        task = self.build_pids[result[0]]
-        del self.build_pids[result[0]]
-        self.build_pipes[result[0]].close()
-        del self.build_pipes[result[0]]
-        if result[1] != 0:
-            failure(task, result[1]>>8)
-        else:
-            success(task)
-            self.stats.taskCompleted()
-            self.notify_task_completed(task)
-
-    def finish_runqueue_now(self):
-        if self.stats.active:
-            logger.info("Sending SIGTERM to remaining %s tasks", self.stats.active)
-            for k, v in self.build_pids.iteritems():
-                try:
-                    os.kill(-k, signal.SIGTERM)
-                except:
-                    pass
-        for pipe in self.build_pipes:
-            self.build_pipes[pipe].read()
-
-    def finish_runqueue(self, now = False):
-        self.state = runQueueCleanUp
-
-        for pipe in self.build_pipes:
-            self.build_pipes[pipe].read()
-
-        if now:
-            self.finish_runqueue_now()
-        try:
-            while self.stats.active > 0:
-                bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
-                if self.runqueue_process_waitpid(self.task_complete, self.task_fail) is None:
-                    return
-        except:
-            self.finish_runqueue_now()
-            raise
-
-        if len(self.failed_fnids) != 0:
-            self.state = runQueueFailed
-            return
-
-        self.state = runQueueComplete
-        return
-
-    def notify_task_started(self, task):
-        bb.event.fire(runQueueTaskStarted(task, self.stats, self), self.cfgData)
-        logger.info("Running task %d of %d (ID: %s, %s)", self.stats.completed + self.stats.active + self.stats.failed + 1,
-                                                          self.stats.total,
-                                                          task,
-                                                          self.get_user_idstring(task))
-
-    def notify_task_completed(self, task):
-        bb.event.fire(runQueueTaskCompleted(task, self.stats, self), self.cfgData)
-
-    def fork_off_task(self, fn, task, taskname):
-        sys.stdout.flush()
-        sys.stderr.flush()
-        try:
-            pipein, pipeout = os.pipe()
-            pid = os.fork()
-        except OSError as e:
-            bb.msg.fatal(bb.msg.domain.RunQueue, "fork failed: %d (%s)" % (e.errno, e.strerror))
-        if pid == 0:
-            os.close(pipein)
-            # Save out the PID so that the event can include it the
-            # events
-            bb.event.worker_pid = os.getpid()
-            bb.event.worker_pipe = pipeout
-
-            # Child processes should send their messages to the UI
-            # process via the server process, not print them
-            # themselves
-            bblogger.handlers = [bb.event.LogHandler()]
-
-            self.state = runQueueChildProcess
-            # Make the child the process group leader
-            os.setpgid(0, 0)
-            # No stdin
-            newsi = os.open('/dev/null', os.O_RDWR)
-            os.dup2(newsi, sys.stdin.fileno())
-
-            self.notify_task_started(task)
-
-            bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
-            bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
-            try:
-                the_data = bb.cache.Cache.loadDataFull(fn, self.cooker.get_file_appends(fn), self.cooker.configuration.data)
-                bb.build.exec_task(fn, taskname, the_data)
-            except Exception as exc:
-                logger.critical(str(exc))
-                os._exit(1)
-            os._exit(0)
-        return pid, pipein, pipeout
-
 
 class TaskFailure(Exception):
     """