if self.fakeworkerpipe:
self.fakeworkerpipe.read()
+ def active_fds(self):
+ fds = []
+ if self.workerpipe:
+ fds.append(self.workerpipe.input)
+ if self.fakeworkerpipe:
+ fds.append(self.fakeworkerpipe.input)
+ return fds
+
def check_stamp_task(self, task, taskname = None, recurse = False, cache = None):
def get_timestamp(f):
try:
(if the abort on failure configuration option isn't set)
"""
- retval = 0.5
+ retval = True
if self.state is runQueuePrepare:
self.rqexe = RunQueueExecuteDummy(self)
if self.stats.active > 0:
self.rq.read_workers()
- return 0.5
+ return self.rq.active_fds()
if len(self.failed_fnids) != 0:
self.rq.state = runQueueFailed
import signal
import sys
import time
+import select
from Queue import Empty
from multiprocessing import Event, Process, util, Queue, Pipe, queues
command = self.command_channel.recv()
self.runCommand(command)
- self.idle_commands(.1)
+ self.idle_commands(.1, [self.event_queue._reader, self.command_channel])
except Exception:
logger.exception('Running command %s', command)
self.cooker.stop()
self.idle_commands(.1)
- def idle_commands(self, delay):
+ def idle_commands(self, delay, fds = []):
nextsleep = delay
for function, data in self._idlefuns.items():
nextsleep = None
elif nextsleep is None:
continue
- elif retval < nextsleep:
- nextsleep = retval
+ else:
+ fds = fds + retval
except SystemExit:
raise
except Exception:
logger.exception('Running idle function')
if nextsleep is not None:
- time.sleep(nextsleep)
+ select.select(fds,[],[],nextsleep)
def runCommand(self, command):
"""
Serve Requests. Overloaded to honor a quit command
"""
self.quit = False
- self.timeout = 0 # Run Idle calls for our first callback
while not self.quit:
- #print "Idle queue length %s" % len(self._idlefuns)
- self.handle_request()
- #print "Idle timeout, running idle functions"
- nextsleep = None
+ fds = [self]
+ nextsleep = 0.1
for function, data in self._idlefuns.items():
try:
retval = function(self, data, False)
del self._idlefuns[function]
elif retval is True:
nextsleep = 0
- elif nextsleep is 0:
- continue
- elif nextsleep is None:
- nextsleep = retval
- elif retval < nextsleep:
- nextsleep = retval
+ else:
+ fds = fds + retval
except SystemExit:
raise
except:
import traceback
traceback.print_exc()
pass
- if nextsleep is None and len(self._idlefuns) > 0:
- nextsleep = 0
- self.timeout = nextsleep
+
+ socktimeout = self.socket.gettimeout() or nextsleep
+ socktimeout = min(socktimeout, nextsleep)
+ # Mirror what BaseServer handle_request would do
+ fd_sets = select.select(fds, [], [], socktimeout)
+ if fd_sets[0] and self in fd_sets[0]:
+ self._handle_request_noblock()
+
# Tell idle functions we're exiting
for function, data in self._idlefuns.items():
try: