ALLOWED_TYPES = ('manager',)
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
+ @support.skip_if_sanitizer('TSan: leaks threads', thread=True)
def test_mymanager(self):
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
+ @support.skip_if_sanitizer('TSan: leaks threads', thread=True)
def test_mymanager_context(self):
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
with manager:
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
+ @support.skip_if_sanitizer('TSan: leaks threads', thread=True)
def test_mymanager_context_prestarted(self):
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
queue.put(tuple(cls.values))
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
+ @support.skip_if_sanitizer('TSan: leaks threads', thread=True)
def test_remote(self):
authkey = os.urandom(32)
queue.put('hello world')
@warnings_helper.ignore_fork_in_thread_deprecation_warnings()
+ @support.skip_if_sanitizer("TSan: leaks threads", thread=True)
def test_rapid_restart(self):
authkey = os.urandom(32)
manager = QueueManager(