0.2.9
+- logging is now implemented via standard python "logging" module.
+"echo" keyword parameters are still functional but set/unset
+log levels for their respective classes/instances. all logging
+can be controlled directly through the Python API by setting
+INFO and DEBUG levels for loggers in the "sqlalchemy" namespace.
+class-level logging is under "sqlalchemy.<module>.<classname>",
+instance-level logging under "sqlalchemy.<module>.<classname>.<hexid>".
+Test suite includes "--log-info" and "--log-debug" arguments
+which work independently of --verbose/--quiet. Logging added
+to orm to allow tracking of mapper configurations, row iteration.
- updates to MS-SQL driver:
-- fixes bug 261 (table reflection broken for MS-SQL case-sensitive databases)
-- can now specify port for pymssql
mysql, oracle, oracle8, mssql)
--mockpool use mock pool
--verbose full debug echoing
- --noecho Disable SQL statement echoing
+ --log-info=LOG_INFO turn on info logging for <LOG> (multiple OK)
+ --log-debug=LOG_DEBUG
+ turn on debug logging for <LOG> (multiple OK)
--quiet be totally quiet
--nothreadlocal dont use thread-local mod
--enginestrategy=ENGINESTRATEGY
python test/orm/mapper.py MapperTest.testget
+Logging is now available via Python's logging package. Any area of SQLAlchemy can be logged
+through the unittest interface, such as:
+Log mapper configuration, connection pool checkouts, and SQL statement execution:
+
+ python test/orm/unitofwork.py --log-info=sqlalchemy.orm.mapper --log-debug=sqlalchemy.pool --log-info=sqlalchemy.engine
-from sqlalchemy import exceptions, sql, schema, util, types
+from sqlalchemy import exceptions, sql, schema, util, types, logging
import StringIO, sys, re
class ConnectionProvider(object):
def in_transaction(self):
return self.__transaction is not None
def _begin_impl(self):
- if self.__engine.echo:
- self.__engine.log("BEGIN")
+ self.__engine.logger.info("BEGIN")
self.__engine.dialect.do_begin(self.connection)
def _rollback_impl(self):
- if self.__engine.echo:
- self.__engine.log("ROLLBACK")
+ self.__engine.logger.info("ROLLBACK")
self.__engine.dialect.do_rollback(self.connection)
self.__connection.close_open_cursors()
self.__transaction = None
def _commit_impl(self):
- if self.__engine.echo:
- self.__engine.log("COMMIT")
+ self.__engine.logger.info("COMMIT")
self.__engine.dialect.do_commit(self.connection)
self.__transaction = None
def _autocommit(self, statement):
return self.__engine.dialect.get_default_schema_name(self)
def run_callable(self, callable_):
return callable_(self)
- def _execute_raw(self, statement, parameters=None, cursor=None, echo=None, context=None, **kwargs):
+ def _execute_raw(self, statement, parameters=None, cursor=None, context=None, **kwargs):
if cursor is None:
cursor = self.connection.cursor()
try:
- if echo is True or self.__engine.echo is not False:
- self.__engine.log(statement)
- self.__engine.log(repr(parameters))
+ self.__engine.logger.info(statement)
+ self.__engine.logger.info(repr(parameters))
if parameters is not None and isinstance(parameters, list) and len(parameters) > 0 and (isinstance(parameters[0], list) or isinstance(parameters[0], dict)):
self._executemany(cursor, statement, parameters, context=context)
else:
Connects a ConnectionProvider, a Dialect and a CompilerFactory together to
provide a default implementation of SchemaEngine.
"""
- def __init__(self, connection_provider, dialect, echo=False, logger=None, **kwargs):
+ def __init__(self, connection_provider, dialect, echo=None, **kwargs):
self.connection_provider = connection_provider
self.dialect=dialect
self.echo = echo
- self.logger = logger or util.Logger(origin='engine')
+ self.logger = logging.instance_logger(self)
name = property(lambda s:sys.modules[s.dialect.__module__].descriptor()['name'])
engine = property(lambda s:s)
-
+ echo = logging.echo_property()
+
def dispose(self):
self.connection_provider.dispose()
def create(self, entity, connection=None, **kwargs):
def log(self, msg):
"""logs a message using this SQLEngine's logger stream."""
- self.logger.write(msg)
+ self.logger.info(msg)
class ResultProxy:
"""wraps a DBAPI cursor object to provide access to row columns based on integer
self.rowcount = executioncontext.get_rowcount(cursor)
else:
self.rowcount = cursor.rowcount
- self.echo = engine.echo=="debug"
self.__key_cache = {}
+ self.__echo = engine.echo == 'debug'
metadata = cursor.description
self.props = {}
self.keys = []
"""fetch one row, just like DBAPI cursor.fetchone()."""
row = self.cursor.fetchone()
if row is not None:
- if self.echo: self.engine.log(repr(row))
+ if self.__echo:
+ self.engine.logger.debug("Row " + repr(row))
return RowProxy(self, row)
else:
# controversy! can we auto-close the cursor after results are consumed ?
row = self.cursor.fetchone()
try:
if row is not None:
- if self.echo: self.engine.log(repr(row))
+ if self.__echo:
+ self.engine.logger.debug("Row " + repr(row))
return row[0]
else:
return None
--- /dev/null
+# logging.py - adapt python logging module to SQLAlchemy
+# Copyright (C) 2006 Michael Bayer mike_mp@zzzcomputing.com
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+"""provides a few functions used by instances to turn on/off their logging, including support
+for the usual "echo" parameter. Control of logging for SA can be performed from the regular
+python logging module. The regular dotted module namespace is used, starting at 'sqlalchemy'.
+For class-level logging, the class name is appended, and for instance-level logging, the hex
+id of the instance is appended.
+
+The "echo" keyword parameter which is available on some SA objects corresponds to an instance-level
+logger for that instance.
+
+E.g.:
+
+ engine.echo = True
+
+is equivalent to:
+
+ import logging
+ logging.getLogger('sqlalchemy.engine.ComposedSQLEngine.%s' % hex(id(engine))).setLevel(logging.DEBUG)
+
+"""
+
+import sys
+
+# py2.5 absolute imports will fix....
+logging = __import__('logging')
+
+default_enabled = False
+def default_logging():
+ global default_enabled
+ if not default_enabled:
+ default_enabled = True
+ rootlogger = logging.getLogger('sqlalchemy')
+ rootlogger.setLevel(logging.NOTSET)
+ handler = logging.StreamHandler(sys.stdout)
+ handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(name)s %(message)s'))
+ rootlogger.addHandler(handler)
+
+def _get_instance_name(instance):
+ return instance.__class__.__module__ + "." + instance.__class__.__name__ + "." + hex(id(instance))
+
+def instance_logger(instance):
+ return logging.getLogger(_get_instance_name(instance))
+
+def class_logger(cls):
+ return logging.getLogger(cls.__module__ + "." + cls.__name__)
+
+def is_debug_enabled(logger):
+ return logger.isEnabledFor(logging.DEBUG)
+def is_info_enabled(logger):
+ return logger.isEnabledFor(logging.INFO)
+
+class echo_property(object):
+ level_map={logging.DEBUG : "debug", logging.NOTSET : False}
+ def __get__(self, instance, owner):
+ level = logging.getLogger(_get_instance_name(instance)).getEffectiveLevel()
+ return echo_property.level_map.get(level, True)
+ def __set__(self, instance, value):
+ if value:
+ default_logging()
+ logging.getLogger(_get_instance_name(instance)).setLevel(value == 'debug' and logging.DEBUG or logging.INFO)
+ else:
+ logging.getLogger(_get_instance_name(instance)).setLevel(logging.NOTSET)
+
+
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-from sqlalchemy import sql, schema, util, exceptions
+from sqlalchemy import sql, schema, util, exceptions, logging
from sqlalchemy import sql_util as sqlutil
import util as mapperutil
import sync
# calls to class_mapper() for the class_/entity name combination will return this
# mapper.
self._compile_class()
+
+ self.__log("constructed")
- #print self, "constructed"
# uncomment to compile at construction time (the old way)
# this will break mapper setups that arent declared in the order
# of dependency
#self.compile()
+ def __log(self, msg):
+ self.logger.info("(" + self.class_.__name__ + "|" + (self.entity_name is not None and "/%s" % self.entity_name or "") + (self.local_table and self.local_table.name or str(self.local_table)) + (not self._is_primary_mapper() and "|non-primary" or "") + ") " + msg)
+
+ def __log_debug(self, msg):
+ self.logger.debug("(" + self.class_.__name__ + "|" + (self.entity_name is not None and "/%s" % self.entity_name or "") + (self.local_table and self.local_table.name or str(self.local_table)) + (not self._is_primary_mapper() and "|non-primary" or "") + ") " + msg)
+
def _is_orphan(self, obj):
optimistic = has_identity(obj)
for (key,klass) in self.delete_orphans:
"""
if self.__is_compiled:
return self
- #print self, "_do_compile"
+ self.__log("_do_compile() started")
self.__is_compiled = True
self.__props_init = False
self._compile_extensions()
self._compile_tables()
self._compile_properties()
self._compile_selectable()
-# self._initialize_properties()
-
+ self.__log("_do_compile() complete")
return self
def _compile_extensions(self):
prop = ColumnProperty(column)
self.__props[column.key] = prop
prop.set_parent(self)
+ self.__log("adding ColumnProperty %s" % (column.key))
elif isinstance(prop, ColumnProperty):
prop.columns.append(column)
else:
def _initialize_properties(self):
"""calls the init() method on all MapperProperties attached to this mapper. this will incur the
compilation of related mappers."""
+ self.__log("_initialize_properties() started")
l = [(key, prop) for key, prop in self.__props.iteritems()]
for key, prop in l:
if getattr(prop, 'key', None) is None:
prop.init(key, self)
+ self.__log("_initialize_properties() complete")
self.__props_init = True
def _compile_selectable(self):
circular relationships, or overriding the parameters of auto-generated properties
such as backreferences."""
+ self.__log("_compile_property(%s, %s)" % (key, prop.__class__.__name__))
+
if not isinstance(prop, MapperProperty):
prop = self._create_prop_from_column(prop, skipmissing=skipmissing)
if prop is None:
def instances(self, cursor, session, *mappers, **kwargs):
"""given a cursor (ResultProxy) from an SQLEngine, returns a list of object instances
corresponding to the rows in the cursor."""
+ self.__log_debug("instances()")
self.compile()
limit = kwargs.get('limit', None)
offset = kwargs.get('offset', None)
prop = self._getpropbycolumn(column, raiseerror)
if prop is None:
return NO_ATTRIBUTE
+ #self.__log_debug("get column attribute '%s' from instance %s" % (column.key, mapperutil.instance_str(obj)))
return prop.getattr(obj)
def _setattrbycolumn(self, obj, column, value):
instance = self.extension.create_instance(self, session, row, imap, self.class_)
if instance is EXT_PASS:
instance = self._create_instance(session)
+ self.__log_debug("new instance %s identity %s" % (mapperutil.instance_str(instance), str(identitykey)))
imap[identitykey] = instance
isnew = True
else:
def select_text(self, text, **params):
"""deprecated. use Query instead."""
return self.query().select_text(text, **params)
+
+Mapper.logger = logging.class_logger(Mapper)
class MapperProperty(object):
"""an element attached to a Mapper that describes and assists in the loading and saving
well as relationships. also defines some MapperOptions that can be used with the
properties."""
-from sqlalchemy import sql, schema, util, attributes, exceptions, sql_util
+from sqlalchemy import sql, schema, util, attributes, exceptions, sql_util, logging
import sync
import mapper
import session as sessionlib
def do_init(self):
# establish a SmartProperty property manager on the object for this key
if self.is_primary():
- #print "regiser col on class %s key %s" % (parent.class_.__name__, key)
+ self.logger.info("register managed attribute %s on class %s" % (self.key, self.parent.class_.__name__))
sessionlib.attribute_manager.register_attribute(self.parent.class_, self.key, uselist=False, copy_function=lambda x: self.columns[0].type.copy_value(x), compare_function=lambda x,y:self.columns[0].type.compare_values(x,y), mutable_scalars=self.columns[0].type.is_mutable())
def execute(self, session, instance, row, identitykey, imap, isnew):
if isnew:
- #print "POPULATING OBJ", instance.__class__.__name__, "COL", self.columns[0]._label, "WITH DATA", row[self.columns[0]], "ROW IS A", row.__class__.__name__, "COL ID", id(self.columns[0])
+ self.logger.debug("populating %s with %s/%s" % (mapperutil.attribute_str(instance, self.key), row.__class__.__name__, self.columns[0].key))
# set a scalar object instance directly on the object,
# bypassing SmartProperty event handlers.
instance.__dict__[self.key] = row[self.columns[0]]
def __repr__(self):
return "ColumnProperty(%s)" % repr([str(c) for c in self.columns])
+ColumnProperty.logger = logging.class_logger(ColumnProperty)
+
class DeferredColumnProperty(ColumnProperty):
"""describes an object attribute that corresponds to a table column, which also
will "lazy load" its value from the table. this is per-column lazy loading."""
# establish a SmartProperty property manager on the object for this key,
# containing a callable to load in the attribute
if self.is_primary():
+ self.logger.info("register managed attribute %s on class %s" % (self.key, self.parent.class_.__name__))
sessionlib.attribute_manager.register_attribute(self.parent.class_, self.key, uselist=False, callable_=lambda i:self.setup_loader(i), copy_function=lambda x: self.columns[0].type.copy_value(x), compare_function=lambda x,y:self.columns[0].type.compare_values(x,y), mutable_scalars=self.columns[0].type.is_mutable())
def setup_loader(self, instance):
if not self.localparent.is_assigned(instance):
return mapper.object_mapper(instance).props[self.key].setup_loader(instance)
def lazyload():
+ self.logger.debug("deferred load %s group %s" % (mapperutil.attribute_str(instance, self.key), str(self.group)))
try:
pk = self.parent.pks_by_table[self.columns[0].table]
except KeyError:
sessionlib.attribute_manager.init_instance_attribute(instance, self.key, False, callable_=self.setup_loader(instance))
else:
sessionlib.attribute_manager.reset_instance_attribute(instance, self.key)
+
+DeferredColumnProperty.logger = logging.class_logger(DeferredColumnProperty)
mapper.ColumnProperty = ColumnProperty
def __str__(self):
return self.__class__.__name__ + " " + str(self.parent) + "->" + self.key + "->" + str(self.mapper)
+
def cascade_iterator(self, type, object, recursive):
if not type in self.cascade:
return
self.do_init_subclass()
def _register_attribute(self, class_, callable_=None):
+ self.logger.info("register managed %s attribute %s on class %s" % ((self.uselist and "list-holding" or "scalar"), self.key, self.parent.class_.__name__))
sessionlib.attribute_manager.register_attribute(class_, self.key, uselist = self.uselist, extension=self.attributeext, cascade=self.cascade, trackparent=True, callable_=callable_)
def _init_instance_attribute(self, instance, callable_=None):
else:
self.syncrules.compile(self.primaryjoin, parent_tables, target_tables)
+PropertyLoader.logger = logging.class_logger(PropertyLoader)
+
class LazyLoader(PropertyLoader):
def do_init_subclass(self):
(self.lazywhere, self.lazybinds, self.lazyreverse) = create_lazy_clause(self.parent.unjoined_table, self.primaryjoin, self.secondaryjoin, self.foreignkey)
return mapper.object_mapper(instance).props[self.key].setup_loader(instance)
def lazyload():
+ self.logger.debug("lazy load attribute %s on instance %s" % (self.key, mapperutil.instance_str(instance)))
params = {}
allparams = True
# if the instance wasnt loaded from the database, then it cannot lazy load
if isnew:
# new object instance being loaded from a result row
if not self.is_primary():
- #print "EXEC NON-PRIAMRY", repr(self.mapper.class_), self.key
+ self.logger.debug("set instance-level lazy loader on %s" % mapperutil.attribute_str(instance, self.key))
# we are not the primary manager for this attribute on this class - set up a per-instance lazyloader,
# which will override the clareset_instance_attributess-level behavior
self._init_instance_attribute(instance, callable_=self.setup_loader(instance))
else:
- #print "EXEC PRIMARY", repr(self.mapper.class_), self.key
+ self.logger.debug("set class-level lazy loader on %s" % mapperutil.attribute_str(instance, self.key))
# we are the primary manager for this attribute on this class - reset its per-instance attribute state,
# so that the class-level lazy loader is executed when next referenced on this instance.
# this usually is not needed unless the constructor of the object referenced the attribute before we got
# to load data into it.
sessionlib.attribute_manager.reset_instance_attribute(instance, self.key)
+LazyLoader.logger = logging.class_logger(LazyLoader)
+
def create_lazy_clause(table, primaryjoin, secondaryjoin, foreignkey):
binds = {}
reverse = {}
identity_key = self.eagermapper._row_identity_key(decorated_row)
except KeyError:
# else degrade to a lazy loader
+ self.logger.debug("degrade to lazy loader on %s" % mapperutil.attribute_str(instance, self.key))
LazyLoader.execute(self, session, instance, row, identitykey, imap, isnew)
return
if not self.uselist:
+ self.logger.debug("eagerload scalar instance on %s" % mapperutil.attribute_str(instance, self.key))
if isnew:
# set a scalar object instance directly on the parent object,
# bypassing SmartProperty event handlers.
self.eagermapper._instance(session, decorated_row, imap, None)
else:
if isnew:
+ self.logger.debug("initialize UniqueAppender on %s" % mapperutil.attribute_str(instance, self.key))
# call the SmartProperty's initialize() method to create a new, blank list
l = getattr(instance.__class__, self.key).initialize(instance)
# store it in the "scratch" area, which is local to this load operation.
imap['_scratch'][(instance, self.key)] = appender
result_list = imap['_scratch'][(instance, self.key)]
+ self.logger.debug("eagerload list instance on %s" % mapperutil.attribute_str(instance, self.key))
self.eagermapper._instance(session, decorated_row, imap, result_list)
def _create_decorator_row(self):
- class DecoratorDict(object):
+ class EagerRowAdapter(object):
def __init__(self, row):
self.row = row
def has_key(self, key):
map[parent] = c
map[parent._label] = c
map[parent.name] = c
- return DecoratorDict
+ return EagerRowAdapter
def _decorate_row(self, row):
# since the EagerLoader makes an Alias of its mapper's table,
self._create_eager_chain()
return self._row_decorator(row)
+EagerLoader.logger = logging.class_logger(EagerLoader)
+
class GenericOption(mapper.MapperOption):
"""a mapper option that can handle dotted property names,
descending down through the relations of a mapper until it
kwargs = util.constructor_args(oldprop)
mapper._compile_property(key, class_(**kwargs ))
+
class BackRef(object):
"""stores the name of a backreference property as well as options to
be used on the resulting PropertyLoader."""
self.hash_key = hash_key
_sessions[self.hash_key] = self
+ class _echo_uow(object):
+ def __get__(self, obj, owner):
+ return obj.uow.echo
+ def __set__(self, obj, value):
+ obj.uow.echo = value
+ echo_uow = _echo_uow()
+
def create_transaction(self, **kwargs):
"""returns a new SessionTransaction corresponding to an existing or new transaction.
if the transaction is new, the returned SessionTransaction will have commit control
dirty, or deleted and provides the capability to flush all those changes at once.
"""
-from sqlalchemy import attributes
-from sqlalchemy import util
+from sqlalchemy import attributes, util, logging
import sqlalchemy
from sqlalchemy.exceptions import *
import StringIO
self.new = util.Set() #OrderedSet()
self.deleted = util.Set()
+ self.logger = logging.instance_logger(self)
+
+ echo = logging.echo_property()
def _remove_deleted(self, obj):
if hasattr(obj, "_instance_key"):
# and organize a hierarchical dependency structure. it also handles
# communication with the mappers and relationships to fire off SQL
# and synchronize attributes between related objects.
+ echo = logging.is_info_enabled(self.logger)
+
flush_context = UOWTransaction(self, session)
# create the set of all objects we want to operate upon
trans.commit()
flush_context.post_exec()
-
class UOWTransaction(object):
"""handles the details of organizing and executing transaction tasks
self.tasks = {}
self.__modified = False
self.__is_executing = False
-
+ self.logger = logging.instance_logger(self)
+
# TODO: shouldnt be able to register stuff here that is not in the enclosing Session
def register_object(self, obj, isdelete = False, listonly = False, postupdate=False, post_update_cols=None, **kwargs):
"""adds an object to this UOWTransaction to be updated in the database.
self._mark_modified()
def execute(self, echo=False):
- #print "\n------------------\nEXECUTE"
- #for task in self.tasks.values():
- # print "\nTASK:", task
- # for obj in task.objects:
- # print "TASK OBJ:", obj
- # for elem in task.get_elements(polymorphic=True):
- # print "POLYMORPHIC TASK OBJ:", elem.obj
-
# insure that we have a UOWTask for every mapper that will be involved
# in the topological sort
[self.get_task_by_mapper(m) for m in self._get_noninheriting_mappers()]
- #print "-----------------------------"
# pre-execute dependency processors. this process may
# result in new tasks, objects and/or dependency processors being added,
# particularly with 'delete-orphan' cascade rules.
self.__modified = False
if echo:
if head is None:
- print "Task dump: None"
+ self.logger.info("Task dump: None")
else:
- print "Task dump:\n" + head.dump()
+ self.logger.info("Task dump:\n" + head.dump())
if head is not None:
head.execute(self)
#if self.__modified and head is not None:
# raise "Assertion failed ! new pre-execute dependency step should eliminate post-execute changes (except post_update stuff)."
if echo:
- print "\nExecute complete\n"
+ self.logger.info("Execute Complete")
def post_exec(self):
"""after an execute/flush is completed, all of the objects and lists that have
else:
result.append(sql.select([col(name, table) for name in colnames], from_obj=[table]))
return sql.union_all(*result).alias(aliasname)
+
+def instance_str(instance):
+ """return a string describing an instance"""
+ return instance.__class__.__name__ + "@" + hex(id(instance))
+
+def attribute_str(instance, attribute):
+ return instance_str(instance) + "." + attribute
\ No newline at end of file
except:
import pickle
-from sqlalchemy import util, exceptions
+from sqlalchemy import util, exceptions, logging
import sqlalchemy.queue as Queue
try:
for manager in proxies.values():
manager.close()
proxies.clear()
-
class Pool(object):
- def __init__(self, creator, recycle=-1, echo = False, use_threadlocal = True, logger=None):
+ def __init__(self, creator, recycle=-1, echo=None, use_threadlocal = True):
+ self.logger = logging.instance_logger(self)
self._threadconns = weakref.WeakValueDictionary()
self._creator = creator
self._recycle = recycle
self._use_threadlocal = use_threadlocal
self.echo = echo
- self._logger = logger or util.Logger(origin='pool')
+ echo = logging.echo_property()
def unique_connection(self):
return _ConnectionFairy(self).checkout()
raise NotImplementedError()
def log(self, msg):
- self._logger.write(msg)
+ self.logger.info(msg)
def dispose(self):
raise NotImplementedError()
- def __del__(self):
- pass
- # produces too much log garbage when apps end, due to python non-deterministic teardown
- #self.dispose()
class _ConnectionRecord(object):
def __init__(self, pool):
def __connect(self):
try:
self.starttime = time.time()
- return self.__pool._creator()
+ connection = self.__pool._creator()
+ self.__pool.log("Created new connection %s" % repr(connection))
+ return connection
except Exception, e:
self.__pool.log("Error on connect(): %s" % (str(e)))
raise
else:
return getattr(obj, self.key)
-class Logger(object):
- """defines various forms of logging"""
- def __init__(self, logger=None, usethreads=False, usetimestamp=True, origin=None):
- self.logger = logger or sys.stdout
- self.usethreads = usethreads
- self.usetimestamp = usetimestamp
- self.origin = origin
- def write(self, msg):
- if self.usetimestamp:
- t = time.time()
- ms = (t - long(t)) * 1000
- timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t))
- timestamp = "[%s,%03d]" % (timestamp, ms)
- else:
- timestamp = None
- if self.origin:
- origin = "[%s]" % self.origin
- origin = "%-8s" % origin
- else:
- origin = None
- if self.usethreads:
- threadname = threading.currentThread().getName()
- threadname = "[" + threadname + ' '*(8-len(threadname)) + "]"
- else:
- threadname = None
- self.logger.write(string.join([s for s in (timestamp, threadname, origin) if s is not None]) + ": " + msg + "\n")
-
class OrderedProperties(object):
"""
An object that maintains the order in which attributes are set upon it.
import sqlalchemy.pool as pool
import sqlalchemy.exceptions as exceptions
+mcid = 1
class MockDBAPI(object):
def __init__(self):
self.throw_error = False
raise Exception("couldnt connect !")
return MockConnection()
class MockConnection(object):
+ def __init__(self):
+ global mcid
+ self.id = mcid
+ mcid += 1
def close(self):
pass
def cursor(self):
self._do_testqueuepool(useclose=True)
def _do_testqueuepool(self, useclose=False):
- p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = False, echo = False)
+ p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = False)
def status(pool):
tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout())
self.assert_(status(p) == (3, 2, 0, 1))
def test_timeout(self):
- p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = 0, use_threadlocal = False, echo = False, timeout=2)
+ p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = 0, use_threadlocal = False, timeout=2)
c1 = p.get()
c2 = p.get()
c3 = p.get()
assert int(time.time() - now) == 2
def test_mixed_close(self):
- p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True, echo=True)
+ p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True)
c1 = p.connect()
c2 = p.connect()
assert c1 is c2
"""this is a "flaw" in the connection pool; since threadlocal uses a single ConnectionFairy per thread
with an open/close counter, you can fool the counter into giving you a ConnectionFairy with an
ambiguous counter. i.e. its not true reference counting."""
- p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True, echo=True)
+ p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True)
c1 = p.connect()
c2 = p.connect()
assert c1 is c2
self.assert_(p.checkedout() == 0)
def test_recycle(self):
- p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False, echo=True, recycle=3)
+ p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False, recycle=3)
c1 = p.connect()
c_id = id(c1.connection)
def test_invalidate(self):
dbapi = MockDBAPI()
- p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False, echo=True)
+ p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)
c1 = p.connect()
- c_id = id(c1.connection)
+ c_id = c1.connection.id
c1.close(); c1=None
c1 = p.connect()
- assert id(c1.connection) == c_id
+ assert c1.connection.id == c_id
c1.invalidate()
c1 = None
c1 = p.connect()
- assert id(c1.connection) != c_id
+ assert c1.connection.id != c_id
def test_reconnect(self):
dbapi = MockDBAPI()
- p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False, echo=True)
+ p = pool.QueuePool(creator = lambda: dbapi.connect('foo.db'), pool_size = 1, max_overflow = 0, use_threadlocal = False)
c1 = p.connect()
- c_id = id(c1.connection)
+ c_id = c1.connection.id
c1.close(); c1=None
c1 = p.connect()
- assert id(c1.connection) == c_id
+ assert c1.connection.id == c_id
dbapi.raise_error = True
c1.invalidate()
c1 = None
c1 = p.connect()
- assert id(c1.connection) != c_id
+ assert c1.connection.id != c_id
def testthreadlocal_del(self):
self._do_testthreadlocal(useclose=False)
def _do_testthreadlocal(self, useclose=False):
for p in (
- pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True, echo = True),
+ pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True),
pool.SingletonThreadPool(creator = lambda: mock_dbapi.connect('foo.db'), use_threadlocal = True)
):
c1 = p.connect()
if __name__ == "__main__":
- unittest.main()
+ testbase.main()
class TLTransactionTest(testbase.PersistTest):
def setUpAll(self):
global users, metadata, tlengine
- tlengine = create_engine(testbase.db_uri, strategy='threadlocal', echo=True)
+ tlengine = create_engine(testbase.db_uri, strategy='threadlocal')
metadata = MetaData()
users = Table('query_users', metadata,
Column('user_id', INT, primary_key = True),
def setUpAll(self):
global ctx, data
- ctx = SessionContext(lambda: create_session(echo_uow=True))
+ ctx = SessionContext(lambda: create_session( ))
tables.create()
mapper(tables.User, tables.users, properties = dict(
address = relation(mapper(tables.Address, tables.addresses), lazy = False, uselist = False, private = True),
})
a = C1('head c1')
a.c1s.append(C1('another c1'))
- sess = create_session(echo_uow=False)
+ sess = create_session( )
sess.save(a)
sess.flush()
sess.delete(a)
a.c1s[0].c1s.append(C1('subchild2'))
a.c1s[1].c2s.append(C2('child2 data1'))
a.c1s[1].c2s.append(C2('child2 data2'))
- sess = create_session(echo_uow=False)
+ sess = create_session( )
sess.save(a)
sess.flush()
)
)
- session = create_session(echo_uow=False)
+ session = create_session( )
a1 = Assembly(name='a1')
)
)
- session = create_session(echo_uow=False)
+ session = create_session( )
s = SpecLine(slave=Product(name='p1'))
s2 = SpecLine(slave=Detail(name='d1'))
raster_document_mapper = mapper(RasterDocument, inherits=document_mapper,
polymorphic_identity='raster_document')
- session = create_session(echo_uow=False)
+ session = create_session( )
a1 = Assembly(name='a1')
a1.documents.append(RasterDocument('doc2'))
print [x for x in session]
session.clear()
- session.echo_uow=True
session.flush()
session.clear()
p = session.query(Publication).selectone_by(name='Test')
class EagerTest(MapperSuperTest):
def testbasic(self):
- testbase.db.echo="debug"
"""tests a basic one-to-many eager load"""
m = mapper(Address, addresses)
])
def do_testlist(self, classes):
- sess = create_session(echo_uow=False)
+ sess = create_session( )
# create objects in a linked list
count = 1
import unittest, sys, datetime
db = testbase.db
-#db.echo_uow=True
+#db.
from sqlalchemy import *
class OrphanDeletionTest(AssertMixin):
def setUpAll(self):
- db.echo = False
tables.create()
tables.data()
- db.echo = testbase.echo
def tearDownAll(self):
- db.echo = False
tables.drop()
- db.echo = testbase.echo
def tearDown(self):
clear_mappers()
def setUp(self):
items=relation(itemMapper, cascade="all,delete-orphan", backref="order")
))
- s = create_session(echo_uow=True)
+ s = create_session( )
order = Order()
s.save(order)
u[0].addresses[0].email_address='hi'
# insure that upon commit, the new mapper with the address relation is used
- ctx.current.echo_uow=True
self.assert_sql(db, lambda: ctx.current.flush(),
[
(
def setUp(self):
global metadata
metadata = BoundMetaData(testbase.db)
- self.echo = testbase.db.echo
- self.logger = testbase.db.logger
def tearDown(self):
- testbase.db.echo = self.echo
- testbase.db.logger = testbase.db.engine.logger = self.logger
metadata.drop_all()
def test_constraint(self):
def test_index_create_inline(self):
"""Test indexes defined with tables"""
- capt = []
- class dummy:
- pass
- stream = dummy()
- stream.write = capt.append
- testbase.db.logger = testbase.db.engine.logger = stream
events = Table('events', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(30), unique=True),
assert 'idx_winners' in index_names
assert len(index_names) == 4
- events.create()
+ capt = []
+ connection = testbase.db.connect()
+ def proxy(statement, parameters):
+ capt.append(statement)
+ capt.append(repr(parameters))
+ connection.proxy(statement, parameters)
+ schemagen = testbase.db.dialect.schemagenerator(testbase.db, proxy)
+ events.accept_schema_visitor(schemagen)
+
+ assert capt[0].strip().startswith('CREATE TABLE events')
+ assert capt[2].strip() == \
+ 'CREATE UNIQUE INDEX ux_events_name ON events (name)'
+ assert capt[4].strip() == \
+ 'CREATE INDEX ix_events_location ON events (location)'
+ assert capt[6].strip() == \
+ 'CREATE UNIQUE INDEX sport_announcer ON events (sport, announcer)'
+ assert capt[8].strip() == \
+ 'CREATE INDEX idx_winners ON events (winner)'
# verify that the table is functional
events.insert().execute(id=1, name='hockey finals', location='rink',
winner='sweden')
ss = events.select().execute().fetchall()
- assert capt[0].strip().startswith('CREATE TABLE events')
- assert capt[3].strip() == \
- 'CREATE UNIQUE INDEX ux_events_name ON events (name)'
- assert capt[6].strip() == \
- 'CREATE INDEX ix_events_location ON events (location)'
- assert capt[9].strip() == \
- 'CREATE UNIQUE INDEX sport_announcer ON events (sport, announcer)'
- assert capt[12].strip() == \
- 'CREATE INDEX idx_winners ON events (winner)'
if __name__ == "__main__":
testbase.main()
def write(self, msg):
if echo:
local_stdout.write(msg)
+ def flush(self):
+ pass
sys.stdout = Logger()
def echo_text(text):
parser.add_option("--db", action="store", dest="db", default="sqlite", help="prefab database uri (sqlite, sqlite_file, postgres, mysql, oracle, oracle8, mssql)")
parser.add_option("--mockpool", action="store_true", dest="mockpool", help="use mock pool")
parser.add_option("--verbose", action="store_true", dest="verbose", help="full debug echoing")
- parser.add_option("--noecho", action="store_true", dest="noecho", help="Disable SQL statement echoing")
parser.add_option("--quiet", action="store_true", dest="quiet", help="be totally quiet")
+ parser.add_option("--log-info", action="append", dest="log_info", help="turn on info logging for <LOG> (multiple OK)")
+ parser.add_option("--log-debug", action="append", dest="log_debug", help="turn on debug logging for <LOG> (multiple OK)")
parser.add_option("--nothreadlocal", action="store_true", dest="nothreadlocal", help="dont use thread-local mod")
parser.add_option("--enginestrategy", action="store", default=None, dest="enginestrategy", help="engine strategy (plain or threadlocal, defaults to SA default)")
if options.enginestrategy is not None:
opts['strategy'] = options.enginestrategy
if options.mockpool:
- db = engine.create_engine(db_uri, echo=(not options.noecho), default_ordering=True, poolclass=MockPool, **opts)
+ db = engine.create_engine(db_uri, default_ordering=True, poolclass=MockPool, **opts)
else:
- db = engine.create_engine(db_uri, echo=(not options.noecho), default_ordering=True, **opts)
+ db = engine.create_engine(db_uri, default_ordering=True, **opts)
db = EngineAssert(db)
+
+ import logging
+ logging.basicConfig()
+ if options.log_info is not None:
+ for elem in options.log_info:
+ logging.getLogger(elem).setLevel(logging.INFO)
+ if options.log_debug is not None:
+ for elem in options.log_debug:
+ logging.getLogger(elem).setLevel(logging.DEBUG)
metadata = sqlalchemy.BoundMetaData(db)
def unsupported(*dbs):
def start_session():
"""creates a new session for the start of a request."""
- trans.session = create_session(bind_to=zblog.database.engine, echo_uow=False)
+ trans.session = create_session(bind_to=zblog.database.engine )
def session():
return trans.session