]> git.ipfire.org Git - thirdparty/openembedded/openembedded-core-contrib.git/commitdiff
bitbake: server/process, server/xmlrpc, runqueue: Use select.select() on fds, not...
authorRichard Purdie <richard.purdie@linuxfoundation.org>
Sat, 31 Aug 2013 22:40:55 +0000 (23:40 +0100)
committerRichard Purdie <richard.purdie@linuxfoundation.org>
Sun, 1 Sep 2013 14:51:10 +0000 (15:51 +0100)
The existing backend server implementations were inefficient since they
were sleeping for the full length of the timeouts rather than being woken when
there was data ready for them. It was assumed they would wake and perhaps did
when we forked processes directory but that is no longer the case.

This updates both the process and xmlrpc backends to wait using select(). This
does mean we need to pass the file descriptors to wait on from the internals
who know which these file descriptors are but this is a logical improvement.

Tests of a pathaolgical load on the process server of ~420 rapid tasks
executed on a server with BB_NUMBER_THREAD=48  went from a wall clock
measurement of the overall command execution time of 75s to a much more
reasonable 24s.

(Bitbake rev: 9bee497960889d9baa0a4284d79a384b18a8e826)

Signed-off-by: Richard Purdie <richard.purdie@linuxfoundation.org>
bitbake/lib/bb/runqueue.py
bitbake/lib/bb/server/process.py
bitbake/lib/bb/server/xmlrpc.py

index 075c84985af602b8972bdbe0a03784171ddacff8..197308fd9da7967130bf1464a9577398e6a1a081 100644 (file)
@@ -895,6 +895,14 @@ class RunQueue:
         if self.fakeworkerpipe:
             self.fakeworkerpipe.read()
 
+    def active_fds(self):
+        fds = []
+        if self.workerpipe:
+            fds.append(self.workerpipe.input)
+        if self.fakeworkerpipe:
+            fds.append(self.fakeworkerpipe.input)
+        return fds
+
     def check_stamp_task(self, task, taskname = None, recurse = False, cache = None):
         def get_timestamp(f):
             try:
@@ -972,7 +980,7 @@ class RunQueue:
         (if the abort on failure configuration option isn't set)
         """
 
-        retval = 0.5
+        retval = True
 
         if self.state is runQueuePrepare:
             self.rqexe = RunQueueExecuteDummy(self)
@@ -1375,7 +1383,7 @@ class RunQueueExecuteTasks(RunQueueExecute):
 
         if self.stats.active > 0:
             self.rq.read_workers()
-            return 0.5
+            return self.rq.active_fds()
 
         if len(self.failed_fnids) != 0:
             self.rq.state = runQueueFailed
index e2cec49b74fe02c686633570c4165d330a154838..c0af052ebd24c29b3e4cbd13b3161367ac48bfe8 100644 (file)
@@ -29,6 +29,7 @@ import os
 import signal
 import sys
 import time
+import select
 from Queue import Empty
 from multiprocessing import Event, Process, util, Queue, Pipe, queues
 
@@ -105,7 +106,7 @@ class ProcessServer(Process, BaseImplServer):
                     command = self.command_channel.recv()
                     self.runCommand(command)
 
-                self.idle_commands(.1)
+                self.idle_commands(.1, [self.event_queue._reader, self.command_channel])
             except Exception:
                 logger.exception('Running command %s', command)
 
@@ -115,7 +116,7 @@ class ProcessServer(Process, BaseImplServer):
         self.cooker.stop()
         self.idle_commands(.1)
 
-    def idle_commands(self, delay):
+    def idle_commands(self, delay, fds = []):
         nextsleep = delay
 
         for function, data in self._idlefuns.items():
@@ -127,15 +128,15 @@ class ProcessServer(Process, BaseImplServer):
                     nextsleep = None
                 elif nextsleep is None:
                     continue
-                elif retval < nextsleep:
-                    nextsleep = retval
+                else:
+                    fds = fds + retval
             except SystemExit:
                 raise
             except Exception:
                 logger.exception('Running idle function')
 
         if nextsleep is not None:
-            time.sleep(nextsleep)
+            select.select(fds,[],[],nextsleep)
 
     def runCommand(self, command):
         """
index 641e15e8336cab598f65b7fcb4a7f92cc5a960d1..cca569d0e9866c5c228b46a195bb6a1cb66e0ed5 100644 (file)
@@ -264,12 +264,9 @@ class XMLRPCServer(SimpleXMLRPCServer, BaseImplServer):
         Serve Requests. Overloaded to honor a quit command
         """
         self.quit = False
-        self.timeout = 0 # Run Idle calls for our first callback
         while not self.quit:
-            #print "Idle queue length %s" % len(self._idlefuns)
-            self.handle_request()
-            #print "Idle timeout, running idle functions"
-            nextsleep = None
+            fds = [self]
+            nextsleep = 0.1
             for function, data in self._idlefuns.items():
                 try:
                     retval = function(self, data, False)
@@ -277,21 +274,22 @@ class XMLRPCServer(SimpleXMLRPCServer, BaseImplServer):
                         del self._idlefuns[function]
                     elif retval is True:
                         nextsleep = 0
-                    elif nextsleep is 0:
-                        continue
-                    elif nextsleep is None:
-                        nextsleep = retval
-                    elif retval < nextsleep:
-                        nextsleep = retval
+                    else:
+                        fds = fds + retval
                 except SystemExit:
                     raise
                 except:
                     import traceback
                     traceback.print_exc()
                     pass
-            if nextsleep is None and len(self._idlefuns) > 0:
-                nextsleep = 0
-            self.timeout = nextsleep
+
+            socktimeout = self.socket.gettimeout() or nextsleep
+            socktimeout = min(socktimeout, nextsleep)
+            # Mirror what BaseServer handle_request would do
+            fd_sets = select.select(fds, [], [], socktimeout)
+            if fd_sets[0] and self in fd_sets[0]:
+                self._handle_request_noblock()
+
         # Tell idle functions we're exiting
         for function, data in self._idlefuns.items():
             try: