]> git.ipfire.org Git - thirdparty/openembedded/openembedded-core-contrib.git/commitdiff
bitbake: cooker: parse using bb.compat.Pool
authorChristopher Larson <chris_larson@mentor.com>
Tue, 12 Feb 2013 17:28:47 +0000 (12:28 -0500)
committerRichard Purdie <richard.purdie@linuxfoundation.org>
Tue, 19 Feb 2013 16:47:36 +0000 (08:47 -0800)
(Bitbake rev: 8af519a49a3374bd9004864ef31ca8aa328e9f34)

Signed-off-by: Christopher Larson <chris_larson@mentor.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
bitbake/lib/bb/cooker.py

index 9d051fa30f0eede123fe0f7327789e83c4660393..9f7121fefcf376082fcf12e74de90cbf8604e697 100644 (file)
@@ -34,7 +34,7 @@ from cStringIO import StringIO
 from contextlib import closing
 from functools import wraps
 from collections import defaultdict
-import bb, bb.exceptions, bb.command
+import bb, bb.exceptions, bb.command, bb.compat
 from bb import utils, data, parse, event, cache, providers, taskdata, runqueue
 import Queue
 import prserv.serv
@@ -1556,87 +1556,19 @@ class ParsingFailure(Exception):
         self.recipe = recipe
         Exception.__init__(self, realexception, recipe)
 
-class Feeder(multiprocessing.Process):
-    def __init__(self, jobs, to_parsers, quit):
-        self.quit = quit
-        self.jobs = jobs
-        self.to_parsers = to_parsers
-        multiprocessing.Process.__init__(self)
-
-    def run(self):
-        while True:
-            try:
-                quit = self.quit.get_nowait()
-            except Queue.Empty:
-                pass
-            else:
-                if quit == 'cancel':
-                    self.to_parsers.cancel_join_thread()
-                break
-
-            try:
-                job = self.jobs.pop()
-            except IndexError:
-                break
-
-            try:
-                self.to_parsers.put(job, timeout=0.5)
-            except Queue.Full:
-                self.jobs.insert(0, job)
-                continue
-
-class Parser(multiprocessing.Process):
-    def __init__(self, jobs, results, quit, init):
-        self.jobs = jobs
-        self.results = results
-        self.quit = quit
-        self.init = init
-        multiprocessing.Process.__init__(self)
-
-    def run(self):
-        if self.init:
-            self.init()
-
-        pending = []
-        while True:
-            try:
-                self.quit.get_nowait()
-            except Queue.Empty:
-                pass
-            else:
-                self.results.cancel_join_thread()
-                break
-
-            if pending:
-                result = pending.pop()
-            else:
-                try:
-                    job = self.jobs.get(timeout=0.25)
-                except Queue.Empty:
-                    continue
-
-                if job is None:
-                    break
-                result = self.parse(*job)
-
-            try:
-                self.results.put(result, timeout=0.25)
-            except Queue.Full:
-                pending.append(result)
-
-    def parse(self, filename, appends, caches_array):
-        try:
-            return True, bb.cache.Cache.parse(filename, appends, self.cfg, caches_array)
-        except Exception as exc:
-            tb = sys.exc_info()[2]
-            exc.recipe = filename
-            exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
-            return True, exc
-        # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown
-        # and for example a worker thread doesn't just exit on its own in response to
-        # a SystemExit event for example.
-        except BaseException as exc:
-            return True, ParsingFailure(exc, filename)
+def parse_file((filename, appends, caches_array)):
+    try:
+        return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg, caches_array)
+    except Exception as exc:
+        tb = sys.exc_info()[2]
+        exc.recipe = filename
+        exc.traceback = list(bb.exceptions.extract_traceback(tb, context=3))
+        return True, exc
+    # Need to turn BaseExceptions into Exceptions here so we gracefully shutdown
+    # and for example a worker thread doesn't just exit on its own in response to
+    # a SystemExit event for example.
+    except BaseException as exc:
+        return True, ParsingFailure(exc, filename)
 
 class CookerParser(object):
     def __init__(self, cooker, filelist, masked):
@@ -1670,32 +1602,25 @@ class CookerParser(object):
                 self.fromcache.append((filename, appends))
         self.toparse = self.total - len(self.fromcache)
         self.progress_chunk = max(self.toparse / 100, 1)
+        self.chunk = int(self.cfgdata.getVar("BB_PARSE_CHUNK", True) or 1)
 
         self.start()
         self.haveshutdown = False
 
     def start(self):
         self.results = self.load_cached()
-        self.processes = []
         if self.toparse:
+            def process_init():
+                parse_file.cfg = self.cfgdata
+                multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(parse_file.cfg,), exitpriority=1)
+                multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(parse_file.cfg,), exitpriority=1)
+
             bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
-            def init():
-                Parser.cfg = self.cfgdata
-                multiprocessing.util.Finalize(None, bb.codeparser.parser_cache_save, args=(self.cfgdata,), exitpriority=1)
-                multiprocessing.util.Finalize(None, bb.fetch.fetcher_parse_save, args=(self.cfgdata,), exitpriority=1)
-
-            self.feeder_quit = multiprocessing.Queue(maxsize=1)
-            self.parser_quit = multiprocessing.Queue(maxsize=self.num_processes)
-            self.jobs = multiprocessing.Queue(maxsize=self.num_processes)
-            self.result_queue = multiprocessing.Queue()
-            self.feeder = Feeder(self.willparse, self.jobs, self.feeder_quit)
-            self.feeder.start()
-            for i in range(0, self.num_processes):
-                parser = Parser(self.jobs, self.result_queue, self.parser_quit, init)
-                parser.start()
-                self.processes.append(parser)
-
-            self.results = itertools.chain(self.results, self.parse_generator())
+
+            self.pool = bb.compat.Pool(self.num_processes, process_init)
+            parsed = self.pool.imap_unordered(parse_file, self.willparse, self.chunk)
+            self.pool.close()
+            self.results = itertools.chain(self.results, parsed)
 
     def shutdown(self, clean=True, force=False):
         if not self.toparse:
@@ -1711,25 +1636,9 @@ class CookerParser(object):
                                             self.total)
 
             bb.event.fire(event, self.cfgdata)
-            self.feeder_quit.put(None)
-            for process in self.processes:
-                self.jobs.put(None)
         else:
-            self.feeder_quit.put('cancel')
-
-            self.parser_quit.cancel_join_thread()
-            for process in self.processes:
-                self.parser_quit.put(None)
-
-            self.jobs.cancel_join_thread()
-
-        for process in self.processes:
-            if force:
-                process.join(.1)
-                process.terminate()
-            else:
-                process.join()
-        self.feeder.join()
+            self.pool.terminate()
+        self.pool.join()
 
         sync = threading.Thread(target=self.bb_cache.sync)
         sync.start()
@@ -1742,22 +1651,6 @@ class CookerParser(object):
             cached, infos = self.bb_cache.load(filename, appends, self.cfgdata)
             yield not cached, infos
 
-    def parse_generator(self):
-        while True:
-            if self.parsed >= self.toparse:
-                break
-
-            try:
-                result = self.result_queue.get(timeout=0.25)
-            except Queue.Empty:
-                pass
-            else:
-                value = result[1]
-                if isinstance(value, BaseException):
-                    raise value
-                else:
-                    yield result
-
     def parse_next(self):
         result = []
         parsed = None