instance.io_loop = io_loop
instance._multi = pycurl.CurlMulti()
instance._multi.setopt(pycurl.M_TIMERFUNCTION,
- instance._handle_timer)
+ instance._set_timeout)
instance._multi.setopt(pycurl.M_SOCKETFUNCTION,
instance._handle_socket)
instance._curls = [_curl_create(max_simultaneous_connections)
instance._free_list = instance._curls[:]
instance._requests = collections.deque()
instance._fds = {}
+ instance._timeout = None
cls._ASYNC_CLIENTS[io_loop] = instance
return instance
request = HTTPRequest(url=request, **kwargs)
self._requests.append((request, callback))
self._process_queue()
- self.io_loop.add_callback(self._handle_timeout)
+ self._set_timeout(0)
def _handle_socket(self, event, fd, multi, data):
"""Called by libcurl when it wants to change the file descriptors
self._fds[fd] = ioloop_event
self.io_loop.update_handler(fd, ioloop_event)
- def _handle_timer(self, msecs):
+ def _set_timeout(self, msecs):
"""Called by libcurl to schedule a timeout."""
- self.io_loop.add_timeout(
+ if self._timeout is not None:
+ self.io_loop.remove_timeout(self._timeout)
+ self._timeout = self.io_loop.add_timeout(
time.time() + msecs/1000.0, self._handle_timeout)
def _handle_events(self, fd, events):
def _handle_timeout(self):
"""Called by IOLoop when the requested timeout has passed."""
+ self._timeout = None
while True:
try:
ret, num_handles = self._multi.socket_action(
break
self._finish_pending_requests()
+ # In theory, we shouldn't have to do this because curl will
+ # call _set_timeout whenever the timeout changes. However,
+ # sometimes after _handle_timeout we will need to reschedule
+ # immediately even though nothing has changed from curl's
+ # perspective. This is because when socket_action is
+ # called with SOCKET_TIMEOUT, libcurl decides internally which
+ # timeouts need to be processed by using a monotonic clock
+ # (where available) while tornado uses python's time.time()
+ # to decide when timeouts have occurred. When those clocks
+ # disagree on elapsed time (as they will whenever there is an
+ # NTP adjustment), tornado might call _handle_timeout before
+ # libcurl is ready. After each timeout, resync the scheduled
+ # timeout with libcurl's current state.
+ new_timeout = self._multi.timeout()
+ if new_timeout != -1:
+ self._set_timeout(new_timeout)
+
def _finish_pending_requests(self):
"""Process any requests that were completed by the last
call to multi.socket_action.