]> git.ipfire.org Git - thirdparty/openembedded/openembedded-core-contrib.git/commitdiff
lib/oe/utils: Add utils function for multiprocess execution
authorRichard Purdie <richard.purdie@linuxfoundation.org>
Thu, 21 Aug 2014 20:46:28 +0000 (21:46 +0100)
committerRichard Purdie <richard.purdie@linuxfoundation.org>
Sat, 23 Aug 2014 08:29:49 +0000 (09:29 +0100)
Our usage of multitprocessing is problematic. In particular, there is a bug
in python 2.7 multiprocessing where signals are not handled until command
completion instead of immediately.

This factors the multiprocess code into a function which is enhanced with
a workaround to ensure immediate signal handling and also better SIGINT
handling which should happen in the parent, not the children to ensure
clean exits. The workaround for the signals is being added to the core
bb.utils function so it can benefit all users.

package_manager is then converted to use the new code.

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
meta/lib/oe/package_manager.py
meta/lib/oe/utils.py

index 8be3d4170625553721e3ce3af8bf0b37ef2769b7..f8fc3c28bfb7af297aa01b5d831007724da47677 100644 (file)
@@ -7,6 +7,7 @@ import multiprocessing
 import re
 import bb
 import tempfile
+import oe.utils
 
 
 # this can be used by all PM backends to create the index files in parallel
@@ -116,16 +117,7 @@ class RpmIndexer(Indexer):
             bb.note("There are no packages in %s" % self.deploy_dir)
             return
 
-        nproc = multiprocessing.cpu_count()
-        pool = bb.utils.multiprocessingpool(nproc)
-        results = list(pool.imap(create_index, index_cmds))
-        pool.close()
-        pool.join()
-
-        for result in results:
-            if result is not None:
-                return(result)
-
+        oe.utils.multiprocess_exec(index_cmds, create_index)
 
 class OpkgIndexer(Indexer):
     def write_index(self):
@@ -161,15 +153,7 @@ class OpkgIndexer(Indexer):
             bb.note("There are no packages in %s!" % self.deploy_dir)
             return
 
-        nproc = multiprocessing.cpu_count()
-        pool = bb.utils.multiprocessingpool(nproc)
-        results = list(pool.imap(create_index, index_cmds))
-        pool.close()
-        pool.join()
-
-        for result in results:
-            if result is not None:
-                return(result)
+        oe.utils.multiprocess_exec(index_cmds, create_index)
 
 
 class DpkgIndexer(Indexer):
@@ -210,15 +194,7 @@ class DpkgIndexer(Indexer):
             bb.note("There are no packages in %s" % self.deploy_dir)
             return
 
-        nproc = multiprocessing.cpu_count()
-        pool = bb.utils.multiprocessingpool(nproc)
-        results = list(pool.imap(create_index, index_cmds))
-        pool.close()
-        pool.join()
-
-        for result in results:
-            if result is not None:
-                return(result)
+        oe.utils.multiprocess_exec(index_cmds, create_index)
 
 
 class PkgsList(object):
index 0a1d1080c94558b29802a2c16a71a8b5251e38a8..92e21a4e0e9f6b5ab1f01d4f1d79eab76387c2b0 100644 (file)
@@ -151,3 +151,32 @@ def execute_pre_post_process(d, cmds):
         if cmd != '':
             bb.note("Executing %s ..." % cmd)
             bb.build.exec_func(cmd, d)
+
+def multiprocess_exec(commands, function):
+    import signal
+    import multiprocessing
+
+    if not commands:
+        return []
+
+    def init_worker():
+        signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+    nproc = min(multiprocessing.cpu_count(), len(commands))
+    pool = bb.utils.multiprocessingpool(nproc, init_worker)
+    imap = pool.imap(function, commands)
+
+    try:
+        results = list(imap)
+        pool.close()
+        pool.join()
+        results = []
+        for result in results:
+            if result is not None:
+                results.append(result)
+        return results
+
+    except KeyboardInterrupt:
+        pool.terminate()
+        pool.join()
+        raise