ALLOWED_TYPES = ('manager',)
+ @support.skip_if_sanitizer('TSan: leaks threads', thread=True)
def test_mymanager(self):
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
# which happens on slow buildbots.
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
+ @support.skip_if_sanitizer('TSan: leaks threads', thread=True)
def test_mymanager_context(self):
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
with manager:
# which happens on slow buildbots.
self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
+ @support.skip_if_sanitizer('TSan: leaks threads', thread=True)
def test_mymanager_context_prestarted(self):
manager = MyManager(shutdown_timeout=SHUTDOWN_TIMEOUT)
manager.start()
# Note that xmlrpclib will deserialize object as a list not a tuple
queue.put(tuple(cls.values))
+ @support.skip_if_sanitizer('TSan: leaks threads', thread=True)
def test_remote(self):
authkey = os.urandom(32)
queue = manager.get_queue()
queue.put('hello world')
+ @support.skip_if_sanitizer("TSan: leaks threads", thread=True)
def test_rapid_restart(self):
authkey = os.urandom(32)
manager = QueueManager(