raise
numeric_table.drop()
- @testing.exclude('mysql', '<', (4, 1, 1))
def test_charset(self):
"""Exercise CHARACTER SET and COLLATE-ish options on string types."""
except:
raise
charset_table.drop()
+ test_charset = testing.exclude('mysql', '<', (4, 1, 1))(test_charset)
- @testing.exclude('mysql', '<', (5, 0, 5))
def test_bit_50(self):
"""Exercise BIT types on 5.0+ (not valid for all engine types)"""
roundtrip([0, 0, 0, 0, 0, 0, 0, i])
finally:
meta.drop_all()
+ test_bit_50 = testing.exclude('mysql', '<', (5, 0, 5))(test_bit_50)
def test_boolean(self):
"""Test BOOL/TINYINT(1) compatability and reflection."""
finally:
meta.drop_all()
- @testing.exclude('mysql', '<', (4, 1, 0))
def test_timestamp(self):
"""Exercise funky TIMESTAMP default syntax."""
self.assert_(r.c.t is not None)
finally:
meta.drop_all()
+ test_timestamp = testing.exclude('mysql', '<', (4, 1, 0))(test_timestamp)
def test_year(self):
"""Exercise YEAR."""
self.assert_eq(res, expected)
enum_table.drop()
- @testing.exclude('mysql', '>', (3))
def test_enum_parse(self):
"""More exercises for the ENUM type."""
assert t.c.e5.type.enums == ["", "'a'", "b'b", "'"]
finally:
enum_table.drop()
+ test_enum_parse = testing.exclude('mysql', '>', (3))(test_enum_parse)
def test_default_reflection(self):
"""Test reflection of column defaults."""
finally:
def_table.drop()
- @testing.exclude('mysql', '<', (5, 0, 0))
- @testing.uses_deprecated('Using String type with no length')
def test_type_reflection(self):
# (ask_for, roundtripped_as_if_different)
specs = [( String(), mysql.MSText(), ),
db.execute('DROP VIEW mysql_types_v')
finally:
m.drop_all()
+ test_type_reflection = testing.uses_deprecated('Using String type with no length')(test_type_reflection)
+ test_type_reflection = testing.exclude('mysql', '<', (5, 0, 0))(test_type_reflection)
def test_autoincrement(self):
meta = MetaData(testing.db)
class ReturningTest(TestBase, AssertsExecutionResults):
__only_on__ = 'postgres'
- @testing.exclude('postgres', '<', (8, 2))
def test_update_returning(self):
meta = MetaData(testing.db)
table = Table('tables', meta,
self.assertEqual(result2.fetchall(), [(1,True),(2,False)])
finally:
table.drop()
+ test_update_returning = testing.exclude('postgres', '<', (8, 2))(test_update_returning)
- @testing.exclude('postgres', '<', (8, 2))
def test_insert_returning(self):
meta = MetaData(testing.db)
table = Table('tables', meta,
self.assertEqual([dict(row) for row in result4], [{'persons': 10}])
finally:
table.drop()
+ test_insert_returning = testing.exclude('postgres', '<', (8, 2))(test_insert_returning)
class InsertTest(TestBase, AssertsExecutionResults):
self.assertEquals(rp(bp(dt)), dt)
- @testing.uses_deprecated('Using String type with no length')
def test_type_reflection(self):
# (ask_for, roundtripped_as_if_different)
specs = [( String(), sqlite.SLText(), ),
db.execute('DROP VIEW types_v')
finally:
m.drop_all()
+ test_type_reflection = testing.uses_deprecated('Using String type with no length')(test_type_reflection)
class DialectTest(TestBase, AssertsExecutionResults):
__only_on__ = 'sqlite'
finally:
cx.execute('DETACH DATABASE alt_schema')
- @testing.exclude('sqlite', '<', (2, 6))
def test_temp_table_reflection(self):
cx = testing.db.connect()
try:
except exceptions.DBAPIError:
pass
raise
+ test_temp_table_reflection = testing.exclude('sqlite', '<', (2, 6))(test_temp_table_reflection)
class InsertTest(TestBase, AssertsExecutionResults):
"""Tests inserts and autoincrement."""
finally:
table.drop()
- @testing.exclude('sqlite', '<', (3, 4))
def test_empty_insert_pk1(self):
self._test_empty_insert(
Table('a', MetaData(testing.db),
Column('id', Integer, primary_key=True)))
+ test_empty_insert_pk1 = testing.exclude('sqlite', '<', (3, 4))(test_empty_insert_pk1)
- @testing.exclude('sqlite', '<', (3, 4))
def test_empty_insert_pk2(self):
self.assertRaises(
exceptions.DBAPIError,
Table('b', MetaData(testing.db),
Column('x', Integer, primary_key=True),
Column('y', Integer, primary_key=True)))
+ test_empty_insert_pk2 = testing.exclude('sqlite', '<', (3, 4))(test_empty_insert_pk2)
- @testing.exclude('sqlite', '<', (3, 4))
def test_empty_insert_pk3(self):
self.assertRaises(
exceptions.DBAPIError,
Column('x', Integer, primary_key=True),
Column('y', Integer, PassiveDefault('123'),
primary_key=True)))
+ test_empty_insert_pk3 = testing.exclude('sqlite', '<', (3, 4))(test_empty_insert_pk3)
- @testing.exclude('sqlite', '<', (3, 4))
def test_empty_insert_pk4(self):
self._test_empty_insert(
Table('d', MetaData(testing.db),
Column('x', Integer, primary_key=True),
Column('y', Integer, PassiveDefault('123'))))
+ test_empty_insert_pk4 = testing.exclude('sqlite', '<', (3, 4))(test_empty_insert_pk4)
- @testing.exclude('sqlite', '<', (3, 4))
def test_empty_insert_nopk1(self):
self._test_empty_insert(
Table('e', MetaData(testing.db),
Column('id', Integer)))
+ test_empty_insert_nopk1 = testing.exclude('sqlite', '<', (3, 4))(test_empty_insert_nopk1)
- @testing.exclude('sqlite', '<', (3, 4))
def test_empty_insert_nopk2(self):
self._test_empty_insert(
Table('f', MetaData(testing.db),
Column('x', Integer),
Column('y', Integer)))
+ test_empty_insert_nopk2 = testing.exclude('sqlite', '<', (3, 4))(test_empty_insert_nopk2)
def test_inserts_with_spaces(self):
tbl = Table('tbl', MetaData('sqlite:///'),
"assign this Table's .metadata.bind to enable implicit "
"execution.")
- @testing.future
def test_create_drop_err2(self):
for meth in [
table.exists,
"against. Either execute with an explicit connection or "
"assign this Table's .metadata.bind to enable implicit "
"execution.")
+ test_create_drop_err2 = testing.future(test_create_drop_err2)
- @testing.uses_deprecated('//connect')
def test_create_drop_bound(self):
for meta in (MetaData,ThreadLocalMetaData):
assert not table.exists()
if isinstance(bind, engine.Connection):
bind.close()
+ test_create_drop_bound = testing.uses_deprecated('//connect')(test_create_drop_bound)
def test_create_drop_constructor_bound(self):
for bind in (
metadata.drop_all(bind)
assert canary.state == 'after-create'
- @testing.future
def test_metadata_table_isolation(self):
metadata, table, bind = self.metadata, self.table, self.bind
# path that metadata.create_all() does
self.table.create(self.bind)
assert metadata_canary.state == None
+ test_metadata_table_isolation = testing.future(test_metadata_table_isolation)
def test_append_listener(self):
metadata, table, bind = self.metadata, self.table, self.bind
def tearDownAll(self):
metadata.drop_all()
- @testing.fails_on_everything_except('firebird', 'maxdb', 'sqlite')
def test_raw_qmark(self):
for conn in (testing.db, testing.db.connect()):
conn.execute("insert into users (user_id, user_name) values (?, ?)", (1,"jack"))
res = conn.execute("select * from users order by user_id")
assert res.fetchall() == [(1, "jack"), (2, "fred"), (3, "ed"), (4, "horse"), (5, "barney"), (6, "donkey"), (7, 'sally')]
conn.execute("delete from users")
+ test_raw_qmark = testing.fails_on_everything_except('firebird', 'maxdb', 'sqlite')(test_raw_qmark)
- @testing.fails_on_everything_except('mysql', 'postgres')
# some psycopg2 versions bomb this.
def test_raw_sprintf(self):
for conn in (testing.db, testing.db.connect()):
res = conn.execute("select * from users order by user_id")
assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally'), (5, None)]
conn.execute("delete from users")
+ test_raw_sprintf = testing.fails_on_everything_except('mysql', 'postgres')(test_raw_sprintf)
# pyformat is supported for mysql, but skipping because a few driver
# versions have a bug that bombs out on this test. (1.2.2b3, 1.2.2c1, 1.2.2)
- @testing.unsupported('mysql')
- @testing.fails_on_everything_except('postgres')
def test_raw_python(self):
for conn in (testing.db, testing.db.connect()):
conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':1, 'name':'jack'})
res = conn.execute("select * from users order by user_id")
assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
conn.execute("delete from users")
+ test_raw_python = testing.fails_on_everything_except('postgres')(test_raw_python)
+ test_raw_python = testing.unsupported('mysql')(test_raw_python)
- @testing.fails_on_everything_except('sqlite', 'oracle')
def test_raw_named(self):
for conn in (testing.db, testing.db.connect()):
conn.execute("insert into users (user_id, user_name) values (:id, :name)", {'id':1, 'name':'jack'})
res = conn.execute("select * from users order by user_id")
assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
conn.execute("delete from users")
+ test_raw_named = testing.fails_on_everything_except('sqlite', 'oracle')(test_raw_named)
def test_exception_wrapping(self):
for conn in (testing.db, testing.db.connect()):
finally:
metadata.drop_all()
- @testing.exclude('mysql', '<', (4, 1, 1))
def test_to_metadata(self):
meta = MetaData()
assert not c.columns.contains_column(table.c.name)
finally:
meta.drop_all(testing.db)
+ test_to_metadata = testing.exclude('mysql', '<', (4, 1, 1))(test_to_metadata)
def test_nonexistent(self):
self.assertRaises(exceptions.NoSuchTableError, Table,
meta.drop_all()
engine.dispose()
- @testing.fails_on('mysql')
def test_invalidate_on_results(self):
conn = engine.connect()
raise
assert conn.invalidated
+ test_invalidate_on_results = testing.fails_on('mysql')(test_invalidate_on_results)
if __name__ == '__main__':
testenv.main()
class ReflectionTest(TestBase, ComparesTables):
- @testing.exclude('mysql', '<', (4, 1, 1))
def test_basic_reflection(self):
meta = MetaData(testing.db)
finally:
addresses.drop()
users.drop()
+ test_basic_reflection = testing.exclude('mysql', '<', (4, 1, 1))(test_basic_reflection)
def test_include_columns(self):
meta = MetaData(testing.db)
except exceptions.SAWarning:
assert True
- @testing.emits_warning('Did not recognize type')
def warns():
m3 = MetaData(testing.db)
t3 = Table("test", m3, autoload=True)
assert t3.c.foo.type.__class__ == sqltypes.NullType
+ warns = testing.emits_warning('Did not recognize type')(warns)
finally:
dialect_module.ischema_names = ischema_names
finally:
testing.db.execute("drop table book")
- @testing.exclude('mysql', '<', (4, 1, 1))
def test_composite_fk(self):
"""test reflection of composite foreign keys"""
self.assert_(and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause))
finally:
meta.drop_all()
+ test_composite_fk = testing.exclude('mysql', '<', (4, 1, 1))(test_composite_fk)
- @testing.unsupported('oracle')
def testreserved(self):
# check a table that uses an SQL reserved name doesn't cause an error
meta = MetaData(testing.db)
table_c2 = Table('is', meta2, autoload=True)
finally:
meta.drop_all()
+ testreserved = testing.unsupported('oracle')(testreserved)
def test_reflect_all(self):
existing = testing.db.table_names()
finally:
metadata.drop_all(bind=testing.db)
- @testing.exclude('mysql', '<', (4, 1, 1))
def test_createdrop(self):
metadata.create_all(bind=testing.db)
self.assertEqual( testing.db.has_table('items'), True )
self.assertEqual( testing.db.has_table('email_addresses'), False )
metadata.drop_all(bind=testing.db)
self.assertEqual( testing.db.has_table('items'), False )
+ test_createdrop = testing.exclude('mysql', '<', (4, 1, 1))(test_createdrop)
def test_tablenames(self):
from sqlalchemy.util import Set
assert buf.index("CREATE TABLE someschema.table1") > -1
assert buf.index("CREATE TABLE someschema.table2") > -1
- @testing.unsupported('sqlite', 'firebird')
- # fixme: revisit these below.
- @testing.fails_on('mssql', 'sybase', 'access')
def test_explicit_default_schema(self):
engine = testing.db
table2 = Table('table2', metadata, autoload=True, schema=schema)
finally:
metadata.drop_all()
+ test_explicit_default_schema = testing.fails_on('mssql', 'sybase', 'access')(test_explicit_default_schema)
+ # fixme: revisit these below.
+ test_explicit_default_schema = testing.unsupported('sqlite', 'firebird')(test_explicit_default_schema)
class HasSequenceTest(TestBase):
Column('user_name', String(40)),
)
- @testing.unsupported('sqlite', 'mysql', 'mssql', 'access', 'sybase')
def test_hassequence(self):
metadata.create_all(bind=testing.db)
self.assertEqual(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True)
metadata.drop_all(bind=testing.db)
self.assertEqual(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), False)
+ test_hassequence = testing.unsupported('sqlite', 'mysql', 'mssql', 'access', 'sybase')(test_hassequence)
if __name__ == "__main__":
assert len(result.fetchall()) == 0
connection.close()
- @testing.exclude('mysql', '<', (5, 0, 3))
def testnestedrollback(self):
connection = testing.db.connect()
assert str(e) == 'uh oh' # and not "This transaction is inactive"
finally:
connection.close()
+ testnestedrollback = testing.exclude('mysql', '<', (5, 0, 3))(testnestedrollback)
- @testing.exclude('mysql', '<', (5, 0, 3))
def testnesting(self):
connection = testing.db.connect()
transaction = connection.begin()
result = connection.execute("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
+ testnesting = testing.exclude('mysql', '<', (5, 0, 3))(testnesting)
- @testing.exclude('mysql', '<', (5, 0, 3))
def testclose(self):
connection = testing.db.connect()
transaction = connection.begin()
result = connection.execute("select * from query_users")
assert len(result.fetchall()) == 5
connection.close()
+ testclose = testing.exclude('mysql', '<', (5, 0, 3))(testclose)
- @testing.exclude('mysql', '<', (5, 0, 3))
def testclose2(self):
connection = testing.db.connect()
transaction = connection.begin()
result = connection.execute("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
+ testclose2 = testing.exclude('mysql', '<', (5, 0, 3))(testclose2)
- @testing.unsupported('sqlite', 'mssql', 'sybase', 'access')
- @testing.exclude('mysql', '<', (5, 0, 3))
def testnestedsubtransactionrollback(self):
connection = testing.db.connect()
transaction = connection.begin()
[(1,),(3,)]
)
connection.close()
+ testnestedsubtransactionrollback = testing.exclude('mysql', '<', (5, 0, 3))(testnestedsubtransactionrollback)
+ testnestedsubtransactionrollback = testing.unsupported('sqlite', 'mssql', 'sybase', 'access')(testnestedsubtransactionrollback)
- @testing.unsupported('sqlite', 'mssql', 'sybase', 'access')
- @testing.exclude('mysql', '<', (5, 0, 3))
def testnestedsubtransactioncommit(self):
connection = testing.db.connect()
transaction = connection.begin()
[(1,),(2,),(3,)]
)
connection.close()
+ testnestedsubtransactioncommit = testing.exclude('mysql', '<', (5, 0, 3))(testnestedsubtransactioncommit)
+ testnestedsubtransactioncommit = testing.unsupported('sqlite', 'mssql', 'sybase', 'access')(testnestedsubtransactioncommit)
- @testing.unsupported('sqlite', 'mssql', 'sybase', 'access')
- @testing.exclude('mysql', '<', (5, 0, 3))
def testrollbacktosubtransaction(self):
connection = testing.db.connect()
transaction = connection.begin()
[(1,),(4,)]
)
connection.close()
+ testrollbacktosubtransaction = testing.exclude('mysql', '<', (5, 0, 3))(testrollbacktosubtransaction)
+ testrollbacktosubtransaction = testing.unsupported('sqlite', 'mssql', 'sybase', 'access')(testrollbacktosubtransaction)
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
def testtwophasetransaction(self):
connection = testing.db.connect()
[(1,),(2,)]
)
connection.close()
+ testtwophasetransaction = testing.exclude('mysql', '<', (5, 0, 3))(testtwophasetransaction)
+ testtwophasetransaction = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(testtwophasetransaction)
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
def testmixedtwophasetransaction(self):
connection = testing.db.connect()
[(1,),(2,),(5,)]
)
connection.close()
+ testmixedtwophasetransaction = testing.exclude('mysql', '<', (5, 0, 3))(testmixedtwophasetransaction)
+ testmixedtwophasetransaction = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(testmixedtwophasetransaction)
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- # fixme: see if this is still true and/or can be convert to fails_on()
- @testing.unsupported('mysql')
def testtwophaserecover(self):
# MySQL recovery doesn't currently seem to work correctly
# Prepared transactions disappear when connections are closed and even
[(1,)]
)
connection2.close()
+ testtwophaserecover = testing.unsupported('mysql')(testtwophaserecover)
+ # fixme: see if this is still true and/or can be convert to fails_on()
+ testtwophaserecover = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(testtwophaserecover)
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
def testmultipletwophase(self):
conn = testing.db.connect()
self.assertEqual(result.fetchall(), [('user1',),('user4',)])
conn.close()
+ testmultipletwophase = testing.exclude('mysql', '<', (5, 0, 3))(testmultipletwophase)
+ testmultipletwophase = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(testmultipletwophase)
class AutoRollbackTest(TestBase):
def setUpAll(self):
def tearDownAll(self):
metadata.drop_all(testing.db)
- @testing.unsupported('sqlite')
def testrollback_deadlock(self):
"""test that returning connections to the pool clears any object locks."""
conn1 = testing.db.connect()
# comment out the rollback in pool/ConnectionFairy._close() to see !
users.drop(conn2)
conn2.close()
+ testrollback_deadlock = testing.unsupported('sqlite')(testrollback_deadlock)
class ExplicitAutoCommitTest(TestBase):
"""test the 'autocommit' flag on select() and text() objects.
finally:
external_connection.close()
- @testing.unsupported('sqlite')
- @testing.exclude('mysql', '<', (5, 0, 3))
def testnesting(self):
"""tests nesting of transactions"""
external_connection = tlengine.connect()
self.assert_(external_connection.scalar("select count(1) from query_users") == 0)
finally:
external_connection.close()
+ testnesting = testing.exclude('mysql', '<', (5, 0, 3))(testnesting)
+ testnesting = testing.unsupported('sqlite')(testnesting)
- @testing.exclude('mysql', '<', (5, 0, 3))
def testmixednesting(self):
"""tests nesting of transactions off the TLEngine directly inside of
tranasctions off the connection from the TLEngine"""
self.assert_(external_connection.scalar("select count(1) from query_users") == 0)
finally:
external_connection.close()
+ testmixednesting = testing.exclude('mysql', '<', (5, 0, 3))(testmixednesting)
- @testing.exclude('mysql', '<', (5, 0, 3))
def testmoremixednesting(self):
"""tests nesting of transactions off the connection from the TLEngine
inside of tranasctions off thbe TLEngine directly."""
self.assert_(external_connection.scalar("select count(1) from query_users") == 0)
finally:
external_connection.close()
+ testmoremixednesting = testing.exclude('mysql', '<', (5, 0, 3))(testmoremixednesting)
- @testing.exclude('mysql', '<', (5, 0, 3))
def testsessionnesting(self):
class User(object):
pass
tlengine.commit()
finally:
clear_mappers()
+ testsessionnesting = testing.exclude('mysql', '<', (5, 0, 3))(testsessionnesting)
def testconnections(self):
c2.close()
assert c1.connection.connection is not None
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
def testtwophasetransaction(self):
tlengine.begin_twophase()
tlengine.execute(users.insert(), user_id=1, user_name='user1')
tlengine.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
[(1,),(2,)]
)
+ testtwophasetransaction = testing.exclude('mysql', '<', (5, 0, 3))(testtwophasetransaction)
+ testtwophasetransaction = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(testtwophasetransaction)
class ForUpdateTest(TestBase):
def setUpAll(self):
break
con.close()
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access')
-
def testqueued_update(self):
"""Test SELECT FOR UPDATE with concurrent modifications.
sel = counters.select(whereclause=counters.c.counter_id==1)
final = db.execute(sel).fetchone()
self.assert_(final['counter_value'] == iterations * thread_count)
+ testqueued_update = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access')(testqueued_update)
def overlap(self, ids, errors, update_style):
sel = counters.select(for_update=update_style,
return errors
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access')
def testqueued_select(self):
"""Simple SELECT FOR UPDATE conflict test"""
for e in errors:
sys.stderr.write("Failure: %s\n" % e)
self.assert_(len(errors) == 0)
+ testqueued_select = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access')(testqueued_select)
- @testing.unsupported('sqlite', 'mysql', 'mssql', 'firebird',
- 'sybase', 'access')
def testnowait_select(self):
"""Simple SELECT FOR UPDATE NOWAIT conflict test"""
errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)],
update_style='nowait')
self.assert_(len(errors) != 0)
+ testnowait_select = testing.unsupported('sqlite', 'mysql', 'mssql', 'firebird', 'sybase', 'access')(testnowait_select)
if __name__ == "__main__":
self.assertEquals(len(person.addresses), 2)
self.assertEquals(person.addresses[0].postal_code, '30338')
- @testing.unsupported('mysql')
def test_update(self):
p1 = self.create_person_one()
objectstore.flush()
assert False
except exceptions.ConcurrentModificationError:
pass
+ test_update = testing.unsupported('mysql')(test_update)
def test_delete(self):
)
metadata.create_all()
- @testing.uses_deprecated('SessionContext', 'assign_mapper')
def setUp(self):
global SomeObject, SomeOtherObject, ctx
class SomeObject(object):pass
s.options.append(sso)
ctx.current.flush()
ctx.current.clear()
+ setUp = testing.uses_deprecated('SessionContext', 'assign_mapper')(setUp)
def tearDownAll(self):
metadata.drop_all()
table.delete().execute()
clear_mappers()
- @testing.uses_deprecated('assign_mapper')
def test_override_attributes(self):
sso = SomeOtherObject.query().first()
assert False
except exceptions.ArgumentError:
pass
+ test_override_attributes = testing.uses_deprecated('assign_mapper')(test_override_attributes)
- @testing.uses_deprecated('assign_mapper')
def test_dont_clobber_methods(self):
class MyClass(object):
def expunge(self):
assign_mapper(ctx, MyClass, table2)
assert MyClass().expunge() == "an expunge !"
+ test_dont_clobber_methods = testing.uses_deprecated('assign_mapper')(test_dont_clobber_methods)
if __name__ == '__main__':
class DictCollection(dict):
- @collection.appender
def append(self, obj):
self[obj.foo] = obj
- @collection.remover
+ append = collection.appender(append)
def remove(self, obj):
del self[obj.foo]
+ remove = collection.remover(remove)
class SetCollection(set):
pass
class ObjectCollection(object):
def __init__(self):
self.values = list()
- @collection.appender
def append(self, obj):
self.values.append(obj)
- @collection.remover
+ append = collection.appender(append)
def remove(self, obj):
self.values.remove(obj)
+ remove = collection.remover(remove)
def __iter__(self):
return iter(self.values)
assert Foo.__mapper__.compile().extension.create_instance() == 'CHECK'
- @testing.emits_warning('Ignoring declarative-like tuple value of '
- 'attribute id')
def test_oops(self):
def define():
class User(Base, Fixture):
exceptions.ArgumentError,
"Mapper Mapper|User|users could not assemble any primary key",
define)
+ test_oops = testing.emits_warning('Ignoring declarative-like tuple value of attribute id')(test_oops)
def test_expression(self):
class User(Base, Fixture):
sess.flush()
self.assertEquals(sess.query(User).filter(User.name=="SOMENAME someuser").one(), u1)
- @testing.uses_deprecated('Call to deprecated function declared_synonym')
def test_decl_synonym_inline(self):
class User(Base, Fixture):
__tablename__ = 'users'
sess.save(u1)
sess.flush()
self.assertEquals(sess.query(User).filter(User.name=="SOMENAME someuser").one(), u1)
+ test_decl_synonym_inline = testing.uses_deprecated('Call to deprecated function declared_synonym')(test_decl_synonym_inline)
def test_synonym_added(self):
class User(Base, Fixture):
sess.flush()
self.assertEquals(sess.query(User).filter(User.name=="SOMENAME someuser").one(), u1)
- @testing.uses_deprecated('Call to deprecated function declared_synonym')
def test_decl_synonym_added(self):
class User(Base, Fixture):
__tablename__ = 'users'
sess.save(u1)
sess.flush()
self.assertEquals(sess.query(User).filter(User.name=="SOMENAME someuser").one(), u1)
+ test_decl_synonym_added = testing.uses_deprecated('Call to deprecated function declared_synonym')(test_decl_synonym_added)
def test_joined_inheritance(self):
class Company(Base, Fixture):
id = Column('id', Integer, primary_key=True)
name = Column('name', String(50))
- @synonym_for('name')
- @property
def namesyn(self):
return self.name
+ namesyn = property(namesyn)
+ namesyn = synonym_for('name')(namesyn)
Base.metadata.create_all()
def test_comparable_using(self):
class NameComparator(PropComparator):
- @property
def upperself(self):
cls = self.prop.parent.class_
col = getattr(cls, 'name')
return func.upper(col)
+ upperself = property(upperself)
def operate(self, op, other, **kw):
return op(self.upperself, other, **kw)
id = Column('id', Integer, primary_key=True)
name = Column('name', String(50))
- @comparable_using(NameComparator)
- @property
def uc_name(self):
return self.name is not None and self.name.upper() or None
+ uc_name = property(uc_name)
+ uc_name = comparable_using(NameComparator)(uc_name)
Base.metadata.create_all()
from testlib import *
class AssociationTest(TestBase):
- @testing.uses_deprecated('association option')
def setUpAll(self):
global items, item_keywords, keywords, metadata, Item, Keyword, KeywordAssociation
metadata = MetaData(testing.db)
mapper(Item, items, properties={
'keywords' : relation(KeywordAssociation, association=Keyword)
})
+ setUpAll = testing.uses_deprecated('association option')(setUpAll)
def tearDown(self):
for t in metadata.table_iterator(reverse=True):
print loaded
self.assert_(saved == loaded)
- @testing.uses_deprecated('association option')
def testdelete(self):
sess = create_session()
item1 = Item('item1')
sess.delete(item2)
sess.flush()
self.assert_(item_keywords.count().scalar() == 0)
+ testdelete = testing.uses_deprecated('association option')(testdelete)
class AssociationTest2(TestBase):
def setUpAll(self):
print result
assert result == [u'1 Some Category', u'3 Some Category']
- @testing.uses_deprecated('//select')
def test_withouteagerload_deprecated(self):
s = create_session()
l=s.query(Test).select ( and_(tests.c.owner_id==1,or_(options.c.someoption==None,options.c.someoption==False)),
result = ["%d %s" % ( t.id,t.category.name ) for t in l]
print result
assert result == [u'1 Some Category', u'3 Some Category']
+ test_withouteagerload_deprecated = testing.uses_deprecated('//select')(test_withouteagerload_deprecated)
def test_witheagerload(self):
"""test that an eagerload locates the correct "from" clause with
print result
assert result == [u'1 Some Category', u'3 Some Category']
- @testing.uses_deprecated('//select')
def test_witheagerload_deprecated(self):
"""As test_witheagerload, but via select()."""
s = create_session()
result = ["%d %s" % ( t.id,t.category.name ) for t in l]
print result
assert result == [u'1 Some Category', u'3 Some Category']
+ test_witheagerload_deprecated = testing.uses_deprecated('//select')(test_witheagerload_deprecated)
def test_dslish(self):
"""test the same as witheagerload except using generative"""
print result
assert result == [u'1 Some Category', u'3 Some Category']
- @testing.unsupported('sybase')
def test_withoutouterjoin_literal(self):
s = create_session()
q = s.query(Test).options(eagerload('category'))
result = ["%d %s" % ( t.id,t.category.name ) for t in l]
print result
assert result == [u'3 Some Category']
+ test_withoutouterjoin_literal = testing.unsupported('sybase')(test_withoutouterjoin_literal)
- @testing.unsupported('sybase')
- @testing.uses_deprecated('//select', '//join_to')
def test_withoutouterjoin_literal_deprecated(self):
s = create_session()
q=s.query(Test).options(eagerload('category'))
result = ["%d %s" % ( t.id,t.category.name ) for t in l]
print result
assert result == [u'3 Some Category']
+ test_withoutouterjoin_literal_deprecated = testing.uses_deprecated('//select', '//join_to')(test_withoutouterjoin_literal_deprecated)
+ test_withoutouterjoin_literal_deprecated = testing.unsupported('sybase')(test_withoutouterjoin_literal_deprecated)
def test_withoutouterjoin(self):
s = create_session()
print result
assert result == [u'3 Some Category']
- @testing.uses_deprecated('//select', '//join_to', '//join_via')
def test_withoutouterjoin_deprecated(self):
s = create_session()
q=s.query(Test).options(eagerload('category'))
result = ["%d %s" % ( t.id,t.category.name ) for t in l]
print result
assert result == [u'3 Some Category']
+ test_withoutouterjoin_deprecated = testing.uses_deprecated('//select', '//join_to', '//join_via')(test_withoutouterjoin_deprecated)
class EagerTest2(TestBase, AssertsExecutionResults):
def setUpAll(self):
for t in metadata.table_iterator(reverse=True):
t.delete().execute()
- @testing.fails_on('maxdb')
def testeagerterminate(self):
"""test that eager query generation does not include the same mapper's table twice.
session.clear()
obj = session.query(Left).filter_by(tag='tag1').one()
print obj.middle.right[0]
+ testeagerterminate = testing.fails_on('maxdb')(testeagerterminate)
class EagerTest3(ORMTest):
"""test eager loading combined with nested SELECT statements, functions, and aggregates"""
Column ( 'data_id', Integer, ForeignKey('datas.id')),
Column ( 'somedata', Integer, nullable=False ))
- @testing.fails_on('maxdb')
def test_nesting_with_functions(self):
class Data(object): pass
class Foo(object):pass
# assert equality including ordering (may break if the DB "ORDER BY" and python's sort() used differing
# algorithms and there are repeated 'somedata' values in the list)
assert verify_result == arb_result
+ test_nesting_with_functions = testing.fails_on('maxdb')(test_nesting_with_functions)
class EagerTest4(ORMTest):
def define_tables(self, metadata):
Column('department_id', Integer,
ForeignKey('departments.department_id')))
- @testing.fails_on('maxdb')
def test_basic(self):
class Department(object):
def __init__(self, **kwargs):
q = q.join('employees').filter(Employee.c.name.startswith('J')).distinct().order_by([desc(Department.c.name)])
assert q.count() == 2
assert q[0] is d2
+ test_basic = testing.fails_on('maxdb')(test_basic)
class EagerTest5(ORMTest):
"""test the construction of AliasedClauses for the same eager load property but different
x.inheritedParts
class EagerTest7(ORMTest):
- @testing.uses_deprecated('SessionContext')
def define_tables(self, metadata):
global companies_table, addresses_table, invoice_table, phones_table, items_table, ctx
global Company, Address, Phone, Item,Invoice
class Item(object):
def __repr__(self):
return "Item: " + repr(getattr(self, 'item_id', None)) + " " + repr(getattr(self, 'invoice_id', None)) + " " + repr(self.code) + " " + repr(self.qty)
+ define_tables = testing.uses_deprecated('SessionContext')(define_tables)
- @testing.uses_deprecated('SessionContext')
def testone(self):
"""tests eager load of a many-to-one attached to a one-to-many. this testcase illustrated
the bug, which is that when the single Company is loaded, no further processing of the rows
print repr(c)
print repr(i.company)
self.assert_(repr(c) == repr(i.company))
+ testone = testing.uses_deprecated('SessionContext')(testone)
def testtwo(self):
"""this is the original testcase that includes various complicating factors"""
testing.db.execute(task_type_t.insert(), {'id':1})
testing.db.execute(task_t.insert(), {'title':u'task 1', 'task_type_id':1, 'status_id':1, 'prj_id':1})
- @testing.fails_on('maxdb')
def test_nested_joins(self):
# this is testing some subtle column resolution stuff,
# concerning corresponding_column() being extremely accurate
for t in session.query(cls.mapper).limit(10).offset(0).all():
print t.id, t.title, t.props_cnt
+ test_nested_joins = testing.fails_on('maxdb')(test_nested_joins)
class EagerTest9(ORMTest):
"""test the usage of query options to eagerly load specific paths.
Column('transaction_id', Integer, ForeignKey(transactions_table.c.transaction_id)),
)
- @testing.fails_on('maxdb')
def test_eagerload_on_path(self):
class Account(fixtures.Base):
pass
assert e.account is acc
self.assert_sql_count(testing.db, go, 1)
+ test_eagerload_on_path = testing.fails_on('maxdb')(test_eagerload_on_path)
assert str(e) == "Type InstrumentedDict must elect an appender method to be a collection class"
class MyDict(dict):
- @collection.appender
def append(self, item):
self[item.foo] = item
- @collection.remover
+ append = collection.appender(append)
def remove(self, item):
del self[item.foo]
+ remove = collection.remover(remove)
attributes.register_attribute(Foo, "collection", uselist=True, typecallable=MyDict, useobject=True)
assert isinstance(Foo().collection, MyDict)
assert str(e) == "Type MyColl must elect an appender method to be a collection class"
class MyColl(object):
- @collection.iterator
def __iter__(self):
return iter([])
- @collection.appender
+ __iter__ = collection.iterator(__iter__)
def append(self, item):
pass
- @collection.remover
+ append = collection.appender(append)
def remove(self, item):
pass
+ remove = collection.remover(remove)
attributes.register_attribute(Foo, "collection", uselist=True, typecallable=MyColl, useobject=True)
try:
Foo().collection
sess.flush()
sess.close()
- @testing.fails_on('maxdb')
def test_orphan(self):
sess = create_session()
assert prefs.count().scalar() == 3
sess.flush()
assert prefs.count().scalar() == 2
assert extra.count().scalar() == 2
+ test_orphan = testing.fails_on('maxdb')(test_orphan)
- @testing.fails_on('maxdb')
def test_orphan_on_update(self):
sess = create_session()
jack = sess.query(User).filter_by(name="jack").one()
sess.flush()
assert prefs.count().scalar() == 2
assert extra.count().scalar() == 2
+ test_orphan_on_update = testing.fails_on('maxdb')(test_orphan_on_update)
def test_pending_expunge(self):
sess = create_session()
self.assertEquals(sess.query(T2).all(), [T2()])
self.assertEquals(sess.query(T3).all(), [T3()])
- @testing.future
def test_preserves_orphans_onelevel_postremove(self):
sess = create_session()
self.assertEquals(sess.query(T1).all(), [])
self.assertEquals(sess.query(T2).all(), [T2()])
self.assertEquals(sess.query(T3).all(), [T3()])
+ test_preserves_orphans_onelevel_postremove = testing.future(test_preserves_orphans_onelevel_postremove)
def test_preserves_orphans_twolevel(self):
sess = create_session()
def test_dict_subclass(self):
class MyDict(dict):
- @collection.appender
- @collection.internally_instrumented
def set(self, item, _sa_initiator=None):
self.__setitem__(item.a, item, _sa_initiator=_sa_initiator)
- @collection.remover
- @collection.internally_instrumented
+ set = collection.internally_instrumented(set)
+ set = collection.appender(set)
def _remove(self, item, _sa_initiator=None):
self.__delitem__(item.a, _sa_initiator=_sa_initiator)
+ _remove = collection.internally_instrumented(_remove)
+ _remove = collection.remover(_remove)
self._test_adapter(MyDict, dictable_entity,
to_set=lambda c: set(c.values()))
def __init__(self):
self.data = dict()
- @collection.appender
- @collection.replaces(1)
def set(self, item):
current = self.data.get(item.a, None)
self.data[item.a] = item
return current
- @collection.remover
+ set = collection.replaces(1)(set)
+ set = collection.appender(set)
def _remove(self, item):
del self.data[item.a]
+ _remove = collection.remover(_remove)
def __setitem__(self, key, value):
self.data[key] = value
def __getitem__(self, key):
return self.data.values()
def __contains__(self, key):
return key in self.data
- @collection.iterator
def itervalues(self):
return self.data.itervalues()
+ itervalues = collection.iterator(itervalues)
def __eq__(self, other):
return self.data == other
def __repr__(self):
def __init__(self):
self.data = dict()
- @collection.appender
- @collection.replaces(1)
def set(self, item):
current = self.data.get(item.a, None)
self.data[item.a] = item
return current
- @collection.remover
+ set = collection.replaces(1)(set)
+ set = collection.appender(set)
def _remove(self, item):
del self.data[item.a]
+ _remove = collection.remover(_remove)
def __setitem__(self, key, value):
self.data[key] = value
def __getitem__(self, key):
return self.data.values()
def __contains__(self, key):
return key in self.data
- @collection.iterator
def itervalues(self):
return self.data.itervalues()
+ itervalues = collection.iterator(itervalues)
def __eq__(self, other):
return self.data == other
def __repr__(self):
class MyCollection(object):
def __init__(self):
self.data = set()
- @collection.appender
def push(self, item):
self.data.add(item)
- @collection.remover
+ push = collection.appender(push)
def zark(self, item):
self.data.remove(item)
- @collection.removes_return()
+ zark = collection.remover(zark)
def maybe_zark(self, item):
if item in self.data:
self.data.remove(item)
return item
- @collection.iterator
+ maybe_zark = collection.removes_return()(maybe_zark)
def __iter__(self):
return iter(self.data)
+ __iter__ = collection.iterator(__iter__)
def __eq__(self, other):
return self.data == other
# looks like a list
def append(self, item):
assert False
- @collection.appender
def push(self, item):
self.data.add(item)
- @collection.remover
+ push = collection.appender(push)
def zark(self, item):
self.data.remove(item)
- @collection.removes_return()
+ zark = collection.remover(zark)
def maybe_zark(self, item):
if item in self.data:
self.data.remove(item)
return item
- @collection.iterator
+ maybe_zark = collection.removes_return()(maybe_zark)
def __iter__(self):
return iter(self.data)
+ __iter__ = collection.iterator(__iter__)
def __eq__(self, other):
return self.data == other
class Custom(object):
def __init__(self):
self.data = []
- @collection.appender
- @collection.adds('entity')
def put(self, entity):
self.data.append(entity)
+ put = collection.adds('entity')(put)
+ put = collection.appender(put)
- @collection.remover
- @collection.removes(1)
def remove(self, entity):
self.data.remove(entity)
+ remove = collection.removes(1)(remove)
+ remove = collection.remover(remove)
- @collection.adds(1)
def push(self, *args):
self.data.append(args[0])
+ push = collection.adds(1)(push)
- @collection.removes('entity')
def yank(self, entity, arg):
self.data.remove(entity)
+ yank = collection.removes('entity')(yank)
- @collection.replaces(2)
def replace(self, arg, entity, **kw):
self.data.insert(0, entity)
return self.data.pop()
+ replace = collection.replaces(2)(replace)
- @collection.removes_return()
def pop(self, key):
return self.data.pop()
+ pop = collection.removes_return()(pop)
- @collection.iterator
def __iter__(self):
return iter(self.data)
+ __iter__ = collection.iterator(__iter__)
class Foo(object):
pass
class Bar(object):
pass
class AppenderDict(dict):
- @collection.appender
def set(self, item):
self[id(item)] = item
- @collection.remover
+ set = collection.appender(set)
def remove(self, item):
if id(item) in self:
del self[id(item)]
+ remove = collection.remover(remove)
mapper(Foo, sometable, properties={
'bars':relation(Bar, collection_class=AppenderDict)
class MyCollection(object):
def __init__(self):
self.data = []
- @collection.appender
def append(self, value):
self.data.append(value)
- @collection.remover
+ append = collection.appender(append)
def remove(self, value):
self.data.remove(value)
- @collection.iterator
+ remove = collection.remover(remove)
def __iter__(self):
return iter(self.data)
+ __iter__ = collection.iterator(__iter__)
mapper(Parent, sometable, properties={
'children':relation(Child, collection_class=MyCollection)
sess.rollback()
self.assertEquals(u1.addresses.all(), [Address(email_address='lala@hoho.com')])
- @testing.fails_on('maxdb')
def test_delete_nocascade(self):
mapper(User, users, properties={
'addresses':dynamic_loader(mapper(Address, addresses), backref='user')
sess.close()
assert testing.db.scalar(addresses.count(addresses.c.user_id != None)) ==0
+ test_delete_nocascade = testing.fails_on('maxdb')(test_delete_nocascade)
- @testing.fails_on('maxdb')
def test_delete_cascade(self):
mapper(User, users, properties={
'addresses':dynamic_loader(mapper(Address, addresses), backref='user', cascade="all, delete-orphan")
sess.close()
assert testing.db.scalar(addresses.count()) ==0
+ test_delete_cascade = testing.fails_on('maxdb')(test_delete_cascade)
- @testing.fails_on('maxdb')
def test_remove_orphans(self):
mapper(User, users, properties={
'addresses':dynamic_loader(mapper(Address, addresses), cascade="all, delete-orphan", backref='user')
sess.delete(u)
sess.close()
+ test_remove_orphans = testing.fails_on('maxdb')(test_remove_orphans)
def create_backref_test(autoflush, saveuser):
assert 'orders' not in noeagers[0].__dict__
assert 'addresses' not in noeagers[0].__dict__
- @testing.fails_on('maxdb')
def test_limit(self):
"""test limit operations combined with lazy-load relationships."""
print fixtures.user_all_result[1:3]
print l
assert fixtures.user_all_result[1:3] == l
+ test_limit = testing.fails_on('maxdb')(test_limit)
def test_distinct(self):
# this is an involved 3x union of the users table to get a lot of rows.
assert fixtures.user_address_result == l
self.assert_sql_count(testing.db, go, 1)
- @testing.fails_on('maxdb')
def test_limit_2(self):
mapper(Keyword, keywords)
mapper(Item, items, properties = dict(
order_by(Item.id).limit(2).all()
assert fixtures.item_keyword_result[1:3] == l
+ test_limit_2 = testing.fails_on('maxdb')(test_limit_2)
- @testing.fails_on('maxdb')
def test_limit_3(self):
"""test that the ORDER BY is propigated from the inner select to the outer select, when using the
'wrapped' select statement resulting from the combination of eager loading and limit/offset clauses."""
addresses=[Address(id=1)]
)
] == l.all()
+ test_limit_3 = testing.fails_on('maxdb')(test_limit_3)
def test_limit_4(self):
# tests the LIMIT/OFFSET aliasing on a mapper against a select. original issue from ticket #904
assert [User(id=7, address=Address(id=1))] == l
self.assert_sql_count(testing.db, go, 1)
- @testing.fails_on('maxdb')
def test_many_to_one(self):
mapper(Address, addresses, properties = dict(
user = relation(mapper(User, users), lazy=False)
u1 = sess.query(User).get(7)
assert a.user is u1
self.assert_sql_count(testing.db, go, 1)
+ test_many_to_one = testing.fails_on('maxdb')(test_many_to_one)
def test_one_and_many(self):
Column('parent_id', Integer, ForeignKey('nodes.id')),
Column('data', String(30)))
- @testing.fails_on('maxdb')
def test_basic(self):
class Node(Base):
def append(self, node):
Node(data='n13')
]) == d
self.assert_sql_count(testing.db, go, 1)
+ test_basic = testing.fails_on('maxdb')(test_basic)
def test_lazy_fallback_doesnt_affect_eager(self):
),
])
- @testing.fails_on('maxdb')
def test_no_depth(self):
class Node(Base):
def append(self, node):
Node(data='n13')
]) == d
self.assert_sql_count(testing.db, go, 3)
+ test_no_depth = testing.fails_on('maxdb')(test_no_depth)
class SelfReferentialM2MEagerTest(ORMTest):
def define_tables(self, metadata):
"""
class User(Base):
- @property
def prop_score(self):
return sum([tag.prop_score for tag in self.tags])
+ prop_score = property(prop_score)
class Tag(Base):
- @property
def prop_score(self):
return self.score1 * self.score2
+ prop_score = property(prop_score)
for labeled, labelname in [(True, 'score'), (True, None), (False, None)]:
clear_mappers()
"""tests mappers that are constructed based on "entity names", which allows the same class
to have multiple primary mappers """
- @testing.uses_deprecated('SessionContext')
def setUpAll(self):
global user1, user2, address1, address2, metadata, ctx
metadata = MetaData(testing.db)
Column('email', String(100), nullable=False)
)
metadata.create_all()
+ setUpAll = testing.uses_deprecated('SessionContext')(setUpAll)
def tearDownAll(self):
metadata.drop_all()
def tearDown(self):
for t in metadata.table_iterator(reverse=True):
t.delete().execute()
- @testing.uses_deprecated('SessionContextExt')
def testbasic(self):
"""tests a pair of one-to-many mapper structures, establishing that both
parent and child objects honor the "entity_name" attribute attached to the object
u1 = ctx.current.query(User, entity_name='user1').first()
ctx.current.refresh(u1)
ctx.current.expire(u1)
+ testbasic = testing.uses_deprecated('SessionContextExt')(testbasic)
def testcascade(self):
s.expire(u)
assert len(u.addresses) == 3
- @testing.fails_on('maxdb')
def test_refresh2(self):
"""test a hang condition that was occuring on expire/refresh"""
assert u.name == 'Justin'
s.refresh(u)
+ test_refresh2 = testing.fails_on('maxdb')(test_refresh2)
if __name__ == '__main__':
testenv.main()
assert res.order_by([Foo.c.bar])[0].bar == 5
assert res.order_by([desc(Foo.c.bar)])[0].bar == 95
- @testing.unsupported('mssql')
- @testing.fails_on('maxdb')
def test_slice(self):
sess = create_session(bind=testing.db)
query = sess.query(Foo)
assert list(query[10:40:3]) == orig[10:40:3]
assert list(query[-5:]) == orig[-5:]
assert query[10:20][5] == orig[10:20][5]
+ test_slice = testing.fails_on('maxdb')(test_slice)
+ test_slice = testing.unsupported('mssql')(test_slice)
- @testing.uses_deprecated('Call to deprecated function apply_max')
def test_aggregate(self):
sess = create_session(bind=testing.db)
query = sess.query(Foo)
assert query.filter(foo.c.bar<30).max(foo.c.bar) == 29
assert query.filter(foo.c.bar<30).apply_max(foo.c.bar).first() == 29
assert query.filter(foo.c.bar<30).apply_max(foo.c.bar).one() == 29
+ test_aggregate = testing.uses_deprecated('Call to deprecated function apply_max')(test_aggregate)
def test_aggregate_1(self):
if (testing.against('mysql') and
query = create_session(bind=testing.db).query(Foo)
assert query.filter(foo.c.bar<30).sum(foo.c.bar) == 435
- @testing.fails_on('firebird', 'mssql')
def test_aggregate_2(self):
query = create_session(bind=testing.db).query(Foo)
avg = query.filter(foo.c.bar < 30).avg(foo.c.bar)
assert round(avg, 1) == 14.5
+ test_aggregate_2 = testing.fails_on('firebird', 'mssql')(test_aggregate_2)
- @testing.fails_on('firebird', 'mssql')
- @testing.uses_deprecated('Call to deprecated function apply_avg')
def test_aggregate_3(self):
query = create_session(bind=testing.db).query(Foo)
avg_o = query.filter(foo.c.bar<30).apply_avg(foo.c.bar).one()
assert round(avg_o, 1) == 14.5
+ test_aggregate_3 = testing.uses_deprecated('Call to deprecated function apply_avg')(test_aggregate_3)
+ test_aggregate_3 = testing.fails_on('firebird', 'mssql')(test_aggregate_3)
def test_filter(self):
query = create_session(bind=testing.db).query(Foo)
Column('foo_id', Integer, ForeignKey('foo.id'))
)
- @testing.fails_on('maxdb')
def testbasic(self):
class Foo(object): pass
class Bar(Foo): pass
q = sess.query(Bar)
self.assert_(len(q.first().lazy) == 1)
self.assert_(len(q.first().eager) == 1)
+ testbasic = testing.fails_on('maxdb')(testbasic)
class FlushTest(ORMTest):
Column('parent', Integer, ForeignKey('base.id'))
)
- @engines.close_open_connections
def test_save_update(self):
class Base(fixtures.Base):
pass
assert s2.subdata == 'sess1 subdata'
s2.subdata = 'sess2 subdata'
sess2.flush()
+ test_save_update = engines.close_open_connections(test_save_update)
def test_delete(self):
class Base(fixtures.Base):
found = repr(l[0]) + repr(sorted([repr(o) for o in l[0].foos]))
self.assertEqual(found, compare)
- @testing.fails_on('maxdb')
def testadvanced(self):
class Foo(object):
def __init__(self, data=None):
x = sess.query(Blub).filter_by(id=blubid).one()
print x
self.assert_(repr(x) == compare)
+ testadvanced = testing.fails_on('maxdb')(testadvanced)
if __name__ == "__main__":
table1_mapper.compile()
assert table1_mapper.primary_key == [table1.c.id], table1_mapper.primary_key
- @testing.fails_on('maxdb')
def testone(self):
self.do_testlist([Table1, Table2, Table1, Table2])
+ testone = testing.fails_on('maxdb')(testone)
- @testing.fails_on('maxdb')
def testtwo(self):
self.do_testlist([Table3])
+ testtwo = testing.fails_on('maxdb')(testtwo)
- @testing.fails_on('maxdb')
def testthree(self):
self.do_testlist([Table2, Table1, Table1B, Table3, Table3, Table1B, Table1B, Table2, Table1])
+ testthree = testing.fails_on('maxdb')(testthree)
- @testing.fails_on('maxdb')
def testfour(self):
self.do_testlist([
Table2('t2', [Data('data1'), Data('data2')]),
Table3('t3', [Data('data3')]),
Table1B('t1b', [Data('data4'), Data('data5')])
])
+ testfour = testing.fails_on('maxdb')(testfour)
def do_testlist(self, classes):
sess = create_session( )
q = sess.query(User)
assert [User(id=7, addresses=[Address(id=1, email_address='jack@bean.com')])] == q.filter(users.c.id == 7).all()
- @testing.uses_deprecated('SessionContext')
def test_bindstosession(self):
"""test that lazy loaders use the mapper's contextual session if the parent instance
is not in a session, and that an error is raised if no contextual session"""
assert False
except exceptions.InvalidRequestError, err:
assert "not bound to a Session, and no contextual session" in str(err)
+ test_bindstosession = testing.uses_deprecated('SessionContext')(test_bindstosession)
def test_orderby(self):
mapper(User, users, properties = {
assert len(list(sess)) == 0
self.assertRaises(TypeError, Foo, 'one')
- @testing.uses_deprecated('SessionContext', 'SessionContextExt')
def test_constructorexceptions(self):
"""test that exceptions raised in the mapped class are not masked by sa decorations"""
ex = AssertionError('oops')
assert False
except TypeError:
assert True
+ test_constructorexceptions = testing.uses_deprecated('SessionContext', 'SessionContextExt')(test_constructorexceptions)
def test_props(self):
m = mapper(User, users, properties = {
assert_props(Hoho, ['id', 'name', 'type'])
assert_props(Lala, ['p_employee_number', 'p_id', 'p_name', 'p_type'])
- @testing.uses_deprecated('//select_by', '//join_via', '//list')
def test_recursive_select_by_deprecated(self):
"""test that no endless loop occurs when traversing for select_by"""
m = mapper(User, users, properties={
})
q = create_session().query(m)
q.select_by(email_address='foo')
+ test_recursive_select_by_deprecated = testing.uses_deprecated('//select_by', '//join_via', '//list')(test_recursive_select_by_deprecated)
def test_mappingtojoin(self):
"""test mapping to a join"""
self.assert_result(l, User, user_result[0])
- @testing.uses_deprecated('//select')
def test_customjoin_deprecated(self):
"""test that the from_obj parameter to query.select() can be used
to totally replace the FROM parameters of the generated query."""
q = create_session().query(m)
l = q.select((orderitems.c.item_name=='item 4'), from_obj=[users.join(orders).join(orderitems)])
self.assert_result(l, User, user_result[0])
+ test_customjoin_deprecated = testing.uses_deprecated('//select')(test_customjoin_deprecated)
def test_orderby(self):
"""test ordering at the mapper and query level"""
#l = create_session().query(User).select(order_by=None)
- @testing.unsupported('firebird')
def test_function(self):
"""Test mapping to a SELECT statement that has functions in it."""
print "User", u.user_id, u.user_name, u.concat, u.count
assert l[0].concat == l[0].user_id * 2 == 14
assert l[1].concat == l[1].user_id * 2 == 16
+ test_function = testing.unsupported('firebird')(test_function)
- @testing.unsupported('firebird')
def test_count(self):
"""test the count function on Query.
q = create_session().query(User)
self.assert_(q.count()==3)
self.assert_(q.count(users.c.user_id.in_([8,9]))==2)
+ test_count = testing.unsupported('firebird')(test_count)
- @testing.unsupported('firebird')
- @testing.uses_deprecated('//count_by', '//join_by', '//join_via')
def test_count_by_deprecated(self):
mapper(User, users)
q = create_session().query(User)
self.assert_(q.count_by(user_name='fred')==1)
+ test_count_by_deprecated = testing.uses_deprecated('//count_by', '//join_by', '//join_via')(test_count_by_deprecated)
+ test_count_by_deprecated = testing.unsupported('firebird')(test_count_by_deprecated)
def test_manytomany_count(self):
mapper(Item, orderitems, properties = dict(
def map_(with_explicit_property):
class User(object):
- @extendedproperty
def uc_user_name(self):
if self.user_name is None:
return None
return self.user_name.upper()
+ uc_user_name = extendedproperty(uc_user_name)
if with_explicit_property:
args = (UCComparator, User.uc_user_name)
else:
sess.rollback()
class OptionsTest(MapperSuperTest):
- @testing.fails_on('maxdb')
def test_synonymoptions(self):
sess = create_session()
mapper(User, users, properties = dict(
u = sess.query(User).options(eagerload('adlist')).filter_by(user_name='jack').one()
self.assert_result(u.adlist, Address, *(user_address_result[0]['addresses'][1]))
self.assert_sql_count(testing.db, go, 1)
+ test_synonymoptions = testing.fails_on('maxdb')(test_synonymoptions)
- @testing.uses_deprecated('//select_by')
def test_extension_options(self):
sess = create_session()
class ext1(MapperExtension):
assert l.TEST_2 == "also hello world"
assert not hasattr(l.addresses[0], 'TEST')
assert not hasattr(l.addresses[0], 'TEST2')
+ test_extension_options = testing.uses_deprecated('//select_by')(test_extension_options)
def test_eageroptions(self):
"""tests that a lazy relation can be upgraded to an eager relation via the options method"""
self.assert_result(l, User, *user_address_result)
self.assert_sql_count(testing.db, go, 0)
- @testing.fails_on('maxdb')
def test_eageroptionswithlimit(self):
sess = create_session()
mapper(User, users, properties = dict(
assert u.user_id == 8
assert len(u.addresses) == 3
assert "tbl_row_count" not in self.capture_sql(testing.db, go)
+ test_eageroptionswithlimit = testing.fails_on('maxdb')(test_eageroptionswithlimit)
- @testing.fails_on('maxdb')
def test_lazyoptionswithlimit(self):
sess = create_session()
mapper(User, users, properties = dict(
assert u.user_id == 8
assert len(u.addresses) == 3
self.assert_sql_count(testing.db, go, 1)
+ test_lazyoptionswithlimit = testing.fails_on('maxdb')(test_lazyoptionswithlimit)
def test_eagerdegrade(self):
"""tests that an eager relation automatically degrades to a lazy relation if eager columns are not available"""
m3 = mapper(A, table1, non_primary=True)
- @profile_memory
def go():
sess = create_session()
a1 = A(col2="a1")
for a in alist:
sess.delete(a)
sess.flush()
+ go = profile_memory(go)
go()
metadata.drop_all()
Column('col3', Integer, ForeignKey("mytable.col1"))
)
- @profile_memory
def go():
m1 = mapper(A, table1, properties={
"bs":relation(B)
sess.flush()
sess.close()
clear_mappers()
+ go = profile_memory(go)
metadata.create_all()
try:
Column('col3', String(30)),
)
- @profile_memory
def go():
class A(Base):
pass
# dont need to clear_mappers()
del B
del A
+ go = profile_memory(go)
metadata.create_all()
try:
Column('t2', Integer, ForeignKey('mytable2.col1')),
)
- @profile_memory
def go():
class A(Base):
pass
# dont need to clear_mappers()
del B
del A
+ go = profile_memory(go)
metadata.create_all()
try:
assert sess.get(User, 'jack') is None
assert sess.get(User, 'ed').fullname == 'jack'
- @testing.unsupported('sqlite','mysql')
def test_onetomany_passive(self):
self._test_onetomany(True)
+ test_onetomany_passive = testing.unsupported('sqlite','mysql')(test_onetomany_passive)
def test_onetomany_nonpassive(self):
self._test_onetomany(False)
u1 = sess.get(User, 'fred')
self.assertEquals(User(username='fred', fullname='jack'), u1)
- @testing.unsupported('sqlite', 'mysql')
def test_manytoone_passive(self):
self._test_manytoone(True)
+ test_manytoone_passive = testing.unsupported('sqlite', 'mysql')(test_manytoone_passive)
def test_manytoone_nonpassive(self):
self._test_manytoone(False)
sess.clear()
self.assertEquals([Address(username='ed'), Address(username='ed')], sess.query(Address).all())
- @testing.unsupported('sqlite', 'mysql')
def test_bidirectional_passive(self):
self._test_bidirectional(True)
+ test_bidirectional_passive = testing.unsupported('sqlite', 'mysql')(test_bidirectional_passive)
def test_bidirectional_nonpassive(self):
self._test_bidirectional(False)
self.assertEquals([Address(username='fred'), Address(username='fred')], sess.query(Address).all())
- @testing.unsupported('sqlite', 'mysql')
def test_manytomany_passive(self):
self._test_manytomany(True)
+ test_manytomany_passive = testing.unsupported('sqlite', 'mysql')(test_manytomany_passive)
def test_manytomany_nonpassive(self):
self._test_manytomany(False)
Column('email', String(50)),
Column('username', String(50), ForeignKey('users.username', onupdate="cascade")))
- @testing.unsupported('sqlite','mysql')
def test_onetomany_passive(self):
self._test_onetomany(True)
+ test_onetomany_passive = testing.unsupported('sqlite','mysql')(test_onetomany_passive)
def test_onetomany_nonpassive(self):
self._test_onetomany(False)
self.description = description
class O2OTest(TestBase, AssertsExecutionResults):
- @testing.uses_deprecated('SessionContext')
def setUpAll(self):
global jack, port, metadata, ctx
metadata = MetaData(testing.db)
Column('jack_id', Integer, ForeignKey("jack.id")),
)
metadata.create_all()
+ setUpAll = testing.uses_deprecated('SessionContext')(setUpAll)
def setUp(self):
pass
def tearDown(self):
def tearDownAll(self):
metadata.drop_all()
- @testing.uses_deprecated('SessionContext')
def test1(self):
mapper(Port, port, extension=ctx.mapper_extension)
mapper(Jack, jack, order_by=[jack.c.number],properties = {
ctx.current.delete(j)
ctx.current.flush()
+ test1 = testing.uses_deprecated('SessionContext')(test1)
if __name__ == "__main__":
testenv.main()
except exceptions.SAWarning, e:
assert str(e) == "Query.get() being called on a Query with existing criterion; criterion is being ignored."
- @testing.emits_warning('Query.*')
def warns():
assert s.query(User).filter(User.id==7).get(19) is None
assert s.query(User).join('addresses').filter(Address.user_id==8).get(7).id == u.id
assert s.query(User).join('addresses').filter(Address.user_id==8).load(7).id == u.id
+ warns = testing.emits_warning('Query.*')(warns)
warns()
def test_unique_param_names(self):
assert u2.name =='jack'
assert a not in u2.addresses
- @testing.exclude('mysql', '<', (4, 1))
def test_unicode(self):
"""test that Query.get properly sets up the type for the bind parameter. using unicode would normally fail
on postgres, mysql and oracle unless it is converted to an encoded string"""
LocalFoo(id=ustring, data=ustring))
finally:
metadata.drop_all()
+ test_unicode = testing.exclude('mysql', '<', (4, 1))(test_unicode)
def test_populate_existing(self):
s = create_session()
def test_basic(self):
assert [User(id=7), User(id=8), User(id=9),User(id=10)] == create_session().query(User).all()
- @testing.fails_on('maxdb')
def test_limit(self):
assert [User(id=8), User(id=9)] == create_session().query(User).limit(2).offset(1).all()
assert [User(id=8), User(id=9)] == list(create_session().query(User)[1:3])
assert User(id=8) == create_session().query(User)[1]
+ test_limit = testing.fails_on('maxdb')(test_limit)
def test_onefilter(self):
assert [User(id=8), User(id=9)] == create_session().query(User).filter(User.name.endswith('ed')).all()
assert [User(id=9)] == sess.query(User).filter(User.addresses.any(email_address='fred@fred.com')).all()
- @testing.fails_on_everything_except()
def test_broken_any_1(self):
sess = create_session()
# overcorrelates
assert [User(id=7), User(id=8)] == sess.query(User).join("addresses").filter(~User.addresses.any(Address.email_address=='fred@fred.com')).all()
+ test_broken_any_1 = testing.fails_on_everything_except()(test_broken_any_1)
def test_broken_any_2(self):
sess = create_session()
# works, filter is after the join, but reset_joinpoint is called, removing aliasing
assert [User(id=7), User(id=8)] == sess.query(User).join("addresses", aliased=True).filter(Address.email_address != None).reset_joinpoint().filter(~User.addresses.any(email_address='fred@fred.com')).all()
- @testing.fails_on_everything_except()
def test_broken_any_4(self):
sess = create_session()
# filter is after the join, gets aliased. in 0.5 any(), has() and not contains() are shielded from aliasing
assert [User(id=10)] == sess.query(User).outerjoin("addresses", aliased=True).filter(~User.addresses.any()).all()
+ test_broken_any_4 = testing.fails_on_everything_except()(test_broken_any_4)
- @testing.unsupported('maxdb') # can core
def test_has(self):
sess = create_session()
assert [Address(id=5)] == sess.query(Address).filter(Address.user.has(name='fred')).all()
dingaling = sess.query(Dingaling).get(2)
assert [User(id=9)] == sess.query(User).filter(User.addresses.any(Address.dingaling==dingaling)).all()
+ test_has = testing.unsupported('maxdb')(test_has) # can core
def test_contains_m2m(self):
sess = create_session()
orders = sess.query(Order).filter(Order.id.in_([2, 3, 4]))
assert orders.sum(Order.user_id * Order.address_id) == 79
- @testing.uses_deprecated('Call to deprecated function apply_sum')
def test_apply(self):
sess = create_session()
assert sess.query(Order).apply_sum(Order.user_id * Order.address_id).filter(Order.id.in_([2, 3, 4])).one() == 79
+ test_apply = testing.uses_deprecated('Call to deprecated function apply_sum')(test_apply)
def test_having(self):
sess = create_session()
except exceptions.AssertionError, e:
assert str(e).startswith("Dependency rule tried to blank-out primary key column 'B.id' on instance ")
- @testing.fails_on_everything_except('sqlite', 'mysql')
def test_nullPKsOK_BtoA(self):
# postgres cant handle a nullable PK column...?
tableC = Table('tablec', tableA.metadata,
sess.save(c1)
# test that no error is raised.
sess.flush()
+ test_nullPKsOK_BtoA = testing.fails_on_everything_except('sqlite', 'mysql')(test_nullPKsOK_BtoA)
def test_delete_cascade_BtoA(self):
"""test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a
selectable = select(["x", "y", "z"])
self.assertRaisesMessage(exceptions.InvalidRequestError, "Could not find any Table objects", mapper, Subset, selectable)
- @testing.emits_warning('.*creating an Alias.*')
def test_basic(self):
class Subset(Base):
pass
subset_select = class_mapper(Subset).mapped_table
self.assertEquals(sess.query(Subset).filter(subset_select.c.data==1).one(), Subset(data=1))
+ test_basic = testing.emits_warning('.*creating an Alias.*')(test_basic)
# TODO: more tests mapping to selects
# then see if expunge fails
session.expunge(u)
- @engines.close_open_connections
def test_binds_from_expression(self):
"""test that Session can extract Table objects from ClauseElements and match them to tables."""
Session = sessionmaker(binds={users:testing.db, addresses:testing.db})
sess.execute(users.insert(), params=dict(user_id=2, user_name='fred'))
assert sess.execute(users.select()).fetchall() == [(1, 'ed'), (2, 'fred')]
sess.close()
+ test_binds_from_expression = engines.close_open_connections(test_binds_from_expression)
- @engines.close_open_connections
def test_bind_from_metadata(self):
Session = sessionmaker()
sess = Session()
assert len(sess.query(User).all()) == 0
sess.close()
+ test_bind_from_metadata = engines.close_open_connections(test_bind_from_metadata)
- @testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang
- @engines.close_open_connections
def test_transaction(self):
class User(object):pass
mapper(User, users)
assert conn1.execute("select count(1) from users").scalar() == 1
assert testing.db.connect().execute("select count(1) from users").scalar() == 1
sess.close()
+ test_transaction = engines.close_open_connections(test_transaction)
+ test_transaction = testing.unsupported('sqlite', 'mssql')(test_transaction) # TEMP: test causes mssql to hang
def test_flush_noop(self):
session = create_session()
sess.add(u1)
assert u1 in sess
- @testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang
- @engines.close_open_connections
def test_autoflush(self):
class User(object):pass
mapper(User, users)
assert conn1.execute("select count(1) from users").scalar() == 1
assert testing.db.connect().execute("select count(1) from users").scalar() == 1
sess.close()
+ test_autoflush = engines.close_open_connections(test_autoflush)
+ test_autoflush = testing.unsupported('sqlite', 'mssql')(test_autoflush) # TEMP: test causes mssql to hang
def test_autoflush_expressions(self):
class User(fixtures.Base):
sess.save(u)
self.assertEquals(sess.query(Address).filter(Address.user==u).one(), Address(email_address='foo'))
- @testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang
- @engines.close_open_connections
def test_autoflush_unbound(self):
class User(object):pass
mapper(User, users)
except:
sess.rollback()
raise
+ test_autoflush_unbound = engines.close_open_connections(test_autoflush_unbound)
+ test_autoflush_unbound = testing.unsupported('sqlite', 'mssql')(test_autoflush_unbound) # TEMP: test causes mssql to hang
- @engines.close_open_connections
def test_autoflush_2(self):
class User(object):pass
mapper(User, users)
assert conn1.execute("select count(1) from users").scalar() == 1
assert testing.db.connect().execute("select count(1) from users").scalar() == 1
sess.commit()
+ test_autoflush_2 = engines.close_open_connections(test_autoflush_2)
# TODO: not doing rollback of attributes right now.
def dont_test_autoflush_rollback(self):
sess.rollback()
assert not sess.is_active
- @engines.close_open_connections
def test_external_joined_transaction(self):
class User(object):pass
mapper(User, users)
trans.rollback() # rolls back
assert len(sess.query(User).all()) == 0
sess.close()
+ test_external_joined_transaction = engines.close_open_connections(test_external_joined_transaction)
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @engines.close_open_connections
def test_external_nested_transaction(self):
class User(object):pass
mapper(User, users)
except:
conn.close()
raise
+ test_external_nested_transaction = engines.close_open_connections(test_external_nested_transaction)
+ test_external_nested_transaction = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(test_external_nested_transaction)
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @engines.close_open_connections
def test_heavy_nesting(self):
session = create_session(bind=testing.db)
session.commit()
assert session.connection().execute("select count(1) from users").scalar() == 2
+ test_heavy_nesting = engines.close_open_connections(test_heavy_nesting)
+ test_heavy_nesting = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(test_heavy_nesting)
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
def test_twophase(self):
# TODO: mock up a failure condition here
# to ensure a rollback succeeds
engine2.dispose()
assert users.count().scalar() == 1
assert addresses.count().scalar() == 1
+ test_twophase = testing.exclude('mysql', '<', (5, 0, 3))(test_twophase)
+ test_twophase = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(test_twophase)
def test_joined_transaction(self):
class User(object):pass
assert len(sess.query(User).all()) == 0
sess.close()
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
def test_nested_transaction(self):
class User(object):pass
mapper(User, users)
sess.commit()
assert len(sess.query(User).all()) == 1
sess.close()
+ test_nested_transaction = testing.exclude('mysql', '<', (5, 0, 3))(test_nested_transaction)
+ test_nested_transaction = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(test_nested_transaction)
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
def test_nested_autotrans(self):
class User(object):pass
mapper(User, users)
sess.commit()
assert len(sess.query(User).all()) == 1
sess.close()
+ test_nested_autotrans = testing.exclude('mysql', '<', (5, 0, 3))(test_nested_autotrans)
+ test_nested_autotrans = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(test_nested_autotrans)
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
def test_nested_transaction_connection_add(self):
class User(object): pass
mapper(User, users)
self.assertEquals(util.Set(sess.query(User).all()), util.Set([u2]))
sess.close()
+ test_nested_transaction_connection_add = testing.exclude('mysql', '<', (5, 0, 3))(test_nested_transaction_connection_add)
+ test_nested_transaction_connection_add = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(test_nested_transaction_connection_add)
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
def test_mixed_transaction_control(self):
class User(object): pass
mapper(User, users)
assert sess.transaction is t1
sess.close()
+ test_mixed_transaction_control = testing.exclude('mysql', '<', (5, 0, 3))(test_mixed_transaction_control)
+ test_mixed_transaction_control = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(test_mixed_transaction_control)
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
def test_mixed_transaction_close(self):
class User(object): pass
mapper(User, users)
sess.close()
self.assertEquals(len(sess.query(User).all()), 1)
+ test_mixed_transaction_close = testing.exclude('mysql', '<', (5, 0, 3))(test_mixed_transaction_close)
+ test_mixed_transaction_close = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(test_mixed_transaction_close)
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
def test_error_on_using_inactive_session(self):
class User(object): pass
mapper(User, users)
except exceptions.InvalidRequestError, e:
self.assertEquals(str(e), "The transaction is inactive due to a rollback in a subtransaction and should be closed")
sess.close()
+ test_error_on_using_inactive_session = testing.exclude('mysql', '<', (5, 0, 3))(test_error_on_using_inactive_session)
+ test_error_on_using_inactive_session = testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', 'oracle', 'maxdb')(test_error_on_using_inactive_session)
- @engines.close_open_connections
def test_bound_connection(self):
class User(object):pass
mapper(User, users)
transaction.rollback()
assert len(sess.query(User).all()) == 0
sess.close()
+ test_bound_connection = engines.close_open_connections(test_bound_connection)
def test_bound_connection_transactional(self):
class User(object):pass
assert c.scalar("select count(1) from users") == 1
- @engines.close_open_connections
def test_save_update_delete(self):
s = create_session()
s.flush()
assert user not in s
assert s.query(User).count() == 0
+ test_save_update_delete = engines.close_open_connections(test_save_update_delete)
def test_is_modified(self):
s = create_session()
obj2 = class_()
assert context.current == object_session(obj2)
- @testing.uses_deprecated('SessionContext')
def test_mapper_extension(self):
context = SessionContext(Session)
class User(object): pass
User.mapper = mapper(User, users, extension=context.mapper_extension)
self.do_test(User, context)
+ test_mapper_extension = testing.uses_deprecated('SessionContext')(test_mapper_extension)
if __name__ == "__main__":
Column('value', String(40), nullable=False)
)
- @engines.close_open_connections
def test_basic(self):
s = Session(scope=None)
class Foo(object):pass
success = True
if testing.db.dialect.supports_sane_multi_rowcount:
assert success
+ test_basic = engines.close_open_connections(test_basic)
- @engines.close_open_connections
def test_versioncheck(self):
"""test that query.with_lockmode performs a 'version check' on an already loaded instance"""
s1 = Session(scope=None)
# assert brand new load is OK too
s1.close()
s1.query(Foo).with_lockmode('read').get(f1s1.id)
+ test_versioncheck = engines.close_open_connections(test_versioncheck)
- @engines.close_open_connections
def test_noversioncheck(self):
"""test that query.with_lockmode works OK when the mapper has no version id col"""
s1 = Session()
f1s2 = s2.query(Foo).with_lockmode('read').get(f1s1.id)
assert f1s2.id == f1s1.id
assert f1s2.value == f1s1.value
+ test_noversioncheck = engines.close_open_connections(test_noversioncheck)
class UnicodeTest(ORMTest):
def define_tables(self, metadata):
Column('data', String(30), )
)
- # not supported on sqlite since sqlite's auto-pk generation only works with
- # single column primary keys
- @testing.fails_on('sqlite')
def test_primarykey(self):
class Entry(object):
pass
Session.close()
e2 = Query(Entry).get((e.multi_id, 2))
self.assert_(e is not e2 and e._instance_key == e2._instance_key)
+ # not supported on sqlite since sqlite's auto-pk generation only works with
+ # single column primary keys
+ test_primarykey = testing.fails_on('sqlite')(test_primarykey)
# this one works with sqlite since we are manually setting up pk values
def test_manualpk(self):
assert u.name == 'test2'
assert u.counter == 2
- @testing.unsupported('mssql')
def test_insert(self):
class User(object):
pass
sess.save(u)
sess.flush()
assert (u.counter == 5) is True
+ test_insert = testing.unsupported('mssql')(test_insert)
class PassiveDeletesTest(ORMTest):
test_needs_fk=True,
)
- @testing.unsupported('sqlite')
def test_basic(self):
class MyClass(object):
pass
sess.commit()
assert mytable.count().scalar() == 0
assert myothertable.count().scalar() == 0
+ test_basic = testing.unsupported('sqlite')(test_basic)
class ExtraPassiveDeletesTest(ORMTest):
def define_tables(self, metadata):
except exceptions.ArgumentError, e:
assert str(e) == "Can't set passive_deletes='all' in conjunction with 'delete' or 'delete-orphan' cascade"
- @testing.unsupported('sqlite')
def test_extra_passive(self):
class MyClass(object):
pass
mc = sess.query(MyClass).get(mc.id)
sess.delete(mc)
self.assertRaises(exceptions.DBAPIError, sess.commit)
+ test_extra_passive = testing.unsupported('sqlite')(test_extra_passive)
- @testing.unsupported('sqlite')
def test_extra_passive_2(self):
class MyClass(object):
pass
sess.delete(mc)
mc.children[0].data = 'some new data'
self.assertRaises(exceptions.DBAPIError, sess.commit)
+ test_extra_passive_2 = testing.unsupported('sqlite')(test_extra_passive_2)
class DefaultTest(ORMTest):
# why no support on oracle ? because oracle doesn't save
# "blank" strings; it saves a single space character.
- @testing.unsupported('oracle')
def test_dont_update_blanks(self):
mapper(User, users)
u = User()
def go():
Session.commit()
self.assert_sql_count(testing.db, go, 0)
+ test_dont_update_blanks = testing.unsupported('oracle')(test_dont_update_blanks)
def test_multitable(self):
"""tests a save of an object where each instance spans two tables. also tests
Column('c1', Integer, primary_key=True),
Column('c2', String(30)))
- @profiling.function_call_count(74, {'2.3': 44, '2.4': 42})
def test_insert(self):
t1.insert().compile()
+ test_insert = profiling.function_call_count(74, {'2.3': 44, '2.4': 42})(test_insert)
- @profiling.function_call_count(75, {'2.3': 47, '2.4': 42})
def test_update(self):
t1.update().compile()
+ test_update = profiling.function_call_count(75, {'2.3': 47, '2.4': 42})(test_update)
- @profiling.function_call_count(228, versions={'2.3': 153, '2.4':116})
def test_select(self):
s = select([t1], t1.c.c2==t2.c.c1)
s.compile()
+ test_select = profiling.function_call_count(228, versions={'2.3': 153, '2.4':116})(test_select)
if __name__ == '__main__':
# and though the solution there is simple, it still doesn't solve the
# issue of "dead" weakrefs sitting in the dict taking up space
- @profiling.function_call_count(63, {'2.3': 42, '2.4': 43})
def test_first_connect(self):
conn = pool.connect()
+ test_first_connect = profiling.function_call_count(63, {'2.3': 42, '2.4': 43})(test_first_connect)
def test_second_connect(self):
conn = pool.connect()
conn.close()
- @profiling.function_call_count(39, {'2.3': 26, '2.4': 26})
def go():
conn2 = pool.connect()
return conn2
+ go = profiling.function_call_count(39, {'2.3': 26, '2.4': 26})(go)
c2 = go()
def test_second_samethread_connect(self):
conn = pool.connect()
- @profiling.function_call_count(7, {'2.3': 4, '2.4': 4})
def go():
return pool.connect()
+ go = profiling.function_call_count(7, {'2.3': 4, '2.4': 4})(go)
c2 = go()
engine = create_engine('postgres:///', creator=player)
metadata = MetaData(engine)
- @profiling.function_call_count(3230, {'2.4': 1796})
def test_profile_1_create_tables(self):
self.test_baseline_1_create_tables()
+ test_profile_1_create_tables = profiling.function_call_count(3230, {'2.4': 1796})(test_profile_1_create_tables)
- @profiling.function_call_count(6064, {'2.4': 3635})
def test_profile_1a_populate(self):
self.test_baseline_1a_populate()
+ test_profile_1a_populate = profiling.function_call_count(6064, {'2.4': 3635})(test_profile_1a_populate)
- @profiling.function_call_count(339, {'2.4': 195})
def test_profile_2_insert(self):
self.test_baseline_2_insert()
+ test_profile_2_insert = profiling.function_call_count(339, {'2.4': 195})(test_profile_2_insert)
- @profiling.function_call_count(4923, {'2.4': 2557})
def test_profile_3_properties(self):
self.test_baseline_3_properties()
+ test_profile_3_properties = profiling.function_call_count(4923, {'2.4': 2557})(test_profile_3_properties)
- @profiling.function_call_count(18119, {'2.4': 10549})
def test_profile_4_expressions(self):
self.test_baseline_4_expressions()
+ test_profile_4_expressions = profiling.function_call_count(18119, {'2.4': 10549})(test_profile_4_expressions)
- @profiling.function_call_count(1617, {'2.4': 1032})
def test_profile_5_aggregates(self):
self.test_baseline_5_aggregates()
+ test_profile_5_aggregates = profiling.function_call_count(1617, {'2.4': 1032})(test_profile_5_aggregates)
- @profiling.function_call_count(1988, {'2.4': 1048})
def test_profile_6_editing(self):
self.test_baseline_6_editing()
+ test_profile_6_editing = profiling.function_call_count(1988, {'2.4': 1048})(test_profile_6_editing)
- @profiling.function_call_count(3614, {'2.4': 2198})
def test_profile_7_multiview(self):
self.test_baseline_7_multiview()
+ test_profile_7_multiview = profiling.function_call_count(3614, {'2.4': 2198})(test_profile_7_multiview)
def test_profile_8_drop(self):
self.test_baseline_8_drop()
def tearDownAll(self):
info_table.drop()
- @testing.fails_on('maxdb')
def testcase(self):
inner = select([case([
[info_table.c.pk < 3,
(6, 5, 'pk_5_data'),
(0, 6, 'pk_6_data')
]
+ testcase = testing.fails_on('maxdb')(testcase)
def test_literal_interpretation(self):
t = table('test', column('col1'))
self.assert_compile(case([(t.c.col1==7, "y")], else_="z"), "CASE WHEN (test.col1 = :col1_1) THEN :param_1 ELSE :param_2 END")
- @testing.fails_on('maxdb')
def testcase_with_dict(self):
query = select([case({
info_table.c.pk < 3: 'lessthan3',
('two', 2),
('other', 3),
]
+ testcase_with_dict = testing.fails_on('maxdb')(testcase_with_dict)
if __name__ == "__main__":
testenv.main()
)
metadata.create_all()
- @testing.unsupported('mysql')
def test_check_constraint(self):
foo = Table('foo', metadata,
Column('id', Integer, primary_key=True),
assert False
except exceptions.SQLError:
assert True
+ test_check_constraint = testing.unsupported('mysql')(test_check_constraint)
def test_unique_constraint(self):
foo = Table('foo', metadata,
l = l.fetchone()
self.assert_(l['col3'] == 55)
- @testing.fails_on_everything_except('postgres')
def testpassiveoverride(self):
"""primarily for postgres, tests that when we get a primary key column back
from reflecting a table which has a default value on it, we pre-execute
self.assert_(l == [(1, 'user', 'lala')])
finally:
testing.db.execute("drop table speedy_users", None)
+ testpassiveoverride = testing.fails_on_everything_except('postgres')(testpassiveoverride)
class PKDefaultTest(TestBase):
def setUpAll(self):
def tearDownAll(self):
metadata.drop_all()
- @testing.unsupported('mssql')
def test_basic(self):
t2.insert().execute(nextid=1)
r = t1.insert().execute(data='hi')
t2.insert().execute(nextid=2)
r = t1.insert().execute(data='there')
assert r.last_inserted_ids() == [2]
+ test_basic = testing.unsupported('mssql')(test_basic)
class AutoIncrementTest(TestBase):
def tearDown(self):
aimeta.drop_all()
- # should fail everywhere... was: @supported('postgres', 'mysql', 'maxdb')
- @testing.fails_on('sqlite')
def testnonautoincrement(self):
# sqlite INT primary keys can be non-unique! (only for ints)
meta = MetaData(testing.db)
nonai_table.insert().execute(id=1, data='row 1')
finally:
nonai_table.drop()
+ # should fail everywhere... was: @supported('postgres', 'mysql', 'maxdb')
+ testnonautoincrement = testing.fails_on('sqlite')(testnonautoincrement)
# TODO: add coverage for increment on a secondary column in a key
def _test_autoincrement(self, bind):
cartitems.select().execute().fetchall()
- @testing.fails_on('maxdb')
- # maxdb db-api seems to double-execute NEXTVAL internally somewhere,
- # throwing off the numbers for these tests...
def test_implicit_sequence_exec(self):
s = Sequence("my_sequence", metadata=MetaData(testing.db))
s.create()
self.assert_(x == 1)
finally:
s.drop()
+ # maxdb db-api seems to double-execute NEXTVAL internally somewhere,
+ # throwing off the numbers for these tests...
+ test_implicit_sequence_exec = testing.fails_on('maxdb')(test_implicit_sequence_exec)
- @testing.fails_on('maxdb')
def teststandalone_explicit(self):
s = Sequence("my_sequence")
s.create(bind=testing.db)
self.assert_(x == 1)
finally:
s.drop(testing.db)
+ teststandalone_explicit = testing.fails_on('maxdb')(teststandalone_explicit)
def test_checkfirst(self):
s = Sequence("my_sequence")
s.drop(testing.db, checkfirst=False)
s.drop(testing.db, checkfirst=True)
- @testing.fails_on('maxdb')
def teststandalone2(self):
x = cartitems.c.cart_id.sequence.execute()
self.assert_(1 <= x <= 4)
+ teststandalone2 = testing.fails_on('maxdb')(teststandalone2)
def tearDownAll(self):
metadata.drop_all()
finally:
meta.drop_all()
- @testing.fails_on_everything_except('postgres')
def test_as_from(self):
# TODO: shouldnt this work on oracle too ?
x = testing.db.func.current_date().execute().scalar()
r = s.alias('datequery').select().scalar()
assert x == y == z == w == q == r
+ test_as_from = testing.fails_on_everything_except('postgres')(test_as_from)
def exec_sorted(statement, *args, **kw):
"""Executes a statement and returns a sorted list plain tuple rows."""
"(SELECT table1.col1 AS col1, table1.col2 AS col2, table1.col3 AS col3 FROM table1 WHERE table1.col1 = :col1_2) AS anon_2 "\
"WHERE anon_1.col2 = anon_2.col2")
- @testing.emits_warning('.*replaced by another column with the same key')
def test_alias(self):
subq = t2.select().alias('subq')
s = select([t1.c.col1, subq.c.col1], from_obj=[t1, subq, t1.join(subq, t1.c.col1==subq.c.col2)])
s4 = sql_util.ClauseAdapter(table('foo')).traverse(s3, clone=True)
assert orig == str(s) == str(s3) == str(s4)
+ test_alias = testing.emits_warning('.*replaced by another column with the same key')(test_alias)
def test_correlated_select(self):
s = select(['*'], t1.c.col1==t2.c.col1, from_obj=[t1, t2]).correlate(t2)
r = users.select(limit=3, order_by=[users.c.user_id]).execute().fetchall()
self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
- @testing.unsupported('mssql')
- @testing.fails_on('maxdb')
def test_select_limit_offset(self):
users.insert().execute(user_id=1, user_name='john')
users.insert().execute(user_id=2, user_name='jack')
self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])
r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall()
self.assert_(r==[(6, 'ralph'), (7, 'fido')])
+ test_select_limit_offset = testing.fails_on('maxdb')(test_select_limit_offset)
+ test_select_limit_offset = testing.unsupported('mssql')(test_select_limit_offset)
- @testing.exclude('mysql', '<', (5, 0, 37))
def test_scalar_select(self):
"""test that scalar subqueries with labels get their type propigated to the result set."""
# mysql and/or mysqldb has a bug here, type isn't propagated for scalar
assert isinstance(s2.execute().fetchone()['somelabel'], datetime.datetime)
finally:
datetable.drop()
+ test_scalar_select = testing.exclude('mysql', '<', (5, 0, 37))(test_scalar_select)
def test_order_by(self):
"""Exercises ORDER BY clause generation.
self.assertEqual([x.lower() for x in r.keys()], ['user_name', 'user_id'])
self.assertEqual(r.values(), ['foo', 1])
- @testing.unsupported('oracle', 'firebird', 'maxdb')
def test_column_accessor_shadow(self):
meta = MetaData(testing.db)
shadowed = Table('test_shadowed', meta,
r.close()
finally:
shadowed.drop(checkfirst=True)
+ test_column_accessor_shadow = testing.unsupported('oracle', 'firebird', 'maxdb')(test_column_accessor_shadow)
- @testing.fails_on('maxdb')
def test_in_filtering(self):
"""test the behavior of the in_() function."""
s = users.select(users.c.user_name.in_([]) == None)
r = s.execute().fetchall()
assert len(r) == 1
+ test_in_filtering = testing.fails_on('maxdb')(test_in_filtering)
class CompoundTest(TestBase):
('ccc', 'aaa')]
self.assertEquals(u.execute().fetchall(), wanted)
- @testing.fails_on('maxdb')
def test_union_ordered_alias(self):
(s1, s2) = (
select([t1.c.col3.label('col3'), t1.c.col4.label('col4')],
wanted = [('aaa', 'aaa'), ('bbb', 'bbb'), ('bbb', 'ccc'),
('ccc', 'aaa')]
self.assertEquals(u.alias('bar').select().execute().fetchall(), wanted)
+ test_union_ordered_alias = testing.fails_on('maxdb')(test_union_ordered_alias)
- @testing.unsupported('sqlite', 'mysql', 'oracle')
def test_union_all(self):
e = union_all(
select([t1.c.col3]),
found2 = self._fetchall_sorted(e.alias('foo').select().execute())
self.assertEquals(found2, wanted)
+ test_union_all = testing.unsupported('sqlite', 'mysql', 'oracle')(test_union_all)
- @testing.unsupported('firebird', 'mysql', 'sybase')
def test_intersect(self):
i = intersect(
select([t2.c.col3, t2.c.col4]),
found2 = self._fetchall_sorted(i.alias('bar').select().execute())
self.assertEquals(found2, wanted)
+ test_intersect = testing.unsupported('firebird', 'mysql', 'sybase')(test_intersect)
- @testing.unsupported('firebird', 'mysql', 'oracle', 'sybase')
def test_except_style1(self):
e = except_(union(
select([t1.c.col3, t1.c.col4]),
found = self._fetchall_sorted(e.alias('bar').select().execute())
self.assertEquals(found, wanted)
+ test_except_style1 = testing.unsupported('firebird', 'mysql', 'oracle', 'sybase')(test_except_style1)
- @testing.unsupported('firebird', 'mysql', 'oracle', 'sybase')
def test_except_style2(self):
e = except_(union(
select([t1.c.col3, t1.c.col4]),
found2 = self._fetchall_sorted(e.alias('bar').select().execute())
self.assertEquals(found2, wanted)
+ test_except_style2 = testing.unsupported('firebird', 'mysql', 'oracle', 'sybase')(test_except_style2)
- @testing.unsupported('firebird', 'mysql', 'oracle', 'sqlite', 'sybase')
def test_except_style3(self):
# aaa, bbb, ccc - (aaa, bbb, ccc - (ccc)) = ccc
e = except_(
self.assertEquals(e.execute().fetchall(), [('ccc',)])
self.assertEquals(e.alias('foo').select().execute().fetchall(),
[('ccc',)])
+ test_except_style3 = testing.unsupported('firebird', 'mysql', 'oracle', 'sqlite', 'sybase')(test_except_style3)
- @testing.unsupported('firebird', 'mysql')
def test_composite(self):
u = intersect(
select([t2.c.col3, t2.c.col4]),
found = self._fetchall_sorted(u.execute())
self.assertEquals(found, wanted)
+ test_composite = testing.unsupported('firebird', 'mysql')(test_composite)
- @testing.unsupported('firebird', 'mysql')
def test_composite_alias(self):
ua = intersect(
select([t2.c.col3, t2.c.col4]),
wanted = [('aaa', 'bbb'), ('bbb', 'ccc'), ('ccc', 'aaa')]
found = self._fetchall_sorted(ua.select().execute())
self.assertEquals(found, wanted)
+ test_composite_alias = testing.unsupported('firebird', 'mysql')(test_composite_alias)
class JoinTest(TestBase):
def tearDownAll(self):
metadata.drop_all()
- @testing.fails_on('maxdb')
def test_modulo(self):
self.assertEquals(
select([flds.c.intcol % 3],
order_by=flds.c.idcol).execute().fetchall(),
[(2,),(1,)]
)
+ test_modulo = testing.fails_on('maxdb')(test_modulo)
print res2
assert(res2==[(1,2,3),(2,2,3),(4,3,2)])
- @testing.unsupported('oracle')
def testlabels(self):
"""test the quoting of labels.
where the "UPPERCASE" column of "LaLa" doesnt exist.
"""
x = table1.select(distinct=True).alias("LaLa").select().scalar()
+ testlabels = testing.unsupported('oracle')(testlabels)
def testlabels2(self):
metadata = MetaData()
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable ORDER BY (SELECT myothertable.otherid FROM myothertable WHERE mytable.myid = myothertable.otherid) DESC"
)
- @testing.uses_deprecated('scalar option')
def test_scalar_select(self):
try:
s = select([table1.c.myid, table1.c.name]).as_scalar()
j1 = table1.join(table2, table1.c.myid==table2.c.otherid)
s2 = select([table1, s1], from_obj=j1)
self.assert_compile(s2, "SELECT mytable.myid, mytable.name, mytable.description, (SELECT t2alias.otherid FROM myothertable AS t2alias WHERE mytable.myid = t2alias.otherid) AS anon_1 FROM mytable JOIN myothertable ON mytable.myid = myothertable.otherid")
+ test_scalar_select = testing.uses_deprecated('scalar option')(test_scalar_select)
def test_label_comparison(self):
x = func.lala(table1.c.myid).label('foo')
self.assert_compile(s, "SELECT foo, bar UNION SELECT foo, bar UNION (SELECT foo, bar UNION SELECT foo, bar)")
- @testing.uses_deprecated('//get_params')
def test_binds(self):
for (
stmt,
s = select([table1], or_(table1.c.myid==7, table1.c.myid==8, table1.c.myid==bindparam('myid_1')))
self.assertRaisesMessage(exceptions.CompileError, "conflicts with unique bind parameter of the same name", str, s)
+ test_binds = testing.uses_deprecated('//get_params')(test_binds)
self.assert_compile(select([table1], table1.c.myid.in_([])),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)")
- @testing.uses_deprecated('passing in_')
def test_in_deprecated_api(self):
self.assert_compile(select([table1], table1.c.myid.in_('abc')),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE mytable.myid IN (:myid_1)")
self.assert_compile(select([table1], table1.c.myid.in_()),
"SELECT mytable.myid, mytable.name, mytable.description FROM mytable WHERE (CASE WHEN (mytable.myid IS NULL) THEN NULL ELSE 0 END = 1)")
+ test_in_deprecated_api = testing.uses_deprecated('passing in_')(test_in_deprecated_api)
def test_cast(self):
tbl = table('casttest',
self.assert_compile(t.update(inline=True, values={'col3':'foo'}), "UPDATE test SET col1=foo(:foo_1), col2=(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM foo), col3=:col3")
class SchemaTest(TestBase, AssertsCompiledSQL):
- @testing.fails_on('mssql')
def test_select(self):
# these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables
self.assert_compile(table4.select(), "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id, remote_owner.remotetable.value FROM remote_owner.remotetable")
self.assert_compile(s, "SELECT remote_owner.remotetable.rem_id AS remote_owner_remotetable_rem_id, remote_owner.remotetable.datatype_id AS remote_owner_remotetable_datatype_id, remote_owner.remotetable.value "\
"AS remote_owner_remotetable_value FROM remote_owner.remotetable WHERE "\
"remote_owner.remotetable.datatype_id = :datatype_id_1 AND remote_owner.remotetable.value = :value_1")
+ test_select = testing.fails_on('mssql')(test_select)
def test_alias(self):
a = alias(table4, 'remtable')
assert j4.corresponding_column(j2.c.aid) is j4.c.aid
assert j4.corresponding_column(a.c.id) is j4.c.id
- @testing.emits_warning('.*replaced by another column with the same key')
def test_oid(self):
# the oid column of a selectable currently proxies all
# oid columns found within.
assert u.corresponding_column(table2.oid_column) is u.oid_column
assert u.corresponding_column(s.oid_column) is u.oid_column
assert u.corresponding_column(s2.oid_column) is u.oid_column
+ test_oid = testing.emits_warning('.*replaced by another column with the same key')(test_oid)
def test_two_metadata_join_raises(self):
m = MetaData()
except exceptions.InvalidRequestError, e:
assert str(e) == "Unicode type received non-unicode bind param value 'im not unicode'"
- @testing.emits_warning('.*non-unicode bind')
def warns():
# test that data still goes in if warning is emitted....
unicode_table.insert().execute(unicode_varchar='not unicode')
assert (select([unicode_table.c.unicode_varchar]).execute().fetchall() == [('not unicode', )])
+ warns = testing.emits_warning('.*non-unicode bind')(warns)
warns()
finally:
unicode_engine.dispose()
- @testing.fails_on('oracle')
def testblanks(self):
unicode_table.insert().execute(unicode_varchar=u'')
assert select([unicode_table.c.unicode_varchar]).scalar() == u''
+ testblanks = testing.fails_on('oracle')(testblanks)
def testengineparam(self):
"""tests engine-wide unicode conversion"""
testing.db.engine.dialect.convert_unicode = prev_unicode
testing.db.engine.dialect.convert_unicode = prev_assert
- @testing.unsupported('oracle')
def testlength(self):
"""checks the database correctly understands the length of a unicode string"""
teststr = u'aaa\x1234'
self.assert_(testing.db.func.length(teststr).scalar() == len(teststr))
+ testlength = testing.unsupported('oracle')(testlength)
class BinaryTest(TestBase, AssertsExecutionResults):
def setUpAll(self):
def tearDown(self):
numeric_table.delete().execute()
- @testing.fails_if(_missing_decimal)
def test_decimal(self):
from decimal import Decimal
numeric_table.insert().execute(
(1, 3.5, 5.6, Decimal("12.4"), Decimal("15.75")),
(2, 3.5, 5.6, Decimal("12.4"), Decimal("15.75")),
]
+ test_decimal = testing.fails_if(_missing_decimal)(test_decimal)
- @testing.emits_warning('True Decimal types not available')
def test_decimal_fallback(self):
from sqlalchemy.util import Decimal # could be Decimal or float
for row in numeric_table.select().execute().fetchall():
assert isinstance(row['ncasdec'], util.decimal_type)
assert isinstance(row['fcasdec'], util.decimal_type)
+ test_decimal_fallback = testing.emits_warning('True Decimal types not available')(test_decimal_fallback)
class IntervalTest(TestBase, AssertsExecutionResults):
from sqlalchemy.sql import column
class UnicodeSchemaTest(TestBase):
- @testing.unsupported('maxdb', 'oracle', 'sybase')
def setUpAll(self):
global unicode_bind, metadata, t1, t2, t3
test_needs_fk=True,
)
metadata.create_all()
+ setUpAll = testing.unsupported('maxdb', 'oracle', 'sybase')(setUpAll)
- @testing.unsupported('maxdb', 'oracle', 'sybase')
def tearDown(self):
if metadata.tables:
t3.delete().execute()
t2.delete().execute()
t1.delete().execute()
+ tearDown = testing.unsupported('maxdb', 'oracle', 'sybase')(tearDown)
- @testing.unsupported('maxdb', 'oracle', 'sybase')
def tearDownAll(self):
global unicode_bind
metadata.drop_all()
del unicode_bind
+ tearDownAll = testing.unsupported('maxdb', 'oracle', 'sybase')(tearDownAll)
- @testing.unsupported('maxdb', 'oracle', 'sybase')
def test_insert(self):
t1.insert().execute({u'méil':1, u'\u6e2c\u8a66':5})
t2.insert().execute({'a':1, 'b':1})
assert t1.select().execute().fetchall() == [(1, 5)]
assert t2.select().execute().fetchall() == [(1, 1)]
assert t3.select().execute().fetchall() == [(1, 5, 1, 1)]
+ test_insert = testing.unsupported('maxdb', 'oracle', 'sybase')(test_insert)
- @testing.unsupported('maxdb', 'oracle', 'sybase')
def test_reflect(self):
t1.insert().execute({u'méil':2, u'\u6e2c\u8a66':7})
t2.insert().execute({'a':2, 'b':2})
[(2, 7, 2, 2), (1, 5, 1, 1)])
meta.drop_all()
metadata.create_all()
+ test_reflect = testing.unsupported('maxdb', 'oracle', 'sybase')(test_reflect)
class EscapesDefaultsTest(testing.TestBase):
def test_default_exec(self):
FixtureTest.metadata = metadata
class Fixtures(object):
- @property
def user_address_result(self):
return [
User(id=7, addresses=[
]),
User(id=10, addresses=[])
]
+ user_address_result = property(user_address_result)
- @property
def user_all_result(self):
return [
User(id=7, addresses=[
]),
User(id=10, addresses=[])
]
+ user_all_result = property(user_all_result)
- @property
def user_order_result(self):
return [
User(id=7, orders=[
]),
User(id=10)
]
+ user_order_result = property(user_order_result)
- @property
def item_keyword_result(self):
return [
Item(id=1, keywords=[Keyword(name='red'), Keyword(name='big'), Keyword(name='round')]),
Item(id=4, keywords=[]),
Item(id=5, keywords=[]),
]
+ item_keyword_result = property(item_keyword_result)
fixtures = Fixtures()