git.ipfire.org Git - thirdparty/openembedded/openembedded-core-contrib.git/commitdiff
cooker: use a pool, abort on first parse error
author     Chris Larson <chris_larson@mentor.com>
           Tue, 7 Dec 2010 18:00:22 +0000 (13:00 -0500)
committer  Chris Larson <chris_larson@mentor.com>
           Tue, 7 Dec 2010 19:42:04 +0000 (14:42 -0500)
Signed-off-by: Chris Larson <chris_larson@mentor.com>
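
The patch drops the hand-rolled worker processes and task/result queues in favour of multiprocessing.Pool: a pool initializer ignores SIGINT and stashes the parsed configuration on the module-level parse_file() function, Pool.imap() drives parsing lazily, and itertools.chain() serves already-cached recipes ahead of the freshly parsed ones. For illustration only (not part of the commit), a minimal, self-contained sketch of that pattern; parse_one, init_worker, load_cached and the sample file names are hypothetical stand-ins for BitBake's parse_file, cfgdata, willparse and fromcache:

    import itertools
    import multiprocessing
    import signal

    def parse_one(item):
        # Workers read the shared configuration from an attribute stashed on
        # the function by the pool initializer, mirroring parse_file.cfg.
        return True, (item, parse_one.cfg)

    def init_worker(cfg):
        # Ignore SIGINT in workers so Ctrl-C is handled once, in the parent.
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        parse_one.cfg = cfg

    def load_cached(cached_items):
        # Cached entries are yielded with parsed=False, like CookerParser.load_cached().
        for item in cached_items:
            yield False, (item, "from-cache")

    if __name__ == "__main__":
        cfg = {"setting": 1}                      # stand-in for self.cfgdata
        to_parse = ["a.bb", "b.bb"]               # stand-in for self.willparse
        cached = ["c.bb"]                         # stand-in for self.fromcache

        pool = multiprocessing.Pool(2, init_worker, (cfg,))
        parsed = pool.imap(parse_one, to_parse)   # lazy, ordered results
        pool.close()

        # Serve cached entries first, then freshly parsed ones, as
        # CookerParser.start() does by chaining load_cached() with imap.
        for was_parsed, result in itertools.chain(load_cached(cached), parsed):
            print("parsed=%s result=%s" % (was_parsed, result))

        pool.join()

Using a module-level function rather than a bound method matters here because the pool pickles the callable it dispatches to its workers, and passing the configuration once through the initializer avoids shipping it with every task.
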
lib/bb/cooker.py

index 30ff8f2d0eea277ab5f5a86e37f41e6cf788d4b3..2844101f32768c8129cf3126a0683ec63a1adc04 100644
 
 from __future__ import print_function
 import sys, os, glob, os.path, re, time
+import atexit
+import itertools
 import logging
-import sre_constants
-import threading
 import multiprocessing
 import signal
-import atexit
+import sre_constants
+import threading
 from cStringIO import StringIO
 from contextlib import closing
 import bb
@@ -46,11 +47,6 @@ class MultipleMatches(Exception):
     Exception raised when multiple file matches are found
     """
 
-class ParsingErrorsFound(Exception):
-    """
-    Exception raised when parsing errors are found
-    """
-
 class NothingToBuild(Exception):
     """
     Exception raised when there is nothing to build
@@ -944,6 +940,10 @@ class CookerExit(bb.event.Event):
     def __init__(self):
         bb.event.Event.__init__(self)
 
+def parse_file(task):
+    filename, appends = task
+    return True, bb.cache.Cache.parse(filename, appends, parse_file.cfg)
+
 class CookerParser(object):
     def __init__(self, cooker, filelist, masked):
         self.filelist = filelist
@@ -961,110 +961,85 @@ class CookerParser(object):
         self.total = len(filelist)
 
         self.current = 0
-        self.bb_cache = None
-        self.task_queue = None
-        self.result_queue = None
-        self.fromcache = None
         self.num_processes = int(self.cfgdata.getVar("BB_NUMBER_PARSE_THREADS", True) or
                                  multiprocessing.cpu_count())
 
-    def launch_processes(self):
-        self.task_queue = multiprocessing.Queue()
-        self.result_queue = multiprocessing.Queue()
-
+        self.bb_cache = bb.cache.Cache(self.cfgdata)
         self.fromcache = []
+        self.willparse = []
         for filename in self.filelist:
             appends = self.cooker.get_file_appends(filename)
             if not self.bb_cache.cacheValid(filename):
-                self.task_queue.put((filename, appends))
+                self.willparse.append((filename, appends))
             else:
                 self.fromcache.append((filename, appends))
         self.toparse = self.total - len(self.fromcache)
         self.progress_chunk = max(self.toparse / 100, 1)
 
-        def worker(input, output, cfgdata):
+        self.start()
+
+    def start(self):
+        def init(cfg):
             signal.signal(signal.SIGINT, signal.SIG_IGN)
-            for filename, appends in iter(input.get, 'STOP'):
-                try:
-                    infos = bb.cache.Cache.parse(filename, appends, cfgdata)
-                except bb.parse.ParseError as exc:
-                    output.put(exc)
-                else:
-                    output.put(infos)
+            parse_file.cfg = cfg
 
-        self.processes = []
-        for i in xrange(self.num_processes):
-            process = multiprocessing.Process(target=worker,
-                                              args=(self.task_queue,
-                                                    self.result_queue,
-                                                    self.cfgdata))
-            process.start()
-            self.processes.append(process)
+        bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
 
-    def shutdown(self, clean=True):
-        self.result_queue.close()
-        for process in self.processes:
-            if clean:
-                self.task_queue.put('STOP')
-            else:
-                process.terminate()
-        self.task_queue.close()
-        for process in self.processes:
-            process.join()
-        sync = threading.Thread(target=self.bb_cache.sync)
-        sync.start()
-        atexit.register(lambda: sync.join())
-        if self.error > 0:
-            raise ParsingErrorsFound()
+        self.pool = multiprocessing.Pool(self.num_processes, init, [self.cfgdata])
+        parsed = self.pool.imap(parse_file, self.willparse)
+        self.pool.close()
 
-    def parse_next(self):
-        if self.current >= self.total:
+        self.results = itertools.chain(self.load_cached(), parsed)
+
+    def shutdown(self, clean=True):
+        if clean:
             event = bb.event.ParseCompleted(self.cached, self.parsed,
                                             self.skipped, self.masked,
                                             self.virtuals, self.error,
                                             self.total)
             bb.event.fire(event, self.cfgdata)
-            self.shutdown()
-            return False
-        elif not self.bb_cache:
-            self.bb_cache = bb.cache.Cache(self.cfgdata)
-            self.launch_processes()
-            bb.event.fire(bb.event.ParseStarted(self.toparse), self.cfgdata)
-            return True
+        else:
+            self.pool.terminate()
+        self.pool.join()
 
+        sync = threading.Thread(target=self.bb_cache.sync)
+        sync.start()
+        atexit.register(lambda: sync.join())
+
+    def load_cached(self):
+        for filename, appends in self.fromcache:
+            cached, infos = self.bb_cache.load(filename, appends, self.cfgdata)
+            yield not cached, infos
+
+    def parse_next(self):
         try:
-            if self.result_queue.empty() and self.fromcache:
-                filename, appends = self.fromcache.pop()
-                _, result = self.bb_cache.load(filename, appends, self.cfgdata)
-                parsed = False
-                self.cached += 1
-            else:
-                result = self.result_queue.get()
-                if isinstance(result, Exception):
-                    raise result
-
-                parsed = True
-                self.parsed += 1
-                if self.parsed % self.progress_chunk == 0:
-                    bb.event.fire(bb.event.ParseProgress(self.parsed),
-                                  self.cfgdata)
+            parsed, result = self.results.next()
+        except StopIteration:
+            self.shutdown()
+            return False
         except KeyboardInterrupt:
             self.shutdown(clean=False)
             raise
-        except Exception as e:
-            self.error += 1
-            parselog.critical(str(e))
-        else:
-            self.virtuals += len(result)
-
-            for virtualfn, info in result:
-                if info.skipped:
-                    self.skipped += 1
-                else:
-                    self.bb_cache.add_info(virtualfn, info, self.cooker.status,
-                                           parsed=parsed)
+        except Exception as exc:
+            self.shutdown(clean=False)
+            sys.exit(1)
 
         self.current += 1
+        self.virtuals += len(result)
+        if parsed:
+            self.parsed += 1
+            if self.parsed % self.progress_chunk == 0:
+                bb.event.fire(bb.event.ParseProgress(self.parsed),
+                              self.cfgdata)
+        else:
+            self.cached += 1
+
+        for virtualfn, info in result:
+            if info.skipped:
+                self.skipped += 1
+            else:
+                self.bb_cache.add_info(virtualfn, info, self.cooker.status,
+                                        parsed=parsed)
         return True
 
     def reparse(self, filename):
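
The "abort on first parse error" half of the subject line falls out of how Pool.imap reports failures: an exception raised in a worker is re-raised in the parent when the corresponding result is fetched, which is why the new parse_next() can wrap results.next() in a plain try/except, terminate the pool, and exit on the first failure. A tiny sketch of that behaviour (illustration only, with made-up recipe names, not part of the commit):

    import multiprocessing

    def might_fail(name):
        if name.endswith("broken.bb"):
            raise ValueError("parse error in %s" % name)
        return name

    if __name__ == "__main__":
        pool = multiprocessing.Pool(2)
        results = pool.imap(might_fail, ["ok.bb", "broken.bb", "later.bb"])
        pool.close()
        try:
            # The parent stops consuming results at the first failure.
            for recipe in results:
                print("parsed %s" % recipe)
        except ValueError as exc:
            pool.terminate()      # mirrors shutdown(clean=False) above
            print("aborting on first error: %s" % exc)
        pool.join()
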