logging.config.stopListening()
t.join(2.0)
+ @unittest.skipUnless(threading, 'Threading required for this test.')
def test_listen_config_10_ok(self):
with captured_stdout() as output:
self.setup_via_listener(json.dumps(self.config10))
('ERROR', '4'),
], stream=output)
+ @unittest.skipUnless(threading, 'Threading required for this test.')
def test_listen_config_1_ok(self):
with captured_stdout() as output:
self.setup_via_listener(textwrap.dedent(ConfigFileTest.config1))
def test_multiprocessing(self):
r = logging.makeLogRecord({})
self.assertEqual(r.processName, 'MainProcess')
- import multiprocessing as mp
- r = logging.makeLogRecord({})
- self.assertEqual(r.processName, mp.current_process().name)
+ try:
+ import multiprocessing as mp
+ r = logging.makeLogRecord({})
+ self.assertEqual(r.processName, mp.current_process().name)
+ except ImportError:
+ pass
def test_optional(self):
r = logging.makeLogRecord({})
NOT_NONE = self.assertIsNotNone
- NOT_NONE(r.thread)
- NOT_NONE(r.threadName)
+ if threading:
+ NOT_NONE(r.thread)
+ NOT_NONE(r.threadName)
NOT_NONE(r.process)
NOT_NONE(r.processName)
log_threads = logging.logThreads