# log.py - adapt python logging module to SQLAlchemy
# Copyright (C) 2006, 2007, 2008, 2009, 2010 Michael Bayer mike_mp@zzzcomputing.com
+# Includes alterations by Vinay Sajip vinay_sajip@yahoo.co.uk
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
module. The regular dotted module namespace is used, starting at
'sqlalchemy'. For class-level logging, the class name is appended.
-The "echo" keyword parameter which is available on SQLA ``Engine``
-and ``Pool`` objects corresponds to a logger specific to that
+The "echo" keyword parameter, available on SQLA :class:`.Engine`
+and :class:`.Pool` objects, corresponds to a logger specific to that
instance only.
-E.g.::
-
- engine.echo = True
-
-is equivalent to::
-
- import logging
- logger = logging.getLogger('sqlalchemy.engine.Engine.%s' % hex(id(engine)))
- logger.setLevel(logging.DEBUG)
-
"""
import logging
import sys
from sqlalchemy import util
+# set initial level to WARN. This so that
+# log statements don't occur in the absense of explicit
+# logging being enabled for 'sqlalchemy'.
rootlogger = logging.getLogger('sqlalchemy')
if rootlogger.level == logging.NOTSET:
rootlogger.setLevel(logging.WARN)
-default_enabled = False
-def default_logging(name):
- global default_enabled
- if logging.getLogger(name).getEffectiveLevel() < logging.WARN:
- default_enabled = True
- if not default_enabled:
- default_enabled = True
- handler = logging.StreamHandler(sys.stdout)
- handler.setFormatter(logging.Formatter(
- '%(asctime)s %(levelname)s %(name)s %(message)s'))
- rootlogger.addHandler(handler)
+def _add_default_handler(logger):
+ handler = logging.StreamHandler(sys.stdout)
+ handler.setFormatter(logging.Formatter(
+ '%(asctime)s %(levelname)s %(name)s %(message)s'))
+ logger.addHandler(handler)
_logged_classes = set()
def class_logger(cls, enable=False):
class Identified(object):
- @util.memoized_property
- def logging_name(self):
- # limit the number of loggers by chopping off the hex(id).
- # some novice users unfortunately create an unlimited number
- # of Engines in their applications which would otherwise
- # cause the app to run out of memory.
- return "0x...%s" % hex(id(self))[-4:]
+ logging_name = None
+
+ def _should_log_debug(self):
+ return self.logger.isEnabledFor(logging.DEBUG)
+
+ def _should_log_info(self):
+ return self.logger.isEnabledFor(logging.INFO)
+class InstanceLogger(object):
+ """A logger adapter (wrapper) for :class:`.Identified` subclasses.
-def instance_logger(instance, echoflag=None):
- """create a logger for an instance that implements :class:`Identified`.
+ This allows multiple instances (e.g. Engine or Pool instances)
+ to share a logger, but have its verbosity controlled on a
+ per-instance basis.
+
+ The basic functionality is to return a logging level
+ which is based on an instance's echo setting.
- Warning: this is an expensive call which also results in a permanent
- increase in memory overhead for each call. Use only for
- low-volume, long-time-spanning objects.
+ Default implementation is:
+ 'debug' -> logging.DEBUG
+ True -> logging.INFO
+ False -> Effective level of underlying logger
+ (logging.WARNING by default)
+ None -> same as False
"""
- name = "%s.%s.%s" % (instance.__class__.__module__,
- instance.__class__.__name__, instance.logging_name)
+ # Map echo settings to logger levels
+ _echo_map = {
+ None: logging.NOTSET,
+ False: logging.NOTSET,
+ True: logging.INFO,
+ 'debug': logging.DEBUG,
+ }
+
+ def __init__(self, echo, name):
+ self.echo = echo
+ self.logger = logging.getLogger(name)
+
+ # if echo flag is enabled and no handlers,
+ # add a handler to the list
+ if self._echo_map[echo] <= logging.INFO \
+ and not self.logger.handlers:
+ _add_default_handler(self.logger)
+
+ #
+ # Boilerplate convenience methods
+ #
+ def debug(self, msg, *args, **kwargs):
+ """Delegate a debug call to the underlying logger."""
+
+ self.log(logging.DEBUG, msg, *args, **kwargs)
+
+ def info(self, msg, *args, **kwargs):
+ """Delegate an info call to the underlying logger."""
+
+ self.log(logging.INFO, msg, *args, **kwargs)
+
+ def warning(self, msg, *args, **kwargs):
+ """Delegate a warning call to the underlying logger."""
+
+ self.log(logging.WARNING, msg, *args, **kwargs)
+
+ warn = warning
+
+ def error(self, msg, *args, **kwargs):
+ """
+ Delegate an error call to the underlying logger.
+ """
+ self.log(logging.ERROR, msg, *args, **kwargs)
+
+ def exception(self, msg, *args, **kwargs):
+ """Delegate an exception call to the underlying logger."""
+
+ kwargs["exc_info"] = 1
+ self.log(logging.ERROR, msg, *args, **kwargs)
+
+ def critical(self, msg, *args, **kwargs):
+ """Delegate a critical call to the underlying logger."""
+
+ self.log(logging.CRITICAL, msg, *args, **kwargs)
+
+ def log(self, level, msg, *args, **kwargs):
+ """Delegate a log call to the underlying logger.
+
+ The level here is determined by the echo
+ flag as well as that of the underlying logger, and
+ logger._log() is called directly.
+
+ """
+
+ # inline the logic from isEnabledFor(),
+ # getEffectiveLevel(), to avoid overhead.
+
+ if self.logger.manager.disable >= level:
+ return
+
+ selected_level = self._echo_map[self.echo]
+ if selected_level == logging.NOTSET:
+ selected_level = self.logger.getEffectiveLevel()
+
+ if level >= selected_level:
+ self.logger._log(level, msg, args, **kwargs)
+
+ def isEnabledFor(self, level):
+ """Is this logger enabled for level 'level'?"""
+
+ if self.logger.manager.disable >= level:
+ return False
+ return level >= self.getEffectiveLevel()
+
+ def getEffectiveLevel(self):
+ """What's the effective level for this logger?"""
+
+ level = self._echo_map[self.echo]
+ if level == logging.NOTSET:
+ level = self.logger.getEffectiveLevel()
+ return level
+
+def instance_logger(instance, echoflag=None):
+ """create a logger for an instance that implements :class:`Identified`."""
+
+ if instance.logging_name:
+ name = "%s.%s.%s" % (instance.__class__.__module__,
+ instance.__class__.__name__, instance.logging_name)
+ else:
+ name = "%s.%s" % (instance.__class__.__module__,
+ instance.__class__.__name__)
+
+ instance._echo = echoflag
- if echoflag is not None:
- l = logging.getLogger(name)
- if echoflag == 'debug':
- default_logging(name)
- l.setLevel(logging.DEBUG)
- elif echoflag is True:
- default_logging(name)
- l.setLevel(logging.INFO)
- elif echoflag is False:
- l.setLevel(logging.WARN)
+ if echoflag in (False, None):
+ # if no echo setting or False, return a Logger directly,
+ # avoiding overhead of filtering
+ logger = logging.getLogger(name)
else:
- l = logging.getLogger(name)
- instance._should_log_debug = lambda: l.isEnabledFor(logging.DEBUG)
- instance._should_log_info = lambda: l.isEnabledFor(logging.INFO)
- return l
-
+ # if a specified echo flag, return an EchoLogger,
+ # which checks the flag, overrides normal log
+ # levels by calling logger._log()
+ logger = InstanceLogger(echoflag, name)
+
+ instance.logger = logger
+
class echo_property(object):
__doc__ = """\
When ``True``, enable log output for this element.
if instance is None:
return self
else:
- return instance._should_log_debug() and 'debug' or \
- (instance._should_log_info() and True or False)
+ return instance._echo
def __set__(self, instance, value):
instance_logger(instance, echoflag=value)
assert len(cache) == 1
eq_(conn.execute("select count(*) from users").scalar(), 3)
-class LogTest(TestBase):
- def _test_logger(self, eng, eng_name, pool_name):
- buf = logging.handlers.BufferingHandler(100)
- logs = [
+class LoggingNameTest(TestBase):
+ def _assert_names_in_execute(self, eng, eng_name, pool_name):
+ eng.execute(select([1]))
+ for name in [b.name for b in self.buf.buffer]:
+ assert name in (
+ 'sqlalchemy.engine.base.Engine.%s' % eng_name,
+ 'sqlalchemy.pool.%s.%s' %
+ (eng.pool.__class__.__name__, pool_name)
+ )
+
+ def _assert_no_name_in_execute(self, eng):
+ eng.execute(select([1]))
+ for name in [b.name for b in self.buf.buffer]:
+ assert name in (
+ 'sqlalchemy.engine.base.Engine',
+ 'sqlalchemy.pool.%s' % eng.pool.__class__.__name__
+ )
+
+ def _named_engine(self, **kw):
+ options = {
+ 'logging_name':'myenginename',
+ 'pool_logging_name':'mypoolname'
+ }
+ options.update(kw)
+ return engines.testing_engine(options=options)
+
+ def _unnamed_engine(self, **kw):
+ return engines.testing_engine(options=kw)
+
+ def setup(self):
+ self.buf = logging.handlers.BufferingHandler(100)
+ for log in [
+ logging.getLogger('sqlalchemy.engine'),
+ logging.getLogger('sqlalchemy.pool')
+ ]:
+ log.addHandler(self.buf)
+
+ def teardown(self):
+ for log in [
logging.getLogger('sqlalchemy.engine'),
logging.getLogger('sqlalchemy.pool')
- ]
- for log in logs:
- log.addHandler(buf)
+ ]:
+ log.removeHandler(self.buf)
- eq_(eng.logging_name, eng_name)
- eq_(eng.pool.logging_name, pool_name)
+ def test_named_logger_names(self):
+ eng = self._named_engine()
+ eq_(eng.logging_name, "myenginename")
+ eq_(eng.pool.logging_name, "mypoolname")
+
+ def test_named_logger_names_after_dispose(self):
+ eng = self._named_engine()
+ eng.execute(select([1]))
+ eng.dispose()
+ eq_(eng.logging_name, "myenginename")
+ eq_(eng.pool.logging_name, "mypoolname")
+
+ def test_unnamed_logger_names(self):
+ eng = self._unnamed_engine()
+ eq_(eng.logging_name, None)
+ eq_(eng.pool.logging_name, None)
+
+ def test_named_logger_execute(self):
+ eng = self._named_engine()
+ self._assert_names_in_execute(eng, "myenginename", "mypoolname")
+
+ def test_named_logger_echoflags_execute(self):
+ eng = self._named_engine(echo='debug', echo_pool='debug')
+ self._assert_names_in_execute(eng, "myenginename", "mypoolname")
+
+ def test_named_logger_execute_after_dispose(self):
+ eng = self._named_engine()
eng.execute(select([1]))
- for log in logs:
- log.removeHandler(buf)
+ eng.dispose()
+ self._assert_names_in_execute(eng, "myenginename", "mypoolname")
+
+ def test_unnamed_logger_execute(self):
+ eng = self._unnamed_engine()
+ self._assert_no_name_in_execute(eng)
+
+ def test_unnamed_logger_echoflags_execute(self):
+ eng = self._unnamed_engine(echo='debug', echo_pool='debug')
+ self._assert_no_name_in_execute(eng)
+
+class EchoTest(TestBase):
+
+ def setup(self):
+ self.level = logging.getLogger('sqlalchemy.engine').level
+ logging.getLogger('sqlalchemy.engine').setLevel(logging.WARN)
+ self.buf = logging.handlers.BufferingHandler(100)
+ logging.getLogger('sqlalchemy.engine').addHandler(self.buf)
+
+ def teardown(self):
+ logging.getLogger('sqlalchemy.engine').removeHandler(self.buf)
+ logging.getLogger('sqlalchemy.engine').setLevel(self.level)
+
+ def testing_engine(self):
+ e = engines.testing_engine()
- names = set([b.name for b in buf.buffer])
- assert 'sqlalchemy.engine.base.Engine.%s' % (eng_name,) in names
- assert 'sqlalchemy.pool.%s.%s' % (eng.pool.__class__.__name__,
- pool_name) in names
+ # do an initial execute to clear out 'first connect'
+ # messages
+ e.execute("select 10")
+ self.buf.flush()
- def test_named_logger(self):
- options = {'echo':'debug', 'echo_pool':'debug',
- 'logging_name':'myenginename',
- 'pool_logging_name':'mypoolname'
- }
- eng = engines.testing_engine(options=options)
- self._test_logger(eng, "myenginename", "mypoolname")
+ return e
- eng.dispose()
- self._test_logger(eng, "myenginename", "mypoolname")
+ def test_levels(self):
+ e1 = engines.testing_engine()
+
+ eq_(e1._should_log_info(), False)
+ eq_(e1._should_log_debug(), False)
+ eq_(e1.logger.isEnabledFor(logging.INFO), False)
+ eq_(e1.logger.getEffectiveLevel(), logging.WARN)
+
+ e1.echo = True
+ eq_(e1._should_log_info(), True)
+ eq_(e1._should_log_debug(), False)
+ eq_(e1.logger.isEnabledFor(logging.INFO), True)
+ eq_(e1.logger.getEffectiveLevel(), logging.INFO)
+
+ e1.echo = 'debug'
+ eq_(e1._should_log_info(), True)
+ eq_(e1._should_log_debug(), True)
+ eq_(e1.logger.isEnabledFor(logging.DEBUG), True)
+ eq_(e1.logger.getEffectiveLevel(), logging.DEBUG)
+
+ e1.echo = False
+ eq_(e1._should_log_info(), False)
+ eq_(e1._should_log_debug(), False)
+ eq_(e1.logger.isEnabledFor(logging.INFO), False)
+ eq_(e1.logger.getEffectiveLevel(), logging.WARN)
+ def test_echo_flag_independence(self):
+ """test the echo flag's independence to a specific engine."""
- def test_unnamed_logger(self):
- eng = engines.testing_engine(options={'echo': 'debug',
- 'echo_pool': 'debug'})
- self._test_logger(
- eng,
- "0x...%s" % hex(id(eng))[-4:],
- "0x...%s" % hex(id(eng.pool))[-4:],
- )
+ e1 = self.testing_engine()
+ e2 = self.testing_engine()
+
+ e1.echo = True
+ e1.execute(select([1]))
+ e2.execute(select([2]))
+
+ e1.echo = False
+ e1.execute(select([3]))
+ e2.execute(select([4]))
+ e2.echo = True
+ e1.execute(select([5]))
+ e2.execute(select([6]))
+
+ assert self.buf.buffer[0].getMessage().startswith("SELECT 1")
+ assert self.buf.buffer[2].getMessage().startswith("SELECT 6")
+ assert len(self.buf.buffer) == 4
+
class ResultProxyTest(TestBase):
def test_nontuple_row(self):
"""ensure the C version of BaseRowProxy handles