sstate: Use the python3 ThreadPoolExecutor instead of the OE ThreadedPool
author     Jose Quaresma <quaresma.jose@gmail.com>
           Sat, 16 Apr 2022 22:28:45 +0000 (23:28 +0100)
committer  Richard Purdie <richard.purdie@linuxfoundation.org>
           Fri, 15 Jul 2022 16:51:30 +0000 (17:51 +0100)
For the FetchConnectionCache, use a queue from which each thread can
get an unused connection_cache that is properly initialized before
we fire up the ThreadPoolExecutor.

Signed-off-by: Jose Quaresma <quaresma.jose@gmail.com>
Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
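
In outline, the change drops the OE ThreadedPool's per-worker init/end hooks
and instead pre-fills a queue.Queue with one FetchConnectionCache per worker;
each task borrows a cache from the queue, uses it for the fetch test, and puts
it back. A minimal sketch of the same pattern, with hypothetical names
(make_cache, do_work, init_pool, end_pool) standing in for the bbclass
specifics:

    import concurrent.futures
    from queue import Queue

    nproc = 4
    cache_pool = Queue(nproc)

    def make_cache():
        # stand-in for bb.fetch2.FetchConnectionCache()
        return object()

    def init_pool():
        # fill the queue before the executor starts, like checkstatus_init()
        while not cache_pool.full():
            cache_pool.put(make_cache())

    def do_work(item):
        # each call borrows a cache, so at most nproc caches are ever in use
        cache = cache_pool.get()
        result = (item, id(cache))   # the real code runs fetcher.checkstatus() here
        cache_pool.put(cache)        # hand the cache back for the next task
        return result

    def end_pool():
        # drain the queue once all work is done, like checkstatus_end()
        while not cache_pool.empty():
            cache_pool.get()         # the real code calls close_connections() here

    init_pool()
    with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
        list(executor.map(do_work, range(10)))
    end_pool()

Because the queue holds exactly nproc caches and the executor runs at most
nproc threads, a get() never blocks for long and every in-flight task has a
connection cache to itself.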
meta/classes/sstate.bbclass

index 3513269bcaa755729d6e2f78c7aed35c948f2274..0aa901fe89dcc796b12abf4567414483d1d51482 100644
@@ -977,15 +977,19 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
             localdata.delVar('BB_NO_NETWORK')
 
         from bb.fetch2 import FetchConnectionCache
-        def checkstatus_init(thread_worker):
-            thread_worker.connection_cache = FetchConnectionCache()
+        def checkstatus_init():
+            while not connection_cache_pool.full():
+                connection_cache_pool.put(FetchConnectionCache())
 
-        def checkstatus_end(thread_worker):
-            thread_worker.connection_cache.close_connections()
+        def checkstatus_end():
+            while not connection_cache_pool.empty():
+                connection_cache = connection_cache_pool.get()
+                connection_cache.close_connections()
 
-        def checkstatus(thread_worker, arg):
+        def checkstatus(arg):
             (tid, sstatefile) = arg
 
+            connection_cache = connection_cache_pool.get()
             localdata2 = bb.data.createCopy(localdata)
             srcuri = "file://" + sstatefile
             localdata2.setVar('SRC_URI', srcuri)
@@ -995,7 +999,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
 
             try:
                 fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2,
-                            connection_cache=thread_worker.connection_cache)
+                            connection_cache=connection_cache)
                 fetcher.checkstatus()
                 bb.debug(2, "SState: Successful fetch test for %s" % srcuri)
                 found.add(tid)
@@ -1005,6 +1009,8 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
             except Exception as e:
                 bb.error("SState: cannot test %s: %s\n%s" % (srcuri, repr(e), traceback.format_exc()))
 
+            connection_cache_pool.put(connection_cache)
+
             if progress:
                 bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - thread_worker.tasks.qsize()), d)
 
@@ -1025,13 +1031,13 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
             fetcherenv = bb.fetch2.get_fetcher_environment(d)
             with bb.utils.environment(**fetcherenv):
                 bb.event.enable_threadlock()
-                pool = oe.utils.ThreadedPool(nproc, len(tasklist),
-                        worker_init=checkstatus_init, worker_end=checkstatus_end,
-                        name="sstate_checkhashes-")
-                for t in tasklist:
-                    pool.add_task(checkstatus, t)
-                pool.start()
-                pool.wait_completion()
+                import concurrent.futures
+                from queue import Queue
+                connection_cache_pool = Queue(nproc)
+                checkstatus_init()
+                with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
+                    executor.map(checkstatus, tasklist.copy())
+                checkstatus_end()
                 bb.event.disable_threadlock()
 
             if progress:
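
The explicit pool.start()/pool.wait_completion() pair is no longer needed:
leaving the "with concurrent.futures.ThreadPoolExecutor(...)" block calls
shutdown(wait=True), which returns only after every submitted call has
finished, and executor.map() submits all of its work items up front even if
the returned iterator is never consumed (as in the hunk above). A small
illustration of that behaviour, separate from the bbclass code:

    import concurrent.futures
    import time

    def task(n):
        time.sleep(0.1)
        return n * n

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        results = executor.map(task, range(8))   # all eight calls are submitted here
    # exiting the with-block waits for the workers to drain the queue,
    # which is why checkstatus_end() can safely run right after the block
    print(list(results))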