# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+import copy
import os
import sys
import signal
# These values indicate the next step due to be run in the
# runQueue state machine
runQueuePrepare = 2
-runQueueRunInit = 3
-runQueueRunning = 4
-runQueueFailed = 6
-runQueueCleanUp = 7
-runQueueComplete = 8
-runQueueChildProcess = 9
+runQueueSceneInit = 3
+runQueueSceneRun = 4
+runQueueRunInit = 5
+runQueueRunning = 6
+runQueueFailed = 7
+runQueueCleanUp = 8
+runQueueComplete = 9
+runQueueChildProcess = 10
class RunQueueScheduler(object):
"""
"""
The priority map is sorted by task weight.
"""
- from copy import deepcopy
self.rq = runqueue
self.rqdata = rqdata
- sortweight = sorted(deepcopy(self.rqdata.runq_weight))
- copyweight = deepcopy(self.rqdata.runq_weight)
+ sortweight = sorted(copy.deepcopy(self.rqdata.runq_weight))
+ copyweight = copy.deepcopy(self.rqdata.runq_weight)
self.prio_map = []
for weight in sortweight:
def __init__(self, runqueue, rqdata):
RunQueueSchedulerSpeed.__init__(self, runqueue, rqdata)
- from copy import deepcopy
#FIXME - whilst this groups all fnids together it does not reorder the
#fnid groups optimally.
- basemap = deepcopy(self.prio_map)
+ basemap = copy.deepcopy(self.prio_map)
self.prio_map = []
while (len(basemap) > 0):
entry = basemap.pop(0)
if dep in explored_deps[revdep]:
scan = True
if scan:
- find_chains(revdep, deepcopy(prev_chain))
+ find_chains(revdep, copy.deepcopy(prev_chain))
for dep in explored_deps[revdep]:
if dep not in total_deps:
total_deps.append(dep)
stampfnwhitelist.append(fn)
self.stampfnwhitelist = stampfnwhitelist
+ # Interate over the task list looking for tasks with a 'setscene' function
+ self.runq_setscene = []
+ for task in range(len(self.runq_fnid)):
+ setscene = taskData.gettask_id(self.taskData.fn_index[self.runq_fnid[task]], self.runq_task[task] + "_setscene", False)
+ if not setscene:
+ continue
+ self.runq_setscene.append(task)
+
return len(self.runq_fnid)
def dump_data(self, taskQueue):
return current
def check_stamp_task(self, task, taskname = None):
+ def get_timestamp(f):
+ try:
+ if not os.access(f, os.F_OK):
+ return None
+ return os.stat(f)[stat.ST_MTIME]
+ except:
+ return None
if self.stamppolicy == "perfile":
fulldeptree = False
logger.debug(2, "%s.%s is nostamp\n", fn, taskname)
return False
+ if taskname.endswith("_setscene"):
+ return True
+
iscurrent = True
- t1 = os.stat(stampfile)[stat.ST_MTIME]
+ t1 = get_timestamp(stampfile)
for dep in self.rqdata.runq_depends[task]:
if iscurrent:
fn2 = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[dep]]
taskname2 = self.rqdata.runq_task[dep]
stampfile2 = "%s.%s" % (self.rqdata.dataCache.stamp[fn2], taskname2)
+ t2 = get_timestamp(stampfile2)
+ t3 = get_timestamp(stampfile2 + "_setscene")
+ if t3 and t3 > t2:
+ continue
if fn == fn2 or (fulldeptree and fn2 not in stampwhitelist):
- try:
- t2 = os.stat(stampfile2)[stat.ST_MTIME]
- if t1 < t2:
- logger.debug(2, "Stampfile %s < %s", stampfile, stampfile2)
- iscurrent = False
- except:
- logger.debug(2, "Exception reading %s for %s", stampfile2, stampfile)
+ if not t2 or t1 < t2:
+ logger.debug(2, "Stampfile %s < %s (or does not exist)",
+ stampfile, stampfile2)
iscurrent = False
-
return iscurrent
def execute_runqueue(self):
if self.rqdata.prepare() is 0:
self.state = runQueueComplete
else:
- self.state = runQueueRunInit
+ self.state = runQueueSceneInit
+
+ if self.state is runQueueSceneInit:
+ self.rqexe = RunQueueExecuteScenequeue(self)
+
+ if self.state is runQueueSceneRun:
+ self.rqexe.execute()
if self.state is runQueueRunInit:
logger.info("Executing runqueue")
self.task_fail(task, result[1]>>8)
else:
self.task_complete(task)
- self.stats.taskCompleted()
- bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
def finish_now(self):
if self.stats.active:
for pipe in self.build_pipes:
self.build_pipes[pipe].read()
- try:
- while self.stats.active > 0:
- bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
- if self.runqueue_process_waitpid() is None:
- return
- except:
- self.finish_now()
- raise
+ if self.stats.active > 0:
+ bb.event.fire(runQueueExitWait(self.stats.active), self.cfgData)
+ self.runqueue_process_waitpid()
+ return
if len(self.failed_fnids) != 0:
self.rq.state = runQueueFailed
self.rq.state = runQueueComplete
return
- def notify_task_started(self, task):
- bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData)
-
- def notify_task_completed(self, task):
- bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
-
def fork_off_task(self, fn, task, taskname):
sys.stdout.flush()
sys.stderr.flush()
newsi = os.open(os.devnull, os.O_RDWR)
os.dup2(newsi, sys.stdin.fileno())
- self.notify_task_started(task)
-
bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY", self, self.cooker.configuration.data)
bb.data.setVar("__RUNQUEUE_DO_NOT_USE_EXTERNALLY2", fn, self.cooker.configuration.data)
try:
self.runq_buildable.append(1)
else:
self.runq_buildable.append(0)
+ if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
+ self.rq.scenequeue_covered.add(task)
+
+ found = True
+ while found:
+ found = False
+ for task in range(self.stats.total):
+ if task in self.rq.scenequeue_covered:
+ continue
+ if len(self.rqdata.runq_revdeps[task]) > 0 and self.rqdata.runq_revdeps[task].issubset(self.rq.scenequeue_covered):
+ self.rq.scenequeue_covered.add(task)
+ found = True
+
+ logger.info('Full skip list %s', self.rq.scenequeue_covered)
+
+ for task in self.rq.scenequeue_covered:
+ self.task_skip(task)
event.fire(bb.event.StampUpdate(self.rqdata.target_pairs, self.rqdata.dataCache.stamp), self.cfgData)
schedulers.add(getattr(module, name))
return schedulers
- def task_complete(self, task):
+ def task_completeoutright(self, task):
"""
Mark a task as completed
Look at the reverse dependencies and mark any task with
taskname = self.rqdata.runq_task[revdep]
logger.debug(1, "Marking task %s (%s, %s) as buildable", revdep, fn, taskname)
+ def task_complete(self, task):
+ self.stats.taskCompleted()
+ bb.event.fire(runQueueTaskCompleted(task, self.stats, self.rq), self.cfgData)
+ self.task_completeoutright(task)
+
def task_fail(self, task, exitcode):
"""
Called when a task has failed
def task_skip(self, task):
self.runq_running[task] = 1
self.runq_buildable[task] = 1
- self.task_complete(task)
+ self.task_completeoutright(task)
self.stats.taskCompleted()
self.stats.taskSkipped()
elif self.cooker.configuration.dry_run:
self.runq_running[task] = 1
self.runq_buildable[task] = 1
- self.notify_task_started(task)
self.stats.taskActive()
self.task_complete(task)
- self.stats.taskCompleted()
- self.notify_task_completed(task)
continue
+ bb.event.fire(runQueueTaskStarted(task, self.stats, self.rq), self.cfgData)
pid, pipein, pipeout = self.fork_off_task(fn, task, taskname)
self.build_pids[pid] = task
self.rq.state = runQueueComplete
return
+class RunQueueExecuteScenequeue(RunQueueExecute):
+ def __init__(self, rq):
+ RunQueueExecute.__init__(self, rq)
+
+ self.scenequeue_covered = set()
+ self.scenequeue_notcovered = set()
+
+ # If we don't have any setscene functions, skip this step
+ if len(self.rqdata.runq_setscene) == 0:
+ rq.scenequeue_covered = set()
+ rq.state = runQueueRunInit
+ return
+
+ self.stats = RunQueueStats(len(self.rqdata.runq_setscene))
+
+ endpoints = {}
+ sq_revdeps = []
+ sq_revdeps_new = []
+ sq_revdeps_squash = []
+
+ # We need to construct a dependency graph for the setscene functions. Intermediate
+ # dependencies between the setscene tasks only complicate the code. This code
+ # therefore aims to collapse the huge runqueue dependency tree into a smaller one
+ # only containing the setscene functions.
+
+ for task in range(self.stats.total):
+ self.runq_running.append(0)
+ self.runq_complete.append(0)
+ self.runq_buildable.append(0)
+
+ for task in range(len(self.rqdata.runq_fnid)):
+ sq_revdeps.append(copy.copy(self.rqdata.runq_revdeps[task]))
+ sq_revdeps_new.append(set())
+ if (len(self.rqdata.runq_revdeps[task]) == 0) and task not in self.rqdata.runq_setscene:
+ endpoints[task] = None
+
+ for task in self.rqdata.runq_setscene:
+ for dep in self.rqdata.runq_depends[task]:
+ endpoints[dep] = task
+
+ def process_endpoints(endpoints):
+ newendpoints = {}
+ for point, task in endpoints.items():
+ tasks = set()
+ if task:
+ tasks.add(task)
+ if sq_revdeps_new[point]:
+ tasks |= sq_revdeps_new[point]
+ sq_revdeps_new[point] = set()
+ for dep in self.rqdata.runq_depends[point]:
+ if point in sq_revdeps[dep]:
+ sq_revdeps[dep].remove(point)
+ if tasks:
+ sq_revdeps_new[dep] |= tasks
+ if (len(sq_revdeps[dep]) == 0 or len(sq_revdeps_new[dep]) != 0) and dep not in self.rqdata.runq_setscene:
+ newendpoints[dep] = task
+ if len(newendpoints) != 0:
+ process_endpoints(newendpoints)
+
+ process_endpoints(endpoints)
+
+ for task in range(len(self.rqdata.runq_fnid)):
+ if task in self.rqdata.runq_setscene:
+ deps = set()
+ for dep in sq_revdeps_new[task]:
+ deps.add(self.rqdata.runq_setscene.index(dep))
+ sq_revdeps_squash.append(deps)
+ elif len(sq_revdeps_new[task]) != 0:
+ bb.msg.fatal(bb.msg.domain.RunQueue, "Something went badly wrong during scenequeue generation, aborting. Please report this problem.")
+
+ #for task in range(len(sq_revdeps_squash)):
+ # print "Task %s: %s.%s is %s " % (task, self.taskData.fn_index[self.runq_fnid[self.runq_setscene[task]]], self.runq_task[self.runq_setscene[task]] + "_setscene", sq_revdeps_squash[task])
+
+ self.sq_deps = []
+ self.sq_revdeps = sq_revdeps_squash
+ self.sq_revdeps2 = copy.deepcopy(self.sq_revdeps)
+
+ for task in range(len(self.sq_revdeps)):
+ self.sq_deps.append(set())
+ for task in range(len(self.sq_revdeps)):
+ for dep in self.sq_revdeps[task]:
+ self.sq_deps[dep].add(task)
+
+ for task in range(len(self.sq_revdeps)):
+ if len(self.sq_revdeps[task]) == 0:
+ self.runq_buildable[task] = 1
+
+ logger.info('Executing setscene tasks')
+
+ self.rq.state = runQueueSceneRun
+
+ def scenequeue_updatecounters(self, task):
+ for dep in self.sq_deps[task]:
+ self.sq_revdeps2[dep].remove(task)
+ if len(self.sq_revdeps2[dep]) == 0:
+ self.runq_buildable[dep] = 1
+
+ def task_completeoutright(self, task):
+ """
+ Mark a task as completed
+ Look at the reverse dependencies and mark any task with
+ completed dependencies as buildable
+ """
+
+ index = self.rqdata.runq_setscene[task]
+ logger.info('Found task %s which could be accelerated',
+ self.rqdata.get_user_idstring(index))
+
+ self.scenequeue_covered.add(task)
+ self.scenequeue_updatecounters(task)
+
+ def task_complete(self, task):
+ self.stats.taskCompleted()
+ self.task_completeoutright(task)
+
+ def task_fail(self, task, result):
+ self.stats.taskFailed()
+ index = self.rqdata.runq_setscene[task]
+ bb.event.fire(runQueueTaskFailed(task, self.stats, self), self.cfgData)
+ self.scenequeue_notcovered.add(task)
+ self.scenequeue_updatecounters(task)
+
+ def task_failoutright(self, task):
+ self.runq_running[task] = 1
+ self.runq_buildable[task] = 1
+ self.stats.taskCompleted()
+ self.stats.taskSkipped()
+ index = self.rqdata.runq_setscene[task]
+ self.scenequeue_notcovered.add(task)
+ self.scenequeue_updatecounters(task)
+
+ def task_skip(self, task):
+ self.runq_running[task] = 1
+ self.runq_buildable[task] = 1
+ self.task_completeoutright(task)
+ self.stats.taskCompleted()
+ self.stats.taskSkipped()
+
+ def execute(self):
+ """
+ Run the tasks in a queue prepared by prepare_runqueue
+ """
+
+ task = None
+ if self.stats.active < self.number_tasks:
+ # Find the next setscene to run
+ for nexttask in range(self.stats.total):
+ if self.runq_buildable[nexttask] == 1 and self.runq_running[nexttask] != 1:
+ task = nexttask
+ break
+ if task is not None:
+ realtask = self.rqdata.runq_setscene[task]
+ fn = self.rqdata.taskData.fn_index[self.rqdata.runq_fnid[realtask]]
+
+ taskname = self.rqdata.runq_task[realtask] + "_setscene"
+ if self.rq.check_stamp_task(realtask, self.rqdata.runq_task[realtask]):
+ logger.debug(2, 'Stamp for underlying task %s(%s) is current, so skipping setscene variant',
+ task, self.rqdata.get_user_idstring(task))
+ self.task_failoutright(task)
+ return True
+
+ if self.cooker.configuration.force:
+ for target in self.rqdata.target_pairs:
+ if target[0] == fn and target[1] == self.rqdata.runq_task[realtask]:
+ self.task_failoutright(task)
+ return True
+
+ if self.rq.check_stamp_task(realtask, taskname):
+ logger.debug(2, 'Setscene stamp current task %s(%s), so skip it and its dependencies',
+ task, self.rqdata.get_user_idstring(realtask))
+ self.task_skip(task)
+ return True
+
+ pid, pipein, pipeout = self.fork_off_task(fn, realtask, taskname)
+
+ self.build_pids[pid] = task
+ self.build_pipes[pid] = runQueuePipe(pipein, pipeout, self.cfgData)
+ self.runq_running[task] = 1
+ self.stats.taskActive()
+ if self.stats.active < self.number_tasks:
+ return True
+
+ for pipe in self.build_pipes:
+ self.build_pipes[pipe].read()
+
+ if self.stats.active > 0:
+ if self.runqueue_process_waitpid() is None:
+ return True
+ return True
+
+ # Convert scenequeue_covered task numbers into full taskgraph ids
+ oldcovered = self.scenequeue_covered
+ self.rq.scenequeue_covered = set()
+ for task in oldcovered:
+ self.rq.scenequeue_covered.add(self.rqdata.runq_setscene[task])
+
+ logger.info('We can skip tasks %s', self.rq.scenequeue_covered)
+
+ self.rq.state = runQueueRunInit
+ return True
class TaskFailure(Exception):
"""