def sub_debug(msg, *args):
if _logger:
- _logger.log(SUBDEBUG, msg, *args)
+ _logger.log(SUBDEBUG, msg, *args, stacklevel=2)
def debug(msg, *args):
if _logger:
- _logger.log(DEBUG, msg, *args)
+ _logger.log(DEBUG, msg, *args, stacklevel=2)
def info(msg, *args):
if _logger:
- _logger.log(INFO, msg, *args)
+ _logger.log(INFO, msg, *args, stacklevel=2)
def sub_warning(msg, *args):
if _logger:
- _logger.log(SUBWARNING, msg, *args)
+ _logger.log(SUBWARNING, msg, *args, stacklevel=2)
def get_logger():
'''
root_logger.setLevel(root_level)
logger.setLevel(level=LOG_LEVEL)
+ def test_filename(self):
+ logger = multiprocessing.get_logger()
+ original_level = logger.level
+ try:
+ logger.setLevel(util.DEBUG)
+ stream = io.StringIO()
+ handler = logging.StreamHandler(stream)
+ logging_format = '[%(levelname)s] [%(filename)s] %(message)s'
+ handler.setFormatter(logging.Formatter(logging_format))
+ logger.addHandler(handler)
+ logger.info('1')
+ util.info('2')
+ logger.debug('3')
+ filename = os.path.basename(__file__)
+ log_record = stream.getvalue()
+ self.assertIn(f'[INFO] [{filename}] 1', log_record)
+ self.assertIn(f'[INFO] [{filename}] 2', log_record)
+ self.assertIn(f'[DEBUG] [{filename}] 3', log_record)
+ finally:
+ logger.setLevel(original_level)
+ logger.removeHandler(handler)
+ handler.close()
+
# class _TestLoggingProcessName(BaseTestCase):
#