def send(self, packet):
self.socket.sendall(struct.pack("!I", len(packet)) + packet)
- def receive(self):
- raw_length = self._recvall(self.HEADER_LENGTH)
+ def receive(self, timeout=None):
+ raw_length = self._recvall(self.HEADER_LENGTH, timeout)
length, = struct.unpack("!I", raw_length)
payload = self._recvall(length)
return payload
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()
- def _recvall(self, count):
+ def _recvall(self, count, timeout=None):
"""Ensure to read count bytes from the socket"""
data = b""
+ if count > 0:
+ self.socket.settimeout(timeout)
while len(data) < count:
buf = self.socket.recv(count - len(data))
+ self.socket.settimeout(None)
if not buf:
raise socket.error('Connection closed')
data += buf
)
)
- def listen(self, event_types):
+ def listen(self, event_types, timeout=None):
"""Register and listen for the given events.
+ If a timeout is given, the generator produces a (None, None) tuple
+ if no event has been received for that time. This allows the caller
+ to either abort by breaking from the generator, or perform periodic
+ tasks while staying registered within listen(), and then continue
+ waiting for more events.
+
:param event_types: event types to register
:type event_types: list
+ :param timeout: timeout to wait for events, in fractions of a second
+ :type timeout: float
:return: generator for streamed event responses as (event_type, dict)
:rtype: generator
"""
try:
while True:
- response = Packet.parse(self.transport.receive())
+ try:
+ response = Packet.parse(self.transport.receive(timeout))
+ except socket.timeout:
+ yield None, None
+ continue
if response.response_type == Packet.EVENT:
try:
msg = Message.deserialize(response.payload)