def close(self):
self._closed = True
- try:
- self._reader.close()
- finally:
- close = self._close
- if close:
- self._close = None
- close()
+ close = self._close
+ if close:
+ self._close = None
+ close()
def join_thread(self):
debug('Queue.join_thread()')
self._thread = threading.Thread(
target=Queue._feed,
args=(self._buffer, self._notempty, self._send_bytes,
- self._wlock, self._writer.close, self._ignore_epipe,
- self._on_queue_feeder_error, self._sem),
+ self._wlock, self._reader.close, self._writer.close,
+ self._ignore_epipe, self._on_queue_feeder_error,
+ self._sem),
name='QueueFeederThread'
)
self._thread.daemon = True
notempty.notify()
@staticmethod
- def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe,
- onerror, queue_sem):
+ def _feed(buffer, notempty, send_bytes, writelock, reader_close,
+ writer_close, ignore_epipe, onerror, queue_sem):
debug('starting thread to feed data to pipe')
nacquire = notempty.acquire
nrelease = notempty.release
obj = bpopleft()
if obj is sentinel:
debug('feeder thread got sentinel -- exiting')
- close()
+ reader_close()
+ writer_close()
return
# serialize the data before acquiring the lock