class CreateServerFunctionalTest(unittest.TestCase):
timeout = support.LOOPBACK_TIMEOUT
- def setUp(self):
- self.thread = None
-
- def tearDown(self):
- if self.thread is not None:
- self.thread.join(self.timeout)
-
def echo_server(self, sock):
def run(sock):
with sock:
event = threading.Event()
sock.settimeout(self.timeout)
- self.thread = threading.Thread(target=run, args=(sock, ))
- self.thread.start()
+ thread = threading.Thread(target=run, args=(sock, ))
+ thread.start()
+ self.addCleanup(thread.join, self.timeout)
event.set()
def echo_client(self, addr, family):