'''
if self._listener is None:
raise OSError('listener is closed')
+
c = self._listener.accept()
- if self._authkey:
+ if self._authkey is not None:
deliver_challenge(c, self._authkey)
answer_challenge(c, self._authkey)
return c
if self.TYPE == 'processes':
self.assertRaises(OSError, l.accept)
+ def test_empty_authkey(self):
+ # bpo-43952: allow empty bytes as authkey
+ def handler(*args):
+ raise RuntimeError('Connection took too long...')
+
+ def run(addr, authkey):
+ client = self.connection.Client(addr, authkey=authkey)
+ client.send(1729)
+
+ key = b""
+
+ with self.connection.Listener(authkey=key) as listener:
+ threading.Thread(target=run, args=(listener.address, key)).start()
+ with listener.accept() as d:
+ self.assertEqual(d.recv(), 1729)
+
+ if self.TYPE == 'processes':
+ self.assertRaises(OSError, listener.accept)
+
@unittest.skipUnless(util.abstract_sockets_supported,
"test needs abstract socket support")
def test_abstract_socket(self):