authkey = current.authkey
self.assertTrue(current.is_alive())
- self.assertTrue(not current.daemon)
+ self.assertFalse(current.daemon)
self.assertIsInstance(authkey, bytes)
self.assertTrue(len(authkey) > 0)
self.assertEqual(current.ident, os.getpid())
self.assertEqual(p.is_alive(), False)
self.assertEqual(p.daemon, True)
self.assertNotIn(p, self.active_children())
- self.assertTrue(type(self.active_children()) is list)
+ self.assertIs(type(self.active_children()), list)
self.assertEqual(p.exitcode, None)
p.start()
cpus = multiprocessing.cpu_count()
except NotImplementedError:
cpus = 1
- self.assertTrue(type(cpus) is int)
- self.assertTrue(cpus >= 1)
+ self.assertIsInstance(cpus, int)
+ self.assertGreaterEqual(cpus, 1)
def test_active_children(self):
self.assertEqual(type(self.active_children()), list)
self.assertEqual(lock, lock3)
arr4 = self.Value('i', 5, lock=False)
- self.assertFalse(hasattr(arr4, 'get_lock'))
- self.assertFalse(hasattr(arr4, 'get_obj'))
+ self.assertNotHasAttr(arr4, 'get_lock')
+ self.assertNotHasAttr(arr4, 'get_obj')
self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
arr5 = self.RawValue('i', 5)
- self.assertFalse(hasattr(arr5, 'get_lock'))
- self.assertFalse(hasattr(arr5, 'get_obj'))
+ self.assertNotHasAttr(arr5, 'get_lock')
+ self.assertNotHasAttr(arr5, 'get_obj')
class _TestArray(BaseTestCase):
self.assertEqual(lock, lock3)
arr4 = self.Array('i', range(10), lock=False)
- self.assertFalse(hasattr(arr4, 'get_lock'))
- self.assertFalse(hasattr(arr4, 'get_obj'))
+ self.assertNotHasAttr(arr4, 'get_lock')
+ self.assertNotHasAttr(arr4, 'get_obj')
self.assertRaises(AttributeError,
self.Array, 'i', range(10), lock='notalock')
arr5 = self.RawArray('i', range(10))
- self.assertFalse(hasattr(arr5, 'get_lock'))
- self.assertFalse(hasattr(arr5, 'get_obj'))
+ self.assertNotHasAttr(arr5, 'get_lock')
+ self.assertNotHasAttr(arr5, 'get_obj')
#
#
self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
del n.job
self.assertEqual(str(n), "Namespace(name='Bob')")
- self.assertTrue(hasattr(n, 'name'))
- self.assertTrue(not hasattr(n, 'job'))
+ self.assertHasAttr(n, 'name')
+ self.assertNotHasAttr(n, 'job')
#
#
for name in modules:
__import__(name)
mod = sys.modules[name]
- self.assertTrue(hasattr(mod, '__all__'), name)
-
+ self.assertHasAttr(mod, '__all__', name)
for attr in mod.__all__:
- self.assertTrue(
- hasattr(mod, attr),
- '%r does not have attribute %r' % (mod, attr)
- )
+ self.assertHasAttr(mod, attr)
#
# Quick test that logging works -- does not test logging output
def test_enable_logging(self):
logger = multiprocessing.get_logger()
logger.setLevel(util.SUBWARNING)
- self.assertTrue(logger is not None)
+ self.assertIsNotNone(logger)
logger.debug('this will not be printed')
logger.info('nor will this')
logger.setLevel(LOG_LEVEL)
self.assertEqual(multiprocessing.get_start_method(), method)
ctx = multiprocessing.get_context()
self.assertEqual(ctx.get_start_method(), method)
- self.assertTrue(type(ctx).__name__.lower().startswith(method))
- self.assertTrue(
- ctx.Process.__name__.lower().startswith(method))
+ self.assertStartsWith(type(ctx).__name__.lower(), method)
+ self.assertStartsWith(ctx.Process.__name__.lower(), method)
self.check_context(multiprocessing)
count += 1
finally:
if should_die:
self.assertEqual(len(all_warn), 1)
the_warn = all_warn[0]
- self.assertTrue(issubclass(the_warn.category, UserWarning))
- self.assertTrue("resource_tracker: process died"
- in str(the_warn.message))
+ self.assertIsSubclass(the_warn.category, UserWarning)
+ self.assertIn("resource_tracker: process died",
+ str(the_warn.message))
else:
self.assertEqual(len(all_warn), 0)
Process=FailingForkProcess))
p.close()
p.join()
- self.assertFalse(
- any(process.is_alive() for process in forked_processes))
+ for process in forked_processes:
+ self.assertFalse(process.is_alive(), process)
@hashlib_helper.requires_hashdigest('sha256')