]> git.ipfire.org Git - thirdparty/openembedded/openembedded-core-contrib.git/commitdiff
runqueue: implement scenequeue
authorRichard Purdie <rpurdie@linux.intel.com>
Thu, 19 Aug 2010 10:36:29 +0000 (11:36 +0100)
committerChris Larson <chris_larson@mentor.com>
Thu, 30 Dec 2010 06:51:07 +0000 (23:51 -0700)
Signed-off-by: Richard Purdie <rpurdie@linux.intel.com>
lib/bb/runqueue.py

index c4f716170d438a7543721dbc0970bc596452d51e..087edace3fe94302e2832c82970dbe45f3237feb 100644 (file)
@@ -22,6 +22,7 @@ Handles preparation and execution of a queue of tasks
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 
+import copy
 import os
 import sys
 import signal
@@ -63,12 +64,14 @@ class RunQueueStats:
 # These values indicate the next step due to be run in the
 # runQueue state machine
 runQueuePrepare = 2
-runQueueRunInit = 3
-runQueueRunning = 4
-runQueueFailed = 6
-runQueueCleanUp = 7
-runQueueComplete = 8
-runQueueChildProcess = 9
+runQueueSceneInit = 3
+runQueueSceneRun = 4
+runQueueRunInit = 5
+runQueueRunning = 6
+runQueueFailed = 7
+runQueueCleanUp = 8
+runQueueComplete = 9
+runQueueChildProcess = 10
 
 class RunQueueScheduler(object):
     """
@@ -117,13 +120,12 @@ class RunQueueSchedulerSpeed(RunQueueScheduler):
         """
         The priority map is sorted by task weight.
         """
-        from copy import deepcopy
 
         self.rq = runqueue
         self.rqdata = rqdata
 
-        sortweight = sorted(deepcopy(self.rqdata.runq_weight))
-        copyweight = deepcopy(self.rqdata.runq_weight)
+        sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
+        copyweight = copy.deepcopy(self.rqdata.runq_weight)
         self.prio_map = []
 
         for weight in sortweight:
@@ -145,12 +147,11 @@ class RunQueueSchedulerCompletion(RunQueueSchedulerSpeed):
 
     def __init__(self, runqueue, rqdata):
         RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
-        from copy import deepcopy
 
         #FIXME - whilst this groups all fnids together it does not reorder the
         #fnid groups optimally.
 
-        basemap = deepcopy(self.prio_map)
+        basemap = copy.deepcopy(self.prio_map)
         self.prio_map = []
         while (len(basemap) > 0):
             entry = basemap.pop(0)
@@ -283,7 +284,7 @@ class RunQueueData:
                         if dep in explored_deps[revdep]:
                             scan = True
                 if scan:
-                    find_chains(revdep, deepcopy(prev_chain))
+                    find_chains(revdep, copy.deepcopy(prev_chain))
                 for dep in explored_deps[revdep]:
                     if dep not in total_deps:
                         total_deps.append(dep)
@@ -683,6 +684,14 @@ class RunQueueData:
             stampfnwhitelist.append(fn)
         self.stampfnwhitelist = stampfnwhitelist
 
+        # Interate over the task list looking for tasks with a 'setscene' function
+        self.runq_setscene = []
+        for task in range(len(self.runq_fnid)):
+            setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
+            if not setscene:
+                continue
+            self.runq_setscene.append(task)
+
         return len(self.runq_fnid)
 
     def dump_data(self, taskQueue):
@@ -815,6 +824,13 @@ class RunQueue:
         return current
 
     def check_stamp_task(self, task, taskname = None):
+        def get_timestamp(f):
+            try:
+                if not os.access(f, os.F_OK):
+                    return None
+                return os.stat(f)[stat.ST_MTIME]
+            except:
+                return None
 
         if self.stamppolicy == "perfile":
             fulldeptree = False
@@ -838,23 +854,25 @@ class RunQueue:
             logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
             return False
 
+        if taskname.endswith("_setscene"):
+            return True
+
         iscurrent = True
-        t1 = os.stat(stampfile)[stat.ST_MTIME]
+        t1 = get_timestamp(stampfile)
         for dep in self.rqdata.runq_depends[task]:
             if iscurrent:
                 fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
                 taskname2 = self.rqdata.runq_task[dep]
                 stampfile2 = "%s.%s" % (self.rqdata.dataCache.stamp[fn2], taskname2)
+                t2 = get_timestamp(stampfile2)
+                t3 = get_timestamp(stampfile2 + "_setscene")
+                if t3 and t3 > t2:
+                   continue
                 if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
-                    try:
-                        t2 = os.stat(stampfile2)[stat.ST_MTIME]
-                        if t1 < t2:
-                            logger.debug(2, "Stampfile %s < %s", stampfile, stampfile2)
-                            iscurrent = False
-                    except:
-                        logger.debug(2, "Exception reading %s for %s", stampfile2, stampfile)
+                    if not t2 or t1 < t2:
+                        logger.debug(2, "Stampfile %s < %s (or does not exist)",
+                                     stampfile, stampfile2)
                         iscurrent = False
-
         return iscurrent
 
     def execute_runqueue(self):
@@ -871,7 +889,13 @@ class RunQueue:
             if self.rqdata.prepare() is 0:
                 self.state = runQueueComplete
             else:
-                self.state = runQueueRunInit
+                self.state = runQueueSceneInit
+
+        if self.state is runQueueSceneInit:
+            self.rqexe = RunQueueExecuteScenequeue(self)
+
+        if self.state is runQueueSceneRun:
+            self.rqexe.execute()
 
         if self.state is runQueueRunInit:
             logger.info("Executing runqueue")
@@ -944,8 +968,6 @@ class RunQueueExecute:
             self.task_fail(task, result[1]>>8)
         else:
             self.task_complete(task)
-            self.stats.taskCompleted()
-            bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
 
     def finish_now(self):
         if self.stats.active:
@@ -964,14 +986,10 @@ class RunQueueExecute:
         for pipe in self.build_pipes:
             self.build_pipes[pipe].read()
 
-        try:
-            while self.stats.active > 0:
-                bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
-                if self.runqueue_process_waitpid() is None:
-                    return
-        except:
-            self.finish_now()
-            raise
+        if self.stats.active > 0:
+            bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
+            self.runqueue_process_waitpid()
+            return
 
         if len(self.failed_fnids) != 0:
             self.rq.state = runQueueFailed
@@ -980,12 +998,6 @@ class RunQueueExecute:
         self.rq.state = runQueueComplete
         return
 
-    def notify_task_started(self, task):
-        bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData)
-
-    def notify_task_completed(self, task):
-        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
-
     def fork_off_task(self, fn, task, taskname):
         sys.stdout.flush()
         sys.stderr.flush()
@@ -1008,8 +1020,6 @@ class RunQueueExecute:
             newsi = os.open(os.devnull, os.O_RDWR)
             os.dup2(newsi, sys.stdin.fileno())
 
-            self.notify_task_started(task)
-
             bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
             bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
             try:
@@ -1044,6 +1054,23 @@ class RunQueueExecuteTasks(RunQueueExecute):
                 self.runq_buildable.append(1)
             else:
                 self.runq_buildable.append(0)
+            if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
+                self.rq.scenequeue_covered.add(task)
+
+        found = True
+        while found:
+            found = False
+            for task in range(self.stats.total):
+                if task in self.rq.scenequeue_covered:
+                    continue
+                if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
+                    self.rq.scenequeue_covered.add(task)
+                    found = True
+
+        logger.info('Full skip list %s', self.rq.scenequeue_covered)
+
+        for task in self.rq.scenequeue_covered:
+            self.task_skip(task)
 
         event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
 
@@ -1078,7 +1105,7 @@ class RunQueueExecuteTasks(RunQueueExecute):
                     schedulers.add(getattr(module, name))
         return schedulers
 
-    def task_complete(self, task):
+    def task_completeoutright(self, task):
         """
         Mark a task as completed
         Look at the reverse dependencies and mark any task with
@@ -1100,6 +1127,11 @@ class RunQueueExecuteTasks(RunQueueExecute):
                 taskname = self.rqdata.runq_task[revdep]
                 logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)
 
+    def task_complete(self, task):
+        self.stats.taskCompleted()
+        bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
+        self.task_completeoutright(task)
+
     def task_fail(self, task, exitcode):
         """
         Called when a task has failed
@@ -1115,7 +1147,7 @@ class RunQueueExecuteTasks(RunQueueExecute):
     def task_skip(self, task):
         self.runq_running[task] = 1
         self.runq_buildable[task] = 1
-        self.task_complete(task)
+        self.task_completeoutright(task)
         self.stats.taskCompleted()
         self.stats.taskSkipped()
 
@@ -1141,13 +1173,11 @@ class RunQueueExecuteTasks(RunQueueExecute):
                 elif self.cooker.configuration.dry_run:
                     self.runq_running[task] = 1
                     self.runq_buildable[task] = 1
-                    self.notify_task_started(task)
                     self.stats.taskActive()
                     self.task_complete(task)
-                    self.stats.taskCompleted()
-                    self.notify_task_completed(task)
                     continue
 
+                bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData)
                 pid, pipein, pipeout = self.fork_off_task(fn, task, taskname)
 
                 self.build_pids[pid] = task
@@ -1178,6 +1208,206 @@ class RunQueueExecuteTasks(RunQueueExecute):
             self.rq.state = runQueueComplete
             return
 
+class RunQueueExecuteScenequeue(RunQueueExecute):
+    def __init__(self, rq):
+        RunQueueExecute.__init__(self, rq)
+
+        self.scenequeue_covered = set()
+        self.scenequeue_notcovered = set()
+
+        # If we don't have any setscene functions, skip this step
+        if len(self.rqdata.runq_setscene) == 0:
+            rq.scenequeue_covered = set()
+            rq.state = runQueueRunInit
+            return
+
+        self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
+
+        endpoints = {}
+        sq_revdeps = []
+        sq_revdeps_new = []
+        sq_revdeps_squash = []
+
+        # We need to construct a dependency graph for the setscene functions. Intermediate
+        # dependencies between the setscene tasks only complicate the code. This code
+        # therefore aims to collapse the huge runqueue dependency tree into a smaller one
+        # only containing the setscene functions.
+
+        for task in range(self.stats.total):
+            self.runq_running.append(0)
+            self.runq_complete.append(0)
+            self.runq_buildable.append(0)
+
+        for task in range(len(self.rqdata.runq_fnid)):
+            sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
+            sq_revdeps_new.append(set())
+            if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
+                endpoints[task] = None
+
+        for task in self.rqdata.runq_setscene:
+            for dep in self.rqdata.runq_depends[task]:
+                    endpoints[dep] = task
+
+        def process_endpoints(endpoints):
+            newendpoints = {}
+            for point, task in endpoints.items():
+                tasks = set()
+                if task:
+                    tasks.add(task)
+                if sq_revdeps_new[point]:
+                    tasks |= sq_revdeps_new[point]
+                sq_revdeps_new[point] = set()
+                for dep in self.rqdata.runq_depends[point]:
+                    if point in sq_revdeps[dep]:
+                        sq_revdeps[dep].remove(point)
+                    if tasks:
+                        sq_revdeps_new[dep] |= tasks
+                    if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
+                        newendpoints[dep] = task
+            if len(newendpoints) != 0:
+                process_endpoints(newendpoints)
+
+        process_endpoints(endpoints)
+
+        for task in range(len(self.rqdata.runq_fnid)):
+            if task in self.rqdata.runq_setscene:
+                deps = set()
+                for dep in sq_revdeps_new[task]:
+                    deps.add(self.rqdata.runq_setscene.index(dep))
+                sq_revdeps_squash.append(deps)
+            elif len(sq_revdeps_new[task]) != 0:
+                bb.msg.fatal(bb.msg.domain.RunQueue, "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
+
+        #for task in range(len(sq_revdeps_squash)):
+        #    print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])
+
+        self.sq_deps = []
+        self.sq_revdeps = sq_revdeps_squash
+        self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
+
+        for task in range(len(self.sq_revdeps)):
+            self.sq_deps.append(set())
+        for task in range(len(self.sq_revdeps)):
+            for dep in self.sq_revdeps[task]:
+                self.sq_deps[dep].add(task)
+
+        for task in range(len(self.sq_revdeps)):
+            if len(self.sq_revdeps[task]) == 0:
+                self.runq_buildable[task] = 1
+
+        logger.info('Executing setscene tasks')
+
+        self.rq.state = runQueueSceneRun
+
+    def scenequeue_updatecounters(self, task):
+        for dep in self.sq_deps[task]:
+            self.sq_revdeps2[dep].remove(task)
+            if len(self.sq_revdeps2[dep]) == 0:
+                self.runq_buildable[dep] = 1
+
+    def task_completeoutright(self, task):
+        """
+        Mark a task as completed
+        Look at the reverse dependencies and mark any task with
+        completed dependencies as buildable
+        """
+
+        index = self.rqdata.runq_setscene[task]
+        logger.info('Found task %s which could be accelerated',
+                    self.rqdata.get_user_idstring(index))
+
+        self.scenequeue_covered.add(task)
+        self.scenequeue_updatecounters(task)
+
+    def task_complete(self, task):
+        self.stats.taskCompleted()
+        self.task_completeoutright(task)
+
+    def task_fail(self, task, result):
+        self.stats.taskFailed()
+        index = self.rqdata.runq_setscene[task]
+        bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData)
+        self.scenequeue_notcovered.add(task)
+        self.scenequeue_updatecounters(task)
+
+    def task_failoutright(self, task):
+        self.runq_running[task] = 1
+        self.runq_buildable[task] = 1
+        self.stats.taskCompleted()
+        self.stats.taskSkipped()
+        index = self.rqdata.runq_setscene[task]
+        self.scenequeue_notcovered.add(task)
+        self.scenequeue_updatecounters(task)
+
+    def task_skip(self, task):
+        self.runq_running[task] = 1
+        self.runq_buildable[task] = 1
+        self.task_completeoutright(task)
+        self.stats.taskCompleted()
+        self.stats.taskSkipped()
+
+    def execute(self):
+        """
+        Run the tasks in a queue prepared by prepare_runqueue
+        """
+
+        task = None
+        if self.stats.active < self.number_tasks:
+            # Find the next setscene to run
+            for nexttask in range(self.stats.total):
+                if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
+                    task = nexttask
+                    break
+        if task is not None:
+            realtask = self.rqdata.runq_setscene[task]
+            fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
+
+            taskname = self.rqdata.runq_task[realtask] + "_setscene"
+            if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
+                logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
+                             task, self.rqdata.get_user_idstring(task))
+                self.task_failoutright(task)
+                return True
+
+            if self.cooker.configuration.force:
+                for target in self.rqdata.target_pairs:
+                    if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
+                        self.task_failoutright(task)
+                        return True
+
+            if self.rq.check_stamp_task(realtask, taskname):
+                logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
+                             task, self.rqdata.get_user_idstring(realtask))
+                self.task_skip(task)
+                return True
+
+            pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
+
+            self.build_pids[pid] = task
+            self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
+            self.runq_running[task] = 1
+            self.stats.taskActive()
+            if self.stats.active < self.number_tasks:
+                return True
+
+        for pipe in self.build_pipes:
+            self.build_pipes[pipe].read()
+
+        if self.stats.active > 0:
+            if self.runqueue_process_waitpid() is None:
+                return True
+            return True
+
+        # Convert scenequeue_covered task numbers into full taskgraph ids
+        oldcovered = self.scenequeue_covered
+        self.rq.scenequeue_covered = set()
+        for task in oldcovered:
+            self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
+
+        logger.info('We can skip tasks %s', self.rq.scenequeue_covered)
+
+        self.rq.state = runQueueRunInit
+        return True
 
 class TaskFailure(Exception):
     """