self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDPLITE)
self.port = socket_helper.bind_port(self.serv)
-class ThreadSafeCleanupTestCase:
- """Subclass of unittest.TestCase with thread-safe cleanup methods.
-
- This subclass protects the addCleanup() method with a recursive lock.
-
- doCleanups() is called when the server completed, but the client can still
- be running in its thread especially if the server failed with a timeout.
- Don't put a lock on doCleanups() to prevent deadlock between addCleanup()
- called in the client and doCleanups() waiting for self.done.wait of
- ThreadableTest._setUp() (gh-110167)
- """
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self._cleanup_lock = threading.RLock()
-
- def addCleanup(self, *args, **kwargs):
- with self._cleanup_lock:
- return super().addCleanup(*args, **kwargs)
-
class SocketCANTest(unittest.TestCase):
self.serv.listen()
-class ThreadedSocketTestMixin(ThreadSafeCleanupTestCase, SocketTestBase,
- ThreadableTest):
+class ThreadedSocketTestMixin(SocketTestBase, ThreadableTest):
"""Mixin to add client socket and allow client/server tests.
Client socket is self.cli and its address is self.cli_addr. See
# here assumes that datagram delivery on the local machine will be
# reliable.
-class SendrecvmsgBase(ThreadSafeCleanupTestCase):
+class SendrecvmsgBase:
# Base class for sendmsg()/recvmsg() tests.
# Time in seconds to wait before considering a test failed, or
@unittest.skipUnless(hasattr(signal, "alarm") or hasattr(signal, "setitimer"),
"Don't have signal.alarm or signal.setitimer")
class InterruptedSendTimeoutTest(InterruptedTimeoutBase,
- ThreadSafeCleanupTestCase,
SocketListeningTestMixin, TCPTestBase):
# Test interrupting the interruptible send*() methods with signals
# when a timeout is set.