outside of "sqlalchemy" and under "test/".
Rationale:
- coverage plugin works without issue, without need for an awkward
additional package install
- command line for "nosetests" isn't polluted with SQLAlchemy options
[ticket:1949]
tag_build = dev
[nosetests]
--with-sqlalchemy = true
++with-_sqlalchemy = true
exclude = ^examples
first-package-wins = true
import sqlalchemy.topological as topological
- from sqlalchemy.test import TestBase
- from sqlalchemy.test.testing import assert_raises, eq_
- from sqlalchemy.test.util import conforms_partial_ordering
+ from test.lib import TestBase
+ from test.lib.testing import assert_raises, eq_
++from test.lib.util import conforms_partial_ordering
from sqlalchemy import exc, util
--- /dev/null
- from sqlalchemy.test.testing import TestBase, eq_, assert_raises
+"""Test event registration and listening."""
+
++from test.lib.testing import TestBase, eq_, assert_raises
+from sqlalchemy import event, exc, util
+
+class TestEvents(TestBase):
+ """Test class- and instance-level event registration."""
+
+ def setUp(self):
+ global Target
+
+ assert 'on_event_one' not in event._registrars
+ assert 'on_event_two' not in event._registrars
+
+ class TargetEvents(event.Events):
+ def on_event_one(self, x, y):
+ pass
+
+ def on_event_two(self, x):
+ pass
+
+ class Target(object):
+ dispatch = event.dispatcher(TargetEvents)
+
+ def tearDown(self):
+ event._remove_dispatcher(Target.__dict__['dispatch'].events)
+
+ def test_register_class(self):
+ def listen(x, y):
+ pass
+
+ event.listen(listen, "on_event_one", Target)
+
+ eq_(len(Target().dispatch.on_event_one), 1)
+ eq_(len(Target().dispatch.on_event_two), 0)
+
+ def test_register_instance(self):
+ def listen(x, y):
+ pass
+
+ t1 = Target()
+ event.listen(listen, "on_event_one", t1)
+
+ eq_(len(Target().dispatch.on_event_one), 0)
+ eq_(len(t1.dispatch.on_event_one), 1)
+ eq_(len(Target().dispatch.on_event_two), 0)
+ eq_(len(t1.dispatch.on_event_two), 0)
+
+ def test_register_class_instance(self):
+ def listen_one(x, y):
+ pass
+
+ def listen_two(x, y):
+ pass
+
+ event.listen(listen_one, "on_event_one", Target)
+
+ t1 = Target()
+ event.listen(listen_two, "on_event_one", t1)
+
+ eq_(len(Target().dispatch.on_event_one), 1)
+ eq_(len(t1.dispatch.on_event_one), 2)
+ eq_(len(Target().dispatch.on_event_two), 0)
+ eq_(len(t1.dispatch.on_event_two), 0)
+
+ def listen_three(x, y):
+ pass
+
+ event.listen(listen_three, "on_event_one", Target)
+ eq_(len(Target().dispatch.on_event_one), 2)
+ eq_(len(t1.dispatch.on_event_one), 3)
+
+class TestAcceptTargets(TestBase):
+ """Test default target acceptance."""
+
+ def setUp(self):
+ global TargetOne, TargetTwo
+
+ class TargetEventsOne(event.Events):
+ def on_event_one(self, x, y):
+ pass
+
+ class TargetEventsTwo(event.Events):
+ def on_event_one(self, x, y):
+ pass
+
+ class TargetOne(object):
+ dispatch = event.dispatcher(TargetEventsOne)
+
+ class TargetTwo(object):
+ dispatch = event.dispatcher(TargetEventsTwo)
+
+ def tearDown(self):
+ event._remove_dispatcher(TargetOne.__dict__['dispatch'].events)
+ event._remove_dispatcher(TargetTwo.__dict__['dispatch'].events)
+
+ def test_target_accept(self):
+ """Test that events of the same name are routed to the correct
+ collection based on the type of target given.
+
+ """
+ def listen_one(x, y):
+ pass
+
+ def listen_two(x, y):
+ pass
+
+ def listen_three(x, y):
+ pass
+
+ def listen_four(x, y):
+ pass
+
+ event.listen(listen_one, "on_event_one", TargetOne)
+ event.listen(listen_two, "on_event_one", TargetTwo)
+
+ eq_(
+ list(TargetOne().dispatch.on_event_one),
+ [listen_one]
+ )
+
+ eq_(
+ list(TargetTwo().dispatch.on_event_one),
+ [listen_two]
+ )
+
+ t1 = TargetOne()
+ t2 = TargetTwo()
+
+ event.listen(listen_three, "on_event_one", t1)
+ event.listen(listen_four, "on_event_one", t2)
+
+ eq_(
+ list(t1.dispatch.on_event_one),
+ [listen_one, listen_three]
+ )
+
+ eq_(
+ list(t2.dispatch.on_event_one),
+ [listen_two, listen_four]
+ )
+
+class TestCustomTargets(TestBase):
+ """Test custom target acceptance."""
+
+ def setUp(self):
+ global Target
+
+ class TargetEvents(event.Events):
+ @classmethod
+ def accept_with(cls, target):
+ if target == 'one':
+ return Target
+ else:
+ return None
+
+ def on_event_one(self, x, y):
+ pass
+
+ class Target(object):
+ dispatch = event.dispatcher(TargetEvents)
+
+ def tearDown(self):
+ event._remove_dispatcher(Target.__dict__['dispatch'].events)
+
+ def test_indirect(self):
+ def listen(x, y):
+ pass
+
+ event.listen(listen, "on_event_one", "one")
+
+ eq_(
+ list(Target().dispatch.on_event_one),
+ [listen]
+ )
+
+ assert_raises(
+ exc.InvalidRequestError,
+ event.listen,
+ listen, "on_event_one", Target
+ )
+
+class TestListenOverride(TestBase):
+ """Test custom listen functions which change the listener function signature."""
+
+ def setUp(self):
+ global Target
+
+ class TargetEvents(event.Events):
+ @classmethod
+ def listen(cls, fn, identifier, target, add=False):
+ if add:
+ def adapt(x, y):
+ fn(x + y)
+ else:
+ adapt = fn
+
+ event.Events.listen(adapt, identifier, target)
+
+ def on_event_one(self, x, y):
+ pass
+
+ class Target(object):
+ dispatch = event.dispatcher(TargetEvents)
+
+ def tearDown(self):
+ event._remove_dispatcher(Target.__dict__['dispatch'].events)
+
+ def test_listen_override(self):
+ result = []
+ def listen_one(x):
+ result.append(x)
+
+ def listen_two(x, y):
+ result.append((x, y))
+
+ event.listen(listen_one, "on_event_one", Target, add=True)
+ event.listen(listen_two, "on_event_one", Target)
+
+ t1 = Target()
+ t1.dispatch.on_event_one(5, 7)
+ t1.dispatch.on_event_one(10, 5)
+
+ eq_(result,
+ [
+ 12, (5, 7), 15, (10, 5)
+ ]
+ )
+
+class TestPropagate(TestBase):
+ def setUp(self):
+ global Target
+
+ class TargetEvents(event.Events):
+ def on_event_one(self, arg):
+ pass
+
+ def on_event_two(self, arg):
+ pass
+
+ class Target(object):
+ dispatch = event.dispatcher(TargetEvents)
+
+
+ def test_propagate(self):
+ result = []
+ def listen_one(target, arg):
+ result.append((target, arg))
+
+ def listen_two(target, arg):
+ result.append((target, arg))
+
+ t1 = Target()
+
+ event.listen(listen_one, "on_event_one", t1, propagate=True)
+ event.listen(listen_two, "on_event_two", t1)
+
+ t2 = Target()
+
+ t2.dispatch.update(t1.dispatch)
+
+ t2.dispatch.on_event_one(t2, 1)
+ t2.dispatch.on_event_two(t2, 2)
+ eq_(result, [(t2, 1)])
Handles the setup and extra properties required for testing SQLAlchemy
"""
enabled = True
-- name = 'sqlalchemy'
++
++ # nose 1.0 will allow us to replace the old "sqlalchemy" plugin,
++ # if installed, using the same name, but nose 1.0 isn't released yet...
++ name = '_sqlalchemy'
score = 100
def options(self, parser, env=os.environ):
# end Py2K
from sqlalchemy import *
-from sqlalchemy import sql, exc, schema, types as sqltypes
+from sqlalchemy import sql, exc, schema, types as sqltypes, event
from sqlalchemy.dialects.mysql import base as mysql
- from sqlalchemy.test.testing import eq_
- from sqlalchemy.test import *
- from sqlalchemy.test.engines import utf8_engine
+ from test.lib.testing import eq_
+ from test.lib import *
+ from test.lib.engines import utf8_engine
import datetime
class TypesTest(TestBase, AssertsExecutionResults, AssertsCompiledSQL):
assert_raises_message
import datetime
from sqlalchemy import *
-from sqlalchemy import exc, sql, schema
+from sqlalchemy import exc, sql, schema, pool
from sqlalchemy.dialects.sqlite import base as sqlite, \
pysqlite as pysqlite_dialect
- from sqlalchemy.test import *
+ from test.lib import *
class TestTypes(TestBase, AssertsExecutionResults):
from sqlalchemy.schema import DDL, CheckConstraint, AddConstraint, \
DropConstraint
from sqlalchemy import create_engine
-from sqlalchemy import MetaData, Integer, String
+from sqlalchemy import MetaData, Integer, String, event, exc, text
- from sqlalchemy.test.schema import Table
- from sqlalchemy.test.schema import Column
+ from test.lib.schema import Table
+ from test.lib.schema import Column
import sqlalchemy as tsa
- from sqlalchemy.test import TestBase, testing, engines
- from sqlalchemy.test.testing import AssertsCompiledSQL, eq_
+ from test.lib import TestBase, testing, engines
-from test.lib.testing import AssertsCompiledSQL
++from test.lib.testing import AssertsCompiledSQL, eq_
from nose import SkipTest
class DDLEventTest(TestBase):
import re
from sqlalchemy.interfaces import ConnectionProxy
from sqlalchemy import MetaData, Integer, String, INT, VARCHAR, func, \
- bindparam, select
+ bindparam, select, event
- from sqlalchemy.test.schema import Table, Column
+ from test.lib.schema import Table, Column
import sqlalchemy as tsa
- from sqlalchemy.test import TestBase, testing, engines
+ from test.lib import TestBase, testing, engines
import logging
from sqlalchemy.dialects.oracle.zxjdbc import ReturningParam
import threading, time
-from sqlalchemy import pool, interfaces, create_engine, select
+from sqlalchemy import pool, interfaces, create_engine, select, event
import sqlalchemy as tsa
- from sqlalchemy.test import TestBase, testing
- from sqlalchemy.test.util import gc_collect, lazy_gc
- from sqlalchemy.test.testing import eq_
+ from test.lib import TestBase, testing
+ from test.lib.util import gc_collect, lazy_gc
+ from test.lib.testing import eq_
mcid = 1
class MockDBAPI(object):
from sqlalchemy.ext import declarative as decl
from sqlalchemy import exc
import sqlalchemy as sa
- from sqlalchemy.test import testing
+ from test.lib import testing
from sqlalchemy import MetaData, Integer, String, ForeignKey, \
ForeignKeyConstraint, asc, Index
- from sqlalchemy.test.schema import Table, Column
+ from test.lib.schema import Table, Column
from sqlalchemy.orm import relationship, create_session, class_mapper, \
- joinedload, compile_mappers, backref, clear_mappers, \
+ joinedload, configure_mappers, backref, clear_mappers, \
polymorphic_union, deferred, column_property
- from sqlalchemy.test.testing import eq_
+ from test.lib.testing import eq_
from sqlalchemy.util import classproperty
from test.orm._base import ComparableEntity, MappedTest
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext import serializer
from sqlalchemy import exc
import sqlalchemy as sa
- from sqlalchemy.test import testing
+ from test.lib import testing
from sqlalchemy import MetaData, Integer, String, ForeignKey, select, \
desc, func, util
- from sqlalchemy.test.schema import Table
- from sqlalchemy.test.schema import Column
+ from test.lib.schema import Table
+ from test.lib.schema import Column
from sqlalchemy.orm import relationship, sessionmaker, scoped_session, \
- class_mapper, mapper, joinedload, compile_mappers, aliased
+ class_mapper, mapper, joinedload, configure_mappers, aliased
- from sqlalchemy.test.testing import eq_
+ from test.lib.testing import eq_
from test.orm._base import ComparableEntity, MappedTest
import re
class AssertRule(object):
++
def process_execute(self, clauseelement, *multiparams, **params):
pass
-- def process_cursor_execute(self, statement, parameters, context, executemany):
++ def process_cursor_execute(self, statement, parameters, context,
++ executemany):
pass
--
++
def is_consumed(self):
"""Return True if this rule has been consumed, False if not.
-- Should raise an AssertionError if this rule's condition has definitely failed.
++ Should raise an AssertionError if this rule's condition has
++ definitely failed.
"""
++
raise NotImplementedError()
--
++
def rule_passed(self):
-- """Return True if the last test of this rule passed, False if failed, None if no test was applied."""
--
++ """Return True if the last test of this rule passed, False if
++ failed, None if no test was applied."""
++
raise NotImplementedError()
--
++
def consume_final(self):
"""Return True if this rule has been consumed.
-- Should raise an AssertionError if this rule's condition has not been consumed or has failed.
++ Should raise an AssertionError if this rule's condition has not
++ been consumed or has failed.
"""
--
++
if self._result is None:
-- assert False, "Rule has not been consumed"
--
++ assert False, 'Rule has not been consumed'
return self.is_consumed()
class SQLMatchRule(AssertRule):
return True
class ExactSQL(SQLMatchRule):
++
def __init__(self, sql, params=None):
SQLMatchRule.__init__(self)
self.sql = sql
self.params = params
--
-- def process_cursor_execute(self, statement, parameters, context, executemany):
++
++ def process_cursor_execute(self, statement, parameters, context,
++ executemany):
if not context:
return
--
-- _received_statement = _process_engine_statement(context.unicode_statement, context)
++ _received_statement = \
++ _process_engine_statement(context.unicode_statement,
++ context)
_received_parameters = context.compiled_parameters
--
-- # TODO: remove this step once all unit tests
-- # are migrated, as ExactSQL should really be *exact* SQL
++
++ # TODO: remove this step once all unit tests are migrated, as
++ # ExactSQL should really be *exact* SQL
++
sql = _process_assertion_statement(self.sql, context)
--
equivalent = _received_statement == sql
if self.params:
if util.callable(self.params):
params = self.params(context)
else:
params = self.params
--
if not isinstance(params, list):
params = [params]
-- equivalent = equivalent and params == context.compiled_parameters
++ equivalent = equivalent and params \
++ == context.compiled_parameters
else:
params = {}
--
--
self._result = equivalent
if not self._result:
-- self._errmsg = "Testing for exact statement %r exact params %r, " \
-- "received %r with params %r" % (sql, params, _received_statement, _received_parameters)
++ self._errmsg = \
++ 'Testing for exact statement %r exact params %r, '\
++ 'received %r with params %r' % (sql, params,
++ _received_statement, _received_parameters)
class RegexSQL(SQLMatchRule):
++
def __init__(self, regex, params=None):
SQLMatchRule.__init__(self)
self.regex = re.compile(regex)
self.orig_regex = regex
self.params = params
-- def process_cursor_execute(self, statement, parameters, context, executemany):
++ def process_cursor_execute(self, statement, parameters, context,
++ executemany):
if not context:
return
--
-- _received_statement = _process_engine_statement(context.unicode_statement, context)
++ _received_statement = \
++ _process_engine_statement(context.unicode_statement,
++ context)
_received_parameters = context.compiled_parameters
--
equivalent = bool(self.regex.match(_received_statement))
if self.params:
if util.callable(self.params):
params = self.params(context)
else:
params = self.params
--
if not isinstance(params, list):
params = [params]
--
++
# do a positive compare only
++
for param, received in zip(params, _received_parameters):
for k, v in param.iteritems():
if k not in received or received[k] != v:
break
else:
params = {}
--
self._result = equivalent
if not self._result:
-- self._errmsg = "Testing for regex %r partial params %r, "\
-- "received %r with params %r" % (self.orig_regex, params, _received_statement, _received_parameters)
++ self._errmsg = \
++ 'Testing for regex %r partial params %r, received %r '\
++ 'with params %r' % (self.orig_regex, params,
++ _received_statement,
++ _received_parameters)
class CompiledSQL(SQLMatchRule):
++
def __init__(self, statement, params):
SQLMatchRule.__init__(self)
self.statement = statement
self.params = params
-- def process_cursor_execute(self, statement, parameters, context, executemany):
++ def process_cursor_execute(self, statement, parameters, context,
++ executemany):
if not context:
return
--
_received_parameters = list(context.compiled_parameters)
--
++
# recompile from the context, using the default dialect
-- compiled = context.compiled.statement.\
-- compile(dialect=DefaultDialect(), column_keys=context.compiled.column_keys)
--
++
++ compiled = \
++ context.compiled.statement.compile(dialect=DefaultDialect(),
++ column_keys=context.compiled.column_keys)
_received_statement = re.sub(r'\n', '', str(compiled))
--
equivalent = self.statement == _received_statement
if self.params:
if util.callable(self.params):
params = self.params(context)
else:
params = self.params
--
if not isinstance(params, list):
params = [params]
--
all_params = list(params)
all_received = list(_received_parameters)
while params:
param = dict(params.pop(0))
for k, v in context.compiled.params.iteritems():
param.setdefault(k, v)
--
if param not in _received_parameters:
equivalent = False
break
equivalent = False
else:
params = {}
--
self._result = equivalent
if not self._result:
- print "Testing for compiled statement %r partial params %r, " \
- "received %r with params %r" % \
- (self.statement, all_params, _received_statement, all_received)
-
-- self._errmsg = "Testing for compiled statement %r partial params %r, " \
-- "received %r with params %r" % \
-- (self.statement, all_params, _received_statement, all_received)
-- #print self._errmsg
++ print 'Testing for compiled statement %r partial params '\
++ '%r, received %r with params %r' % (self.statement,
++ all_params, _received_statement, all_received)
++ self._errmsg = \
++ 'Testing for compiled statement %r partial params %r, '\
++ 'received %r with params %r' % (self.statement,
++ all_params, _received_statement, all_received)
++
++
++ # print self._errmsg
--
class CountStatements(AssertRule):
++
def __init__(self, count):
self.count = count
self._statement_count = 0
--
++
def process_execute(self, clauseelement, *multiparams, **params):
self._statement_count += 1
-- def process_cursor_execute(self, statement, parameters, context, executemany):
++ def process_cursor_execute(self, statement, parameters, context,
++ executemany):
pass
def is_consumed(self):
return False
--
++
def consume_final(self):
-- assert self.count == self._statement_count, "desired statement count %d does not match %d" % (self.count, self._statement_count)
++ assert self.count == self._statement_count, \
++ 'desired statement count %d does not match %d' \
++ % (self.count, self._statement_count)
return True
class AllOf(AssertRule):
++
def __init__(self, *rules):
self.rules = set(rules)
--
++
def process_execute(self, clauseelement, *multiparams, **params):
for rule in self.rules:
rule.process_execute(clauseelement, *multiparams, **params)
-- def process_cursor_execute(self, statement, parameters, context, executemany):
++ def process_cursor_execute(self, statement, parameters, context,
++ executemany):
for rule in self.rules:
-- rule.process_cursor_execute(statement, parameters, context, executemany)
++ rule.process_cursor_execute(statement, parameters, context,
++ executemany)
def is_consumed(self):
if not self.rules:
return True
--
for rule in list(self.rules):
-- if rule.rule_passed(): # a rule passed, move on
++ if rule.rule_passed(): # a rule passed, move on
self.rules.remove(rule)
return len(self.rules) == 0
++ assert False, 'No assertion rules were satisfied for statement'
-- assert False, "No assertion rules were satisfied for statement"
--
def consume_final(self):
return len(self.rules) == 0
def _process_engine_statement(query, context):
if util.jython:
++
# oracle+zxjdbc passes a PyStatement when returning into
++
query = unicode(query)
-- if context.engine.name == 'mssql' and query.endswith('; select scope_identity()'):
++ if context.engine.name == 'mssql' \
++ and query.endswith('; select scope_identity()'):
query = query[:-25]
--
query = re.sub(r'\n', '', query)
--
return query
def _process_assertion_statement(query, context):
return query
-class SQLAssert(ConnectionProxy):
+class SQLAssert(object):
++
rules = None
--
++
def add_rules(self, rules):
self.rules = list(rules)
--
++
def statement_complete(self):
for rule in self.rules:
if not rule.consume_final():
-- assert False, "All statements are complete, but pending assertion rules remain"
++ assert False, \
++ 'All statements are complete, but pending '\
++ 'assertion rules remain'
def clear_rules(self):
del self.rules
--
- def execute(self, conn, execute, clauseelement, *multiparams, **params):
- result = execute(clauseelement, *multiparams, **params)
+
+ def execute(self, conn, clauseelement, multiparams, params, result):
if self.rules is not None:
if not self.rules:
-- assert False, "All rules have been exhausted, but further statements remain"
++ assert False, \
++ 'All rules have been exhausted, but further '\
++ 'statements remain'
rule = self.rules[0]
rule.process_execute(clauseelement, *multiparams, **params)
if rule.is_consumed():
self.rules.pop(0)
--
- return result
--
- def cursor_execute(self, conn, cursor, statement, parameters, context, executemany):
- def cursor_execute(self, execute, cursor, statement, parameters, context, executemany):
- result = execute(cursor, statement, parameters, context)
-
++
++ def cursor_execute(self, conn, cursor, statement, parameters,
++ context, executemany):
if self.rules:
rule = self.rules[0]
-- rule.process_cursor_execute(statement, parameters, context, executemany)
-
- return result
++ rule.process_cursor_execute(statement, parameters, context,
++ executemany)
asserter = SQLAssert()
import sys, types, weakref
from collections import deque
- from sqlalchemy_nose import config
+ from test.bootstrap import config
from sqlalchemy.util import function_named, callable
+from sqlalchemy import event
import re
import warnings
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.orm import exc as orm_exc
- from sqlalchemy.test import Column, testing
+from sqlalchemy import exc as sa_exc
+ from test.lib import Column, testing
from sqlalchemy.util import function_named
from test.orm import _fixtures, _base
from sqlalchemy.orm.collections import collection
from sqlalchemy.orm.interfaces import AttributeExtension
from sqlalchemy import exc as sa_exc
- from sqlalchemy.test import *
- from sqlalchemy.test.testing import eq_, ne_, assert_raises
+ from test.lib import *
+ from test.lib.testing import eq_, ne_, assert_raises
from test.orm import _base
- from sqlalchemy.test.util import gc_collect, all_partial_orderings
-from test.lib.util import gc_collect
++from test.lib.util import gc_collect, all_partial_orderings
from sqlalchemy.util import cmp, jython
+from sqlalchemy import event, topological
# global for pickling tests
MyTest = None
from sqlalchemy.orm.collections import collection
import sqlalchemy as sa
- from sqlalchemy.test import testing
+ from test.lib import testing
from sqlalchemy import Integer, String, ForeignKey, text
- from sqlalchemy.test.schema import Table, Column
+ from test.lib.schema import Table, Column
from sqlalchemy import util, exc as sa_exc
-from sqlalchemy.orm import create_session, mapper, relationship, attributes
+from sqlalchemy.orm import create_session, mapper, relationship, \
+ attributes, instrumentation
from test.orm import _base
- from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
+ from test.lib.testing import eq_, assert_raises, assert_raises_message
class Canary(sa.orm.interfaces.AttributeExtension):
def __init__(self):
from sqlalchemy import *
from sqlalchemy import exc as sa_exc
from sqlalchemy.orm import *
- from sqlalchemy.test import *
- from sqlalchemy.test.testing import assert_raises_message
+ from test.lib import *
++from test.lib.testing import assert_raises_message
from test.orm import _base
import sqlalchemy as sa
- from sqlalchemy.test import testing
+from sqlalchemy import Integer, String, ForeignKey, event
- from sqlalchemy.test.schema import Table, Column
+ from test.lib import testing
-from sqlalchemy import Integer, String, ForeignKey
+ from test.lib.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, create_session
from test.orm import _base
- from sqlalchemy.test.testing import eq_
+ from test.lib.testing import eq_
class TriggerDefaultsTest(_base.MappedTest):
- from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
+ from test.lib.testing import eq_, assert_raises, assert_raises_message
import pickle
from sqlalchemy import util
-import sqlalchemy.orm.attributes as attributes
+from sqlalchemy.orm import attributes, instrumentation
from sqlalchemy.orm.collections import collection
-from sqlalchemy.orm.attributes import set_attribute, get_attribute, del_attribute, is_instrumented
+from sqlalchemy.orm.attributes import set_attribute, get_attribute, del_attribute
+from sqlalchemy.orm.instrumentation import is_instrumented
from sqlalchemy.orm import clear_mappers
from sqlalchemy.orm import InstrumentationManager
- from sqlalchemy.test import *
+ from test.lib import *
from test.orm import _base
class MyTypesManager(InstrumentationManager):
- from sqlalchemy.test.testing import assert_raises, assert_raises_message
+ from test.lib.testing import assert_raises, assert_raises_message
import sqlalchemy as sa
-from sqlalchemy import MetaData, Integer, ForeignKey, util
+from sqlalchemy import MetaData, Integer, ForeignKey, util, event
- from sqlalchemy.test.schema import Table
- from sqlalchemy.test.schema import Column
+from sqlalchemy.orm import mapper, relationship, create_session, \
+ attributes, class_mapper, clear_mappers, instrumentation, events
- from sqlalchemy.test.testing import eq_, ne_
+ from test.lib.schema import Table
+ from test.lib.schema import Column
-from sqlalchemy.orm import mapper, relationship, create_session, attributes, class_mapper, clear_mappers
+ from test.lib.testing import eq_, ne_
from sqlalchemy.util import function_named
from test.orm import _base
"""General mapper operations with an emphasis on selecting/loading."""
- from sqlalchemy.test.testing import assert_raises, assert_raises_message
+ from test.lib.testing import assert_raises, assert_raises_message
import sqlalchemy as sa
- from sqlalchemy.test import testing, pickleable
+ from test.lib import testing, pickleable
from sqlalchemy import MetaData, Integer, String, ForeignKey, func, util
- from sqlalchemy.test.schema import Table, Column
+ from test.lib.schema import Table, Column
from sqlalchemy.engine import default
-from sqlalchemy.orm import mapper, relationship, backref, create_session, class_mapper, compile_mappers, reconstructor, validates, aliased
-from sqlalchemy.orm import defer, deferred, synonym, attributes, column_property, composite, relationship, dynamic_loader, comparable_property,AttributeExtension
+from sqlalchemy.orm import mapper, relationship, backref, \
+ create_session, class_mapper, configure_mappers, reconstructor, \
+ validates, aliased, Mapper
+from sqlalchemy.orm import defer, deferred, synonym, attributes, \
+ column_property, composite, relationship, dynamic_loader, \
+ comparable_property, AttributeExtension
+from sqlalchemy.orm.instrumentation import ClassManager
- from sqlalchemy.test.testing import eq_, AssertsCompiledSQL
+ from test.lib.testing import eq_, AssertsCompiledSQL
from test.orm import _base, _fixtures
- from sqlalchemy.test.assertsql import AllOf, CompiledSQL
+from sqlalchemy import event
+ from test.lib.assertsql import AllOf, CompiledSQL
-
class MapperTest(_fixtures.FixtureTest):
@testing.resolve_artifact_names
import sqlalchemy as sa
from sqlalchemy import Integer, PickleType, String
import operator
- from sqlalchemy.test import testing
+ from test.lib import testing
from sqlalchemy.util import OrderedSet
-from sqlalchemy.orm import mapper, relationship, create_session, PropComparator, \
- synonym, comparable_property, sessionmaker, attributes
+from sqlalchemy.orm import mapper, relationship, create_session, \
+ PropComparator, synonym, comparable_property, sessionmaker, \
+ attributes, Session
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm.interfaces import MapperOption
- from sqlalchemy.test.testing import eq_, ne_
+ from test.lib.testing import eq_, ne_
from test.orm import _base, _fixtures
- from sqlalchemy.test.schema import Table, Column
+from sqlalchemy import event
+ from test.lib.schema import Table, Column
class MergeTest(_fixtures.FixtureTest):
"""Session.merge() functionality"""
- from sqlalchemy.test.testing import assert_raises, assert_raises_message
+ from test.lib.testing import assert_raises, assert_raises_message
import datetime
import sqlalchemy as sa
- from sqlalchemy.test import testing
+ from test.lib import testing
from sqlalchemy import Integer, String, ForeignKey, MetaData, and_
- from sqlalchemy.test.schema import Table, Column
+ from test.lib.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, relation, \
- backref, create_session, compile_mappers, \
+ backref, create_session, configure_mappers, \
clear_mappers, sessionmaker, attributes,\
Session, composite, column_property
- from sqlalchemy.test.testing import eq_, startswith_
+ from test.lib.testing import eq_, startswith_
from test.orm import _base, _fixtures
make_transient, Session
from sqlalchemy.orm.attributes import instance_state
import sqlalchemy as sa
- from sqlalchemy.test import engines, testing, config
+ from test.lib import engines, testing, config
from sqlalchemy import Integer, String, Sequence
- from sqlalchemy.test.schema import Table, Column
+ from test.lib.schema import Table, Column
from sqlalchemy.orm import mapper, relationship, backref, joinedload, \
exc as orm_exc, object_session
- from sqlalchemy.test.testing import eq_
+ from test.lib.testing import eq_
from test.engine import _base as engine_base
from test.orm import _base, _fixtures
-
+from sqlalchemy import event
class SessionTest(_fixtures.FixtureTest):
run_inserts = None
from sqlalchemy.orm import mapper as orm_mapper
import sqlalchemy as sa
- from sqlalchemy.test import engines, testing, pickleable
+from sqlalchemy import Integer, String, ForeignKey, literal_column, event
- from sqlalchemy.test.schema import Table
- from sqlalchemy.test.schema import Column
+ from test.lib import engines, testing, pickleable
-from sqlalchemy import Integer, String, ForeignKey, literal_column
+ from test.lib.schema import Table
+ from test.lib.schema import Column
from sqlalchemy.orm import mapper, relationship, create_session, \
column_property, attributes, Session, reconstructor, object_session
- from sqlalchemy.test.testing import eq_, ne_
- from sqlalchemy.test.util import gc_collect
+ from test.lib.testing import eq_, ne_
++from test.lib.util import gc_collect
from test.orm import _base, _fixtures
from test.engine import _base as engine_base
- from sqlalchemy.test.assertsql import AllOf, CompiledSQL
+ from test.lib.assertsql import AllOf, CompiledSQL
import gc
class UnitOfWorkTest(object):
from sqlalchemy.orm import mapper, create_session
- from sqlalchemy.test import TestBase, testing
+ from test.lib import TestBase, testing
from test.orm import _fixtures
- from sqlalchemy.test.testing import eq_
+ from test.lib.testing import eq_
-class ExtensionCarrierTest(TestBase):
- def test_basic(self):
- carrier = util.ExtensionCarrier()
-
- assert 'translate_row' not in carrier
- assert carrier.translate_row() is interfaces.EXT_CONTINUE
- assert 'translate_row' not in carrier
-
- assert_raises(AttributeError, lambda: carrier.snickysnack)
-
- class Partial(object):
- def __init__(self, marker):
- self.marker = marker
- def translate_row(self, row):
- return self.marker
-
- carrier.append(Partial('end'))
- assert 'translate_row' in carrier
- assert carrier.translate_row(None) == 'end'
-
- carrier.push(Partial('front'))
- assert carrier.translate_row(None) == 'front'
-
- assert 'populate_instance' not in carrier
- carrier.append(interfaces.MapperExtension)
-
- # Py3K
- #assert 'populate_instance' not in carrier
- # Py2K
- assert 'populate_instance' in carrier
- # end Py2K
-
- assert carrier.interface
- for m in carrier.interface:
- assert getattr(interfaces.MapperExtension, m)
-
class AliasedClassTest(TestBase):
def point_map(self, cls):
table = Table('point', MetaData(),