the tree can now run the full suite of tests out of the box.
- Migrated most @supported to @fails_on, @fails_on_everything_but, or (last
resort) @unsupported. @fails_on revealed a slew of bogus test skippage,
which was corrected.
- Added @fails_on_everything_but. Yes, the first usage *was*
"fails_on_everything_but('postgres')". How did you guess!
- Migrated @supported in dialect/* to the new test-class attribute __only_on__.
- Test classes can also have __unsupported_on__ and __excluded_on__.
from sqlalchemy.sql import table, column
from testlib import *
-class BasicTest(AssertMixin):
- # A simple import of the database/ module should work on all systems.
- def test_import(self):
- # we got this far, right?
- return True
-
class DomainReflectionTest(AssertMixin):
"Test Firebird domains"
- @testing.supported('firebird')
+ __only_on__ = 'firebird'
+
def setUpAll(self):
con = testbase.db.connect()
try:
t time,
dt timestamp)''')
- @testing.supported('firebird')
def tearDownAll(self):
con = testbase.db.connect()
con.execute('DROP TABLE testtable')
con.execute('DROP DOMAIN rem_domain')
con.execute('DROP DOMAIN img_domain')
- @testing.supported('firebird')
def test_table_is_reflected(self):
metadata = MetaData(testbase.db)
table = Table('testtable', metadata, autoload=True)
self.assert_compile(func.foo(1, 2), "foo(:foo_1, :foo_2)")
self.assert_compile(func.current_time(), "CURRENT_TIME")
self.assert_compile(func.foo(), "foo")
-
+
m = MetaData()
t = Table('sometable', m, Column('col1', Integer), Column('col2', Integer))
self.assert_compile(select([func.max(t.c.col1)]), "SELECT max(sometable.col1) FROM sometable")
-
+
if __name__ == '__main__':
testbase.main()
class ReflectionTest(AssertMixin):
"""Extra reflection tests."""
+ __only_on__ = 'maxdb'
+
def _test_decimal(self, tabledef):
"""Checks a variety of FIXED usages.
except exceptions.DatabaseError:
pass
- @testing.supported('maxdb')
def test_decimal_fixed_serial(self):
tabledef = """
CREATE TABLE dectest (
"""
return self._test_decimal(tabledef)
- @testing.supported('maxdb')
def test_decimal_integer_serial(self):
tabledef = """
CREATE TABLE dectest (
"""
return self._test_decimal(tabledef)
- @testing.supported('maxdb')
def test_decimal_implicit_serial(self):
tabledef = """
CREATE TABLE dectest (
"""
return self._test_decimal(tabledef)
- @testing.supported('maxdb')
def test_decimal_smallint_serial(self):
tabledef = """
CREATE TABLE dectest (
"""
return self._test_decimal(tabledef)
- @testing.supported('maxdb')
def test_decimal_sa_types_1(self):
tabledef = Table('dectest', MetaData(),
Column('id', Integer, primary_key=True),
Column('i1', Integer))
return self._test_decimal(tabledef)
- @testing.supported('maxdb')
def test_decimal_sa_types_2(self):
tabledef = Table('dectest', MetaData(),
Column('id', Integer, primary_key=True),
Column('i1', Integer))
return self._test_decimal(tabledef)
- @testing.supported('maxdb')
def test_decimal_sa_types_3(self):
tabledef = Table('dectest', MetaData(),
Column('id', Integer, primary_key=True),
Column('i1', Integer))
return self._test_decimal(tabledef)
- @testing.supported('maxdb')
def test_assorted_type_aliases(self):
"""Ensures that aliased types are reflected properly."""
If any of these fail, that's good- the bug is fixed!
"""
-
- @testing.supported('maxdb')
+
+ __only_on__ = 'maxdb'
+
def test_dbapi_breaks_sequences(self):
con = testbase.db.connect().connection
finally:
cr.execute('DROP SEQUENCE busto')
- @testing.supported('maxdb')
def test_dbapi_breaks_mod_binds(self):
con = testbase.db.connect().connection
# OK
cr.execute('SELECT MOD(?, 2) FROM DUAL', [3])
- @testing.supported('maxdb')
def test_dbapi_breaks_close(self):
dialect = testbase.db.dialect
cargs, ckw = dialect.create_connect_args(testbase.db.url)
except dialect.dbapi.DatabaseError:
self.assert_(True)
- @testing.supported('maxdb')
def test_modulo_operator(self):
st = str(select([sql.column('col') % 5]).compile(testbase.db))
self.assertEquals(st, 'SELECT mod(col, ?) FROM DUAL')
class CompileTest(SQLCompileTest):
__dialect__ = mssql.MSSQLDialect()
-
+
def test_insert(self):
t = table('sometable', column('somecolumn'))
self.assert_compile(t.insert(), "INSERT INTO sometable (somecolumn) VALUES (:somecolumn)")
def test_count(self):
t = table('sometable', column('somecolumn'))
self.assert_compile(t.count(), "SELECT count(sometable.somecolumn) AS tbl_row_count FROM sometable")
-
+
def test_union(self):
- t1 = table('t1',
+ t1 = table('t1',
column('col1'),
column('col2'),
column('col3'),
column('col2'),
column('col3'),
column('col4'))
-
+
(s1, s2) = (
select([t1.c.col3.label('col3'), t1.c.col4.label('col4')], t1.c.col2.in_(["t1col2r1", "t1col2r2"])),
select([t2.c.col3.label('col3'), t2.c.col4.label('col4')], t2.c.col2.in_(["t2col2r2", "t2col2r3"]))
- )
+ )
u = union(s1, s2, order_by=['col3', 'col4'])
self.assert_compile(u, "SELECT t1.col3 AS col3, t1.col4 AS col4 FROM t1 WHERE t1.col2 IN (:t1_col2_1, :t1_col2_2) "\
"UNION SELECT t2.col3 AS col3, t2.col4 AS col4 FROM t2 WHERE t2.col2 IN (:t2_col2_1, :t2_col2_2) ORDER BY col3, col4")
m = MetaData()
t = Table('sometable', m, Column('col1', Integer), Column('col2', Integer))
self.assert_compile(select([func.max(t.c.col1)]), "SELECT max(sometable.col1) AS max_1 FROM sometable")
-
+
+class ReflectionTest(PersistTest):
+ __only_on__ = 'mssql'
+
+ def testidentity(self):
+ meta = MetaData(testbase.db)
+ table = Table(
+ 'identity_test', meta,
+ Column('col1', Integer, Sequence('fred', 2, 3), primary_key=True)
+ )
+ table.create()
+
+ meta2 = MetaData(testbase.db)
+ try:
+ table2 = Table('identity_test', meta2, autoload=True)
+ assert table2.c['col1'].sequence.start == 2
+ assert table2.c['col1'].sequence.increment == 3
+ finally:
+ table.drop()
+
+
+class QueryTest(PersistTest):
+ __only_on__ = 'mssql'
+
+ def test_fetchid_trigger(self):
+ meta = MetaData(testbase.db)
+ t1 = Table('t1', meta,
+ Column('id', Integer, Sequence('fred', 100, 1), primary_key=True),
+ Column('descr', String(200)))
+ t2 = Table('t2', meta,
+ Column('id', Integer, Sequence('fred', 200, 1), primary_key=True),
+ Column('descr', String(200)))
+ meta.create_all()
+ con = testbase.db.connect()
+ con.execute("""create trigger paj on t1 for insert as
+ insert into t2 (descr) select descr from inserted""")
+
+ try:
+ tr = con.begin()
+ r = con.execute(t2.insert(), descr='hello')
+ self.assert_(r.last_inserted_ids() == [200])
+ r = con.execute(t1.insert(), descr='hello')
+ self.assert_(r.last_inserted_ids() == [100])
+
+ finally:
+ tr.commit()
+ con.execute("""drop trigger paj""")
+ meta.drop_all()
+
+ def test_insertid_schema(self):
+ meta = MetaData(testbase.db)
+ con = testbase.db.connect()
+ con.execute('create schema paj')
+ tbl = Table('test', meta, Column('id', Integer, primary_key=True), schema='paj')
+ tbl.create()
+ try:
+ tbl.insert().execute({'id':1})
+ finally:
+ tbl.drop()
+ con.execute('drop schema paj')
+
+ def test_insertid_reserved(self):
+ meta = MetaData(testbase.db)
+ table = Table(
+ 'select', meta,
+ Column('col', Integer, primary_key=True)
+ )
+ table.create()
+
+ meta2 = MetaData(testbase.db)
+ try:
+ table.insert().execute(col=7)
+ finally:
+ table.drop()
+
+ def test_select_limit_nooffset(self):
+ metadata = MetaData(testbase.db)
+
+ users = Table('query_users', metadata,
+ Column('user_id', INT, primary_key = True),
+ Column('user_name', VARCHAR(20)),
+ )
+ addresses = Table('query_addresses', metadata,
+ Column('address_id', Integer, primary_key=True),
+ Column('user_id', Integer, ForeignKey('query_users.user_id')),
+ Column('address', String(30)))
+ metadata.create_all()
+
+ try:
+ try:
+ r = users.select(limit=3, offset=2,
+ order_by=[users.c.user_id]).execute().fetchall()
+ assert False # InvalidRequestError should have been raised
+ except exceptions.InvalidRequestError:
+ pass
+ finally:
+ metadata.drop_all()
+
+class GenerativeQueryTest(PersistTest):
+ __only_on__ = 'mssql'
+
+ def setUpAll(self):
+ global foo, metadata
+ metadata = MetaData(testbase.db)
+ foo = Table('foo', metadata,
+ Column('id', Integer, Sequence('foo_id_seq'),
+ primary_key=True),
+ Column('bar', Integer),
+ Column('range', Integer))
+
+ mapper(Foo, foo)
+ metadata.create_all()
+
+ sess = create_session(bind=testbase.db)
+ for i in range(100):
+ sess.save(Foo(bar=i, range=i%10))
+ sess.flush()
+
+ def tearDownAll(self):
+ metadata.drop_all()
+ clear_mappers()
+
+ def test_slice_mssql(self):
+ sess = create_session(bind=testbase.db)
+ query = sess.query(Foo)
+ orig = query.all()
+ assert list(query[:10]) == orig[:10]
+ assert list(query[:10]) == orig[:10]
+
+
if __name__ == "__main__":
testbase.main()
class TypesTest(AssertMixin):
"Test MySQL column types"
- @testing.supported('mysql')
+ __only_on__ = 'mysql'
+
def test_basic(self):
meta1 = MetaData(testbase.db)
table = Table(
finally:
meta1.drop_all()
- @testing.supported('mysql')
def test_numeric(self):
"Exercise type specification and options for numeric types."
raise
numeric_table.drop()
- @testing.supported('mysql')
@testing.exclude('mysql', '<', (4, 1, 1))
def test_charset(self):
"""Exercise CHARACTER SET and COLLATE-ish options on string types."""
raise
charset_table.drop()
- @testing.supported('mysql')
@testing.exclude('mysql', '<', (5, 0, 5))
def test_bit_50(self):
"""Exercise BIT types on 5.0+ (not valid for all engine types)"""
finally:
meta.drop_all()
- @testing.supported('mysql')
def test_boolean(self):
"""Test BOOL/TINYINT(1) compatability and reflection."""
finally:
meta.drop_all()
- @testing.supported('mysql')
@testing.exclude('mysql', '<', (4, 1, 0))
def test_timestamp(self):
"""Exercise funky TIMESTAMP default syntax."""
finally:
meta.drop_all()
- @testing.supported('mysql')
def test_year(self):
"""Exercise YEAR."""
meta.drop_all()
- @testing.supported('mysql')
def test_set(self):
"""Exercise the SET type."""
finally:
meta.drop_all()
- @testing.supported('mysql')
def test_enum(self):
"""Exercise the ENUM type."""
self.assert_eq(res, expected)
enum_table.drop()
- @testing.supported('mysql')
@testing.exclude('mysql', '>', (3))
def test_enum_parse(self):
"""More exercises for the ENUM type."""
finally:
enum_table.drop()
- @testing.supported('mysql')
@testing.exclude('mysql', '<', (5, 0, 0))
def test_type_reflection(self):
# (ask_for, roundtripped_as_if_different)
finally:
m.drop_all()
- @testing.supported('mysql')
def test_autoincrement(self):
meta = MetaData(testbase.db)
try:
class SQLTest(SQLCompileTest):
"""Tests MySQL-dialect specific compilation."""
- __dialect__ = testbase.db.dialect
- @testing.supported('mysql')
+ __dialect__ = mysql.dialect()
+
def test_precolumns(self):
- dialect = testbase.db.dialect
+ dialect = self.__dialect__
def gen(distinct=None, prefixes=None):
kw = {}
gen(True, ['high_priority', sql.text('sql_cache')]),
'SELECT high_priority sql_cache DISTINCT q')
- @testing.supported('mysql')
def test_limit(self):
t = sql.table('t', sql.column('col1'), sql.column('col2'))
"SELECT t.col1, t.col2 FROM t LIMIT 10, 18446744073709551615"
)
- @testing.supported('mysql')
def test_update_limit(self):
t = sql.table('t', sql.column('col1'), sql.column('col2'))
"UPDATE t SET col1=%s WHERE t.col2 = %s LIMIT 1"
)
- @testing.supported('mysql')
def test_cast(self):
t = sql.table('t', sql.column('col'))
m = mysql
(m.MSEnum, "t.col"),
(m.MSEnum("'1'", "'2'"), "t.col"),
- (m.MSSet, "t.col"),
+ (m.MSSet, "t.col"),
(m.MSSet("'1'", "'2'"), "t.col"),
]
from sqlalchemy import *
from sqlalchemy.sql import table, column
from sqlalchemy.databases import oracle
-
from testlib import *
class OutParamTest(AssertMixin):
- @testing.supported('oracle')
+ __only_on__ = 'oracle'
+
def setUpAll(self):
testbase.db.execute("""
create or replace procedure foo(x_in IN number, x_out OUT number, y_out OUT number) IS
end;
""")
- @testing.supported('oracle')
def test_out_params(self):
result = testbase.db.execute(text("begin foo(:x, :y, :z); end;", bindparams=[bindparam('x', Numeric), outparam('y', Numeric), outparam('z', Numeric)]), x=5)
assert result.out_parameters == {'y':10, 'z':75}, result.out_parameters
print result.out_parameters
- @testing.supported('oracle')
def tearDownAll(self):
testbase.db.execute("DROP PROCEDURE foo")
class CompileTest(SQLCompileTest):
__dialect__ = oracle.OracleDialect()
-
+
def test_subquery(self):
t = table('sometable', column('col1'), column('col2'))
s = select([t])
s = select([s.c.col1, s.c.col2])
-
+
self.assert_compile(s, "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2 FROM sometable)")
def test_limit(self):
t = table('sometable', column('col1'), column('col2'))
-
+
s = select([t]).limit(10).offset(20)
-
+
self.assert_compile(s, "SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, sometable.col2 AS col2, "
"ROW_NUMBER() OVER (ORDER BY sometable.rowid) AS ora_rn FROM sometable) WHERE ora_rn>20 AND ora_rn<=30"
)
-
+
s = select([s.c.col1, s.c.col2])
-
+
self.assert_compile(s, "SELECT col1, col2 FROM (SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, "
"sometable.col2 AS col2, ROW_NUMBER() OVER (ORDER BY sometable.rowid) AS ora_rn FROM sometable) WHERE ora_rn>20 AND ora_rn<=30)")
- # testing this twice to ensure oracle doesn't modify the original statement
+ # testing this twice to ensure oracle doesn't modify the original statement
self.assert_compile(s, "SELECT col1, col2 FROM (SELECT col1, col2 FROM (SELECT sometable.col1 AS col1, "
"sometable.col2 AS col2, ROW_NUMBER() OVER (ORDER BY sometable.rowid) AS ora_rn FROM sometable) WHERE ora_rn>20 AND ora_rn<=30)")
"sometable.col2 AS col2, ROW_NUMBER() OVER (ORDER BY sometable.col2) AS ora_rn FROM sometable ORDER BY sometable.col2) WHERE ora_rn>20 AND ora_rn<=30")
def test_outer_join(self):
- table1 = table('mytable',
+ table1 = table('mytable',
column('myid', Integer),
column('name', String),
column('description', String),
)
table2 = table(
- 'myothertable',
+ 'myothertable',
column('otherid', Integer),
column('othername', String),
)
table3 = table(
- 'thirdtable',
+ 'thirdtable',
column('userid', Integer),
column('otherstuff', String),
)
),
from_obj = [ outerjoin(table1, table2, table1.c.myid == table2.c.otherid) ]
)
- self.assert_compile(query,
+ self.assert_compile(query,
"SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername \
FROM mytable, myothertable WHERE mytable.myid = myothertable.otherid(+) AND \
(mytable.name = :mytable_name_1 OR mytable.myid = :mytable_myid_1 OR \
query = table1.outerjoin(table2, table1.c.myid==table2.c.otherid).outerjoin(table3, table3.c.userid==table2.c.otherid)
self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable LEFT OUTER JOIN myothertable ON mytable.myid = myothertable.otherid LEFT OUTER JOIN thirdtable ON thirdtable.userid = myothertable.otherid")
- self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable, myothertable, thirdtable WHERE mytable.myid = myothertable.otherid(+) AND thirdtable.userid(+) = myothertable.otherid", dialect=oracle.dialect(use_ansi=False))
+ self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable, myothertable, thirdtable WHERE mytable.myid = myothertable.otherid(+) AND thirdtable.userid(+) = myothertable.otherid", dialect=oracle.dialect(use_ansi=False))
query = table1.join(table2, table1.c.myid==table2.c.otherid).join(table3, table3.c.userid==table2.c.otherid)
- self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable, myothertable, thirdtable WHERE mytable.myid = myothertable.otherid AND thirdtable.userid = myothertable.otherid", dialect=oracle.dialect(use_ansi=False))
-
+ self.assert_compile(query.select(), "SELECT mytable.myid, mytable.name, mytable.description, myothertable.otherid, myothertable.othername, thirdtable.userid, thirdtable.otherstuff FROM mytable, myothertable, thirdtable WHERE mytable.myid = myothertable.otherid AND thirdtable.userid = myothertable.otherid", dialect=oracle.dialect(use_ansi=False))
+
def test_alias_outer_join(self):
address_types = table('address_types',
column('id'),
column('email_address')
)
at_alias = address_types.alias()
-
+
s = select([at_alias, addresses]).\
select_from(addresses.outerjoin(at_alias, addresses.c.address_type_id==at_alias.c.id)).\
where(addresses.c.user_id==7).\
"address_types.rowid")
class TypesTest(SQLCompileTest):
+ __only_on__ = 'oracle'
+
def test_no_clobs_for_string_params(self):
"""test that simple string params get a DBAPI type of VARCHAR, not CLOB.
- this is to prevent setinputsizes from setting up cx_oracle.CLOBs on
+ this is to prevent setinputsizes from setting up cx_oracle.CLOBs on
string-based bind params [ticket:793]."""
-
+
class FakeDBAPI(object):
def __getattr__(self, attr):
return attr
dialect = oracle.OracleDialect()
dbapi = FakeDBAPI()
-
+
b = bindparam("foo", "hello world!")
assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING'
b = bindparam("foo", u"hello world!")
assert b.type.dialect_impl(dialect).get_dbapi_type(dbapi) == 'STRING'
-
- @testing.supported('oracle')
+
def test_longstring(self):
metadata = MetaData(testbase.db)
testbase.db.execute("""
(
ID NUMERIC(22) PRIMARY KEY,
ADD_USER VARCHAR2(20) NOT NULL
- )
+ )
""")
try:
t = Table("z_test", metadata, autoload=True)
assert t.select().execute().fetchall() == [(1, 'foobar')]
finally:
testbase.db.execute("DROP TABLE Z_TEST")
-
+
class SequenceTest(SQLCompileTest):
def test_basic(self):
seq = Sequence("my_seq_no_schema")
seq = Sequence("My_Seq", schema="Some_Schema")
assert dialect.identifier_preparer.format_sequence(seq) == '"Some_Schema"."My_Seq"'
-
-
+
+
if __name__ == '__main__':
testbase.main()
class CompileTest(SQLCompileTest):
def test_update_returning(self):
dialect = postgres.dialect()
- table1 = table('mytable',
+ table1 = table('mytable',
column('myid', Integer),
column('name', String),
column('description', String),
)
-
+
u = update(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING mytable.myid, mytable.name", dialect=dialect)
-
+
u = update(table1, values=dict(name='foo'), postgres_returning=[table1])
self.assert_compile(u, "UPDATE mytable SET name=%(name)s "\
"RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)
-
+
u = update(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)])
self.assert_compile(u, "UPDATE mytable SET name=%(name)s RETURNING length(mytable.name)", dialect=dialect)
-
+
def test_insert_returning(self):
dialect = postgres.dialect()
- table1 = table('mytable',
+ table1 = table('mytable',
column('myid', Integer),
column('name', String),
column('description', String),
)
-
+
i = insert(table1, values=dict(name='foo'), postgres_returning=[table1.c.myid, table1.c.name])
self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING mytable.myid, mytable.name", dialect=dialect)
-
+
i = insert(table1, values=dict(name='foo'), postgres_returning=[table1])
self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) "\
"RETURNING mytable.myid, mytable.name, mytable.description", dialect=dialect)
-
+
i = insert(table1, values=dict(name='foo'), postgres_returning=[func.length(table1.c.name)])
self.assert_compile(i, "INSERT INTO mytable (name) VALUES (%(name)s) RETURNING length(mytable.name)", dialect=dialect)
class ReturningTest(AssertMixin):
- @testing.supported('postgres')
+ __only_on__ = 'postgres'
+
@testing.exclude('postgres', '<', (8, 2))
def test_update_returning(self):
meta = MetaData(testbase.db)
- table = Table('tables', meta,
+ table = Table('tables', meta,
Column('id', Integer, primary_key=True),
Column('persons', Integer),
Column('full', Boolean)
table.create()
try:
table.insert().execute([{'persons': 5, 'full': False}, {'persons': 3, 'full': False}])
-
+
result = table.update(table.c.persons > 4, dict(full=True), postgres_returning=[table.c.id]).execute()
self.assertEqual(result.fetchall(), [(1,)])
-
+
result2 = select([table.c.id, table.c.full]).order_by(table.c.id).execute()
self.assertEqual(result2.fetchall(), [(1,True),(2,False)])
finally:
table.drop()
- @testing.supported('postgres')
@testing.exclude('postgres', '<', (8, 2))
def test_insert_returning(self):
meta = MetaData(testbase.db)
- table = Table('tables', meta,
+ table = Table('tables', meta,
Column('id', Integer, primary_key=True),
Column('persons', Integer),
Column('full', Boolean)
table.create()
try:
result = table.insert(postgres_returning=[table.c.id]).execute({'persons': 1, 'full': False})
-
+
self.assertEqual(result.fetchall(), [(1,)])
-
+
# Multiple inserts only return the last row
result2 = table.insert(postgres_returning=[table]).execute(
[{'persons': 2, 'full': False}, {'persons': 3, 'full': True}])
-
+
self.assertEqual(result2.fetchall(), [(3,3,True)])
-
+
result3 = table.insert(postgres_returning=[(table.c.id*2).label('double_id')]).execute({'persons': 4, 'full': False})
self.assertEqual([dict(row) for row in result3], [{'double_id':8}])
-
+
result4 = testbase.db.execute('insert into tables (id, persons, "full") values (5, 10, true) returning persons')
self.assertEqual([dict(row) for row in result4], [{'persons': 10}])
finally:
table.drop()
-
-
+
+
class InsertTest(AssertMixin):
- @testing.supported('postgres')
+ __only_on__ = 'postgres'
+
def setUpAll(self):
global metadata
metadata = MetaData(testbase.db)
-
- @testing.supported('postgres')
+
def tearDown(self):
metadata.drop_all()
metadata.tables.clear()
-
- @testing.supported('postgres')
+
def test_compiled_insert(self):
- table = Table('testtable', metadata,
+ table = Table('testtable', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30)))
-
+
metadata.create_all()
ins = table.insert(values={'data':bindparam('x')}).compile()
ins.execute({'x':"five"}, {'x':"seven"})
assert table.select().execute().fetchall() == [(1, 'five'), (2, 'seven')]
-
- @testing.supported('postgres')
+
def test_sequence_insert(self):
- table = Table('testtable', metadata,
+ table = Table('testtable', metadata,
Column('id', Integer, Sequence('my_seq'), primary_key=True),
Column('data', String(30)))
metadata.create_all()
self._assert_data_with_sequence(table, "my_seq")
- @testing.supported('postgres')
def test_opt_sequence_insert(self):
- table = Table('testtable', metadata,
+ table = Table('testtable', metadata,
Column('id', Integer, Sequence('my_seq', optional=True), primary_key=True),
Column('data', String(30)))
metadata.create_all()
self._assert_data_autoincrement(table)
- @testing.supported('postgres')
def test_autoincrement_insert(self):
- table = Table('testtable', metadata,
+ table = Table('testtable', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30)))
metadata.create_all()
self._assert_data_autoincrement(table)
- @testing.supported('postgres')
def test_noautoincrement_insert(self):
- table = Table('testtable', metadata,
+ table = Table('testtable', metadata,
Column('id', Integer, primary_key=True, autoincrement=False),
Column('data', String(30)))
metadata.create_all()
self._assert_data_noautoincrement(table)
-
+
def _assert_data_autoincrement(self, table):
def go():
# execute with explicit id
r = table.insert().execute({'id':30, 'data':'d1'})
assert r.last_inserted_ids() == [30]
-
+
# execute with prefetch id
r = table.insert().execute({'data':'d2'})
assert r.last_inserted_ids() == [1]
-
+
# executemany with explicit ids
table.insert().execute({'id':31, 'data':'d3'}, {'id':32, 'data':'d4'})
-
+
# executemany, uses SERIAL
table.insert().execute({'data':'d5'}, {'data':'d6'})
-
+
# single execute, explicit id, inline
table.insert(inline=True).execute({'id':33, 'data':'d7'})
-
+
# single execute, inline, uses SERIAL
table.insert(inline=True).execute({'data':'d8'})
-
+
# note that the test framework doesnt capture the "preexecute" of a seqeuence
# or default. we just see it in the bind params.
-
+
self.assert_sql(testbase.db, go, [], with_sequences=[
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
[{'data':'d8'}]
),
])
-
+
assert table.select().execute().fetchall() == [
(30, 'd1'),
(1, 'd2'),
]
table.delete().execute()
- # test the same series of events using a reflected
+ # test the same series of events using a reflected
# version of the table
m2 = MetaData(testbase.db)
table = Table(table.name, m2, autoload=True)
table.insert().execute({'data':'d5'}, {'data':'d6'})
table.insert(inline=True).execute({'id':33, 'data':'d7'})
table.insert(inline=True).execute({'data':'d8'})
-
+
self.assert_sql(testbase.db, go, [], with_sequences=[
(
"INSERT INTO testtable (id, data) VALUES (:id, :data)",
[{'data':'d8'}]
),
])
-
+
assert table.select().execute().fetchall() == [
(30, 'd1'),
(5, 'd2'),
(8, 'd8'),
]
table.delete().execute()
-
+
def _assert_data_with_sequence(self, table, seqname):
def go():
table.insert().execute({'id':30, 'data':'d1'})
(33, 'd7'),
(4, 'd8'),
]
-
- # cant test reflection here since the Sequence must be
+
+ # cant test reflection here since the Sequence must be
# explicitly specified
-
+
def _assert_data_noautoincrement(self, table):
table.insert().execute({'id':30, 'data':'d1'})
try:
assert False
except exceptions.IntegrityError, e:
assert "violates not-null constraint" in str(e)
-
+
table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
table.insert(inline=True).execute({'id':33, 'data':'d4'})
-
+
assert table.select().execute().fetchall() == [
(30, 'd1'),
(31, 'd2'),
]
table.delete().execute()
- # test the same series of events using a reflected
+ # test the same series of events using a reflected
# version of the table
m2 = MetaData(testbase.db)
table = Table(table.name, m2, autoload=True)
assert False
except exceptions.IntegrityError, e:
assert "violates not-null constraint" in str(e)
-
+
table.insert().execute({'id':31, 'data':'d2'}, {'id':32, 'data':'d3'})
table.insert(inline=True).execute({'id':33, 'data':'d4'})
-
+
assert table.select().execute().fetchall() == [
(30, 'd1'),
(31, 'd2'),
(32, 'd3'),
(33, 'd4'),
]
-
+
class DomainReflectionTest(AssertMixin):
"Test PostgreSQL domains"
- @testing.supported('postgres')
+ __only_on__ = 'postgres'
+
def setUpAll(self):
con = testbase.db.connect()
try:
con.execute('CREATE TABLE alt_schema.testtable(question integer, answer alt_schema.testdomain, anything integer)')
con.execute('CREATE TABLE crosschema (question integer, answer alt_schema.testdomain)')
- @testing.supported('postgres')
def tearDownAll(self):
con = testbase.db.connect()
con.execute('DROP TABLE testtable')
con.execute('DROP DOMAIN testdomain')
con.execute('DROP DOMAIN alt_schema.testdomain')
- @testing.supported('postgres')
def test_table_is_reflected(self):
metadata = MetaData(testbase.db)
table = Table('testtable', metadata, autoload=True)
self.assertEquals(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns")
self.assertEquals(table.c.answer.type.__class__, postgres.PGInteger)
-
- @testing.supported('postgres')
+
def test_domain_is_reflected(self):
metadata = MetaData(testbase.db)
table = Table('testtable', metadata, autoload=True)
self.assertEquals(str(table.columns.answer.default.arg), '42', "Reflected default value didn't equal expected value")
self.assertFalse(table.columns.answer.nullable, "Expected reflected column to not be nullable.")
- @testing.supported('postgres')
def test_table_is_reflected_alt_schema(self):
metadata = MetaData(testbase.db)
table = Table('testtable', metadata, autoload=True, schema='alt_schema')
self.assertEquals(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns")
self.assertEquals(table.c.anything.type.__class__, postgres.PGInteger)
- @testing.supported('postgres')
def test_schema_domain_is_reflected(self):
metadata = MetaData(testbase.db)
table = Table('testtable', metadata, autoload=True, schema='alt_schema')
self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value")
self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
- @testing.supported('postgres')
def test_crosschema_domain_is_reflected(self):
metadata = MetaData(testbase.db)
table = Table('crosschema', metadata, autoload=True)
self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
class MiscTest(AssertMixin):
- @testing.supported('postgres')
+ __only_on__ = 'postgres'
+
def test_date_reflection(self):
m1 = MetaData(testbase.db)
- t1 = Table('pgdate', m1,
+ t1 = Table('pgdate', m1,
Column('date1', DateTime(timezone=True)),
Column('date2', DateTime(timezone=False))
)
finally:
m1.drop_all()
- @testing.supported('postgres')
def test_pg_weirdchar_reflection(self):
meta1 = MetaData(testbase.db)
subject = Table("subject", meta1,
self.assert_((subject.c['id$']==referer.c.ref).compare(subject.join(referer).onclause))
finally:
meta1.drop_all()
-
- @testing.supported('postgres')
+
def test_checksfor_sequence(self):
meta1 = MetaData(testbase.db)
- t = Table('mytable', meta1,
+ t = Table('mytable', meta1,
Column('col1', Integer, Sequence('fooseq')))
try:
testbase.db.execute("CREATE SEQUENCE fooseq")
finally:
t.drop(checkfirst=True)
- @testing.supported('postgres')
def test_distinct_on(self):
t = Table('mytable', MetaData(testbase.db),
Column('id', Integer, primary_key=True),
'SELECT DISTINCT ON (mytable.id, mytable.a) mytable.id, mytable.a \n'
'FROM mytable')
- @testing.supported('postgres')
def test_schema_reflection(self):
"""note: this test requires that the 'alt_schema' schema be separate and accessible by the test user"""
finally:
meta1.drop_all()
- @testing.supported('postgres')
def test_schema_reflection_2(self):
meta1 = MetaData(testbase.db)
subject = Table("subject", meta1,
self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
finally:
meta1.drop_all()
-
- @testing.supported('postgres')
+
def test_schema_reflection_3(self):
meta1 = MetaData(testbase.db)
subject = Table("subject", meta1,
self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
finally:
meta1.drop_all()
-
- @testing.supported('postgres')
+
def test_preexecute_passivedefault(self):
- """test that when we get a primary key column back
+ """test that when we get a primary key column back
from reflecting a table which has a default value on it, we pre-execute
that PassiveDefault upon insert."""
-
+
try:
meta = MetaData(testbase.db)
testbase.db.execute("""
finally:
testbase.db.execute("drop table speedy_users", None)
- @testing.supported('postgres')
def test_create_partial_index(self):
tbl = Table('testtbl', MetaData(), Column('data',Integer))
idx = Index('test_idx1', tbl.c.data, postgres_where=and_(tbl.c.data > 5, tbl.c.data < 10))
-
+
executed_sql = []
mock_strategy = MockEngineStrategy()
mock_conn = mock_strategy.create('postgres://', executed_sql.append)
-
+
idx.create(mock_conn)
-
+
assert executed_sql == ['CREATE INDEX test_idx1 ON testtbl (data) WHERE testtbl.data > 5 AND testtbl.data < 10']
class TimezoneTest(AssertMixin):
- """test timezone-aware datetimes. psycopg will return a datetime with a tzinfo attached to it,
- if postgres returns it. python then will not let you compare a datetime with a tzinfo to a datetime
- that doesnt have one. this test illustrates two ways to have datetime types with and without timezone
- info. """
- @testing.supported('postgres')
+ """Test timezone-aware datetimes.
+
+ psycopg will return a datetime with a tzinfo attached to it, if postgres
+ returns it. python then will not let you compare a datetime with a tzinfo
+ to a datetime that doesnt have one. this test illustrates two ways to
+ have datetime types with and without timezone info.
+ """
+
+ __only_on__ = 'postgres'
+
def setUpAll(self):
global tztable, notztable, metadata
metadata = MetaData(testbase.db)
Column("name", String(20)),
)
metadata.create_all()
- @testing.supported('postgres')
def tearDownAll(self):
metadata.drop_all()
- @testing.supported('postgres')
def test_with_timezone(self):
# get a date with a tzinfo
somedate = testbase.db.connect().scalar(func.current_timestamp().select())
c = tztable.update(tztable.c.id==1).execute(name='newname')
print tztable.select(tztable.c.id==1).execute().fetchone()
- @testing.supported('postgres')
def test_without_timezone(self):
# get a date without a tzinfo
somedate = datetime.datetime(2005, 10,20, 11, 52, 00)
print notztable.select(tztable.c.id==1).execute().fetchone()
class ArrayTest(AssertMixin):
- @testing.supported('postgres')
+ __only_on__ = 'postgres'
+
def setUpAll(self):
global metadata, arrtable
metadata = MetaData(testbase.db)
-
+
arrtable = Table('arrtable', metadata,
Column('id', Integer, primary_key=True),
Column('intarr', postgres.PGArray(Integer)),
Column('strarr', postgres.PGArray(String), nullable=False)
)
metadata.create_all()
- @testing.supported('postgres')
def tearDownAll(self):
metadata.drop_all()
-
- @testing.supported('postgres')
+
def test_reflect_array_column(self):
metadata2 = MetaData(testbase.db)
tbl = Table('arrtable', metadata2, autoload=True)
self.assertTrue(isinstance(tbl.c.strarr.type, postgres.PGArray))
self.assertTrue(isinstance(tbl.c.intarr.type.item_type, Integer))
self.assertTrue(isinstance(tbl.c.strarr.type.item_type, String))
-
- @testing.supported('postgres')
+
def test_insert_array(self):
arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
results = arrtable.select().execute().fetchall()
self.assertEquals(results[0]['strarr'], ['abc','def'])
arrtable.delete().execute()
- @testing.supported('postgres')
def test_array_where(self):
arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
arrtable.insert().execute(intarr=[4,5,6], strarr='ABC')
self.assertEquals(len(results), 1)
self.assertEquals(results[0]['intarr'], [1,2,3])
arrtable.delete().execute()
-
- @testing.supported('postgres')
+
def test_array_concat(self):
arrtable.insert().execute(intarr=[1,2,3], strarr=['abc', 'def'])
results = select([arrtable.c.intarr + [4,5,6]]).execute().fetchall()
class TestTypes(AssertMixin):
- @testing.supported('sqlite')
+ __only_on__ = 'sqlite'
+
def test_date(self):
meta = MetaData(testbase.db)
t = Table('testdate', meta,
Column('id', Integer, primary_key=True),
- Column('adate', Date),
+ Column('adate', Date),
Column('adatetime', DateTime))
meta.create_all()
try:
d2 = datetime.datetime(2007, 10, 30)
t.insert().execute(adate=str(d1), adatetime=str(d2))
-
+
self.assert_(t.select().execute().fetchall()[0] ==
(1, datetime.date(2007, 10, 30),
datetime.datetime(2007, 10, 30)))
-
+
finally:
meta.drop_all()
class DialectTest(AssertMixin):
- @testing.supported('sqlite')
+ __only_on__ = 'sqlite'
+
def test_extra_reserved_words(self):
"""Tests reserved words in identifiers.
meta = MetaData(testbase.db)
t = Table('reserved', meta,
Column('safe', Integer),
- Column('true', Integer),
+ Column('true', Integer),
Column('false', Integer),
Column('column', Integer))
finally:
meta.drop_all()
- @testing.supported('sqlite')
def test_quoted_identifiers(self):
"""Tests autoload of tables created with quoted column names."""
class InsertTest(AssertMixin):
"""Tests inserts and autoincrement."""
+ __only_on__ = 'sqlite'
+
# empty insert (i.e. INSERT INTO table DEFAULT VALUES)
# fails as recently as sqlite 3.3.6. passes on 3.4.1. this syntax
# is nowhere to be found in the sqlite3 documentation or changelog, so can't
finally:
table.drop()
- @testing.supported('sqlite')
@testing.exclude('sqlite', '<', (3, 4))
def test_empty_insert_pk1(self):
self._test_empty_insert(
Table('a', MetaData(testbase.db),
Column('id', Integer, primary_key=True)))
- @testing.supported('sqlite')
@testing.exclude('sqlite', '<', (3, 4))
def test_empty_insert_pk2(self):
self.assertRaises(
Column('x', Integer, primary_key=True),
Column('y', Integer, primary_key=True)))
- @testing.supported('sqlite')
@testing.exclude('sqlite', '<', (3, 4))
def test_empty_insert_pk3(self):
self.assertRaises(
Column('y', Integer, PassiveDefault('123'),
primary_key=True)))
- @testing.supported('sqlite')
@testing.exclude('sqlite', '<', (3, 4))
def test_empty_insert_pk4(self):
self._test_empty_insert(
Column('x', Integer, primary_key=True),
Column('y', Integer, PassiveDefault('123'))))
- @testing.supported('sqlite')
@testing.exclude('sqlite', '<', (3, 4))
def test_empty_insert_nopk1(self):
self._test_empty_insert(
Table('e', MetaData(testbase.db),
Column('id', Integer)))
-
- @testing.supported('sqlite')
+
@testing.exclude('sqlite', '<', (3, 4))
def test_empty_insert_nopk2(self):
self._test_empty_insert(
Column('x', Integer),
Column('y', Integer)))
- @testing.supported('sqlite')
def test_inserts_with_spaces(self):
tbl = Table('tbl', MetaData('sqlite:///'),
Column('with space', Integer),
finally:
tbl.drop()
-
+
+
if __name__ == "__main__":
testbase.main()
Column('user_name', VARCHAR(20)),
)
metadata.create_all()
-
+
def tearDown(self):
testbase.db.connect().execute(users.delete())
def tearDownAll(self):
metadata.drop_all()
-
- @testing.supported('sqlite', 'maxdb')
+
+ @testing.fails_on_everything_except('sqlite', 'maxdb')
def test_raw_qmark(self):
for conn in (testbase.db, testbase.db.connect()):
conn.execute("insert into users (user_id, user_name) values (?, ?)", (1,"jack"))
assert res.fetchall() == [(1, "jack"), (2, "fred"), (3, "ed"), (4, "horse"), (5, "barney"), (6, "donkey"), (7, 'sally')]
conn.execute("delete from users")
- @testing.supported('mysql', 'postgres')
+ @testing.fails_on_everything_except('mysql', 'postgres')
+ # some psycopg2 versions bomb this.
def test_raw_sprintf(self):
for conn in (testbase.db, testbase.db.connect()):
conn.execute("insert into users (user_id, user_name) values (%s, %s)", [1,"jack"])
# pyformat is supported for mysql, but skipping because a few driver
# versions have a bug that bombs out on this test. (1.2.2b3, 1.2.2c1, 1.2.2)
- @testing.supported('postgres')
+ @testing.unsupported('mysql')
+ @testing.fails_on_everything_except('postgres')
def test_raw_python(self):
for conn in (testbase.db, testbase.db.connect()):
conn.execute("insert into users (user_id, user_name) values (%(id)s, %(name)s)", {'id':1, 'name':'jack'})
assert res.fetchall() == [(1, "jack"), (2, "ed"), (3, "horse"), (4, 'sally')]
conn.execute("delete from users")
- @testing.supported('sqlite')
+ @testing.fails_on_everything_except('sqlite')
def test_raw_named(self):
for conn in (testbase.db, testbase.db.connect()):
conn.execute("insert into users (user_id, user_name) values (:id, :name)", {'id':1, 'name':'jack'})
assert True
if __name__ == "__main__":
- testbase.main()
+ testbase.main()
finally:
table.drop()
- @testing.supported('mssql')
- def testidentity(self):
- meta = MetaData(testbase.db)
- table = Table(
- 'identity_test', meta,
- Column('col1', Integer, Sequence('fred', 2, 3), primary_key=True)
- )
- table.create()
-
- meta2 = MetaData(testbase.db)
- try:
- table2 = Table('identity_test', meta2, autoload=True)
- assert table2.c['col1'].sequence.start == 2
- assert table2.c['col1'].sequence.increment == 3
- finally:
- table.drop()
-
@testing.unsupported('oracle')
def testreserved(self):
# check a table that uses an SQL reserved name doesn't cause an error
assert buf.index("CREATE TABLE someschema.table1") > -1
assert buf.index("CREATE TABLE someschema.table2") > -1
- @testing.supported('maxdb', 'mysql', 'postgres')
+ @testing.unsupported('sqlite', 'firebird')
+ # fixme: revisit these below.
+ @testing.fails_on('oracle', 'mssql', 'sybase', 'access')
def test_explicit_default_schema(self):
engine = testbase.db
- schema = engine.dialect.get_default_schema_name(engine)
if testing.against('mysql'):
schema = testbase.db.url.database
elif testing.against('postgres'):
schema = 'public'
+ else:
+ schema = engine.dialect.get_default_schema_name(engine)
- metadata = MetaData(testbase.db)
+ metadata = MetaData(engine)
table1 = Table('table1', metadata,
- Column('col1', Integer, primary_key=True),
- schema=schema)
+ Column('col1', Integer, primary_key=True),
+ schema=schema)
table2 = Table('table2', metadata,
- Column('col1', Integer, primary_key=True),
- Column('col2', Integer, ForeignKey('%s.table1.col1' % schema)),
- schema=schema)
- metadata.create_all()
- metadata.create_all(checkfirst=True)
- metadata.clear()
+ Column('col1', Integer, primary_key=True),
+ Column('col2', Integer,
+ ForeignKey('%s.table1.col1' % schema)),
+ schema=schema)
+ try:
+ metadata.create_all()
+ metadata.create_all(checkfirst=True)
+ metadata.clear()
- table1 = Table('table1', metadata, autoload=True, schema=schema)
- table2 = Table('table2', metadata, autoload=True, schema=schema)
- metadata.drop_all()
+ table1 = Table('table1', metadata, autoload=True, schema=schema)
+ table2 = Table('table2', metadata, autoload=True, schema=schema)
+ finally:
+ metadata.drop_all()
class HasSequenceTest(PersistTest):
Column('user_name', String(40)),
)
- @testing.supported('firebird', 'postgres', 'oracle')
+ @testing.unsupported('sqlite', 'mysql', 'mssql', 'access', 'sybase')
def test_hassequence(self):
metadata.create_all(bind=testbase.db)
self.assertEqual(testbase.db.dialect.has_sequence(testbase.db, 'user_id_seq'), True)
test_needs_acid=True,
)
users.create(testbase.db)
-
+
def tearDown(self):
testbase.db.connect().execute(users.delete())
def tearDownAll(self):
users.drop(testbase.db)
-
+
def testcommits(self):
connection = testbase.db.connect()
transaction = connection.begin()
result = connection.execute("select * from query_users")
assert len(result.fetchall()) == 3
transaction.commit()
-
+
def testrollback(self):
"""test a basic rollback"""
connection = testbase.db.connect()
connection.execute(users.insert(), user_id=2, user_name='user2')
connection.execute(users.insert(), user_id=3, user_name='user3')
transaction.rollback()
-
+
result = connection.execute("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
def testraise(self):
connection = testbase.db.connect()
-
+
transaction = connection.begin()
try:
connection.execute(users.insert(), user_id=1, user_name='user1')
except Exception , e:
print "Exception: ", e
transaction.rollback()
-
+
result = connection.execute("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
-
+
@testing.exclude('mysql', '<', (5, 0, 3))
def testnestedrollback(self):
connection = testbase.db.connect()
-
+
try:
transaction = connection.begin()
try:
assert str(e) == 'uh oh' # and not "This transaction is inactive"
finally:
connection.close()
-
+
@testing.exclude('mysql', '<', (5, 0, 3))
def testnesting(self):
assert len(result.fetchall()) == 0
connection.close()
-
- @testing.supported('postgres', 'mysql', 'oracle', 'maxdb')
+
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access')
@testing.exclude('mysql', '<', (5, 0, 3))
def testnestedsubtransactionrollback(self):
connection = testbase.db.connect()
trans2.rollback()
connection.execute(users.insert(), user_id=3, user_name='user3')
transaction.commit()
-
+
self.assertEquals(
connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
[(1,),(3,)]
)
connection.close()
- @testing.supported('postgres', 'mysql', 'oracle', 'maxdb')
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access')
@testing.exclude('mysql', '<', (5, 0, 3))
def testnestedsubtransactioncommit(self):
connection = testbase.db.connect()
trans2.commit()
connection.execute(users.insert(), user_id=3, user_name='user3')
transaction.commit()
-
+
self.assertEquals(
connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
[(1,),(2,),(3,)]
)
connection.close()
- @testing.supported('postgres', 'mysql', 'oracle', 'maxdb')
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access')
@testing.exclude('mysql', '<', (5, 0, 3))
def testrollbacktosubtransaction(self):
connection = testbase.db.connect()
trans3.rollback()
connection.execute(users.insert(), user_id=4, user_name='user4')
transaction.commit()
-
+
self.assertEquals(
connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
[(1,),(4,)]
)
connection.close()
-
- @testing.supported('postgres', 'mysql')
+
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
@testing.exclude('mysql', '<', (5, 0, 3))
def testtwophasetransaction(self):
connection = testbase.db.connect()
-
+
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name='user1')
transaction.prepare()
transaction.commit()
-
+
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=2, user_name='user2')
transaction.commit()
-
+
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=3, user_name='user3')
transaction.rollback()
-
+
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=4, user_name='user4')
transaction.prepare()
transaction.rollback()
-
+
self.assertEquals(
connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
[(1,),(2,)]
)
connection.close()
- @testing.supported('postgres', 'mysql')
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
@testing.exclude('mysql', '<', (5, 0, 3))
def testmixedtwophasetransaction(self):
connection = testbase.db.connect()
-
+
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name='user1')
-
+
transaction2 = connection.begin()
connection.execute(users.insert(), user_id=2, user_name='user2')
-
+
transaction3 = connection.begin_nested()
connection.execute(users.insert(), user_id=3, user_name='user3')
-
+
transaction4 = connection.begin()
connection.execute(users.insert(), user_id=4, user_name='user4')
transaction4.commit()
-
+
transaction3.rollback()
-
+
connection.execute(users.insert(), user_id=5, user_name='user5')
-
+
transaction2.commit()
-
+
transaction.prepare()
-
+
transaction.commit()
-
+
self.assertEquals(
connection.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
[(1,),(2,),(5,)]
)
connection.close()
-
- @testing.supported('postgres')
+
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
+ # fixme: see if this is still true and/or can be converted to fails_on()
+ @testing.unsupported('mysql')
def testtwophaserecover(self):
# MySQL recovery doesn't currently seem to work correctly
# Prepared transactions disappear when connections are closed and even
# when they aren't it doesn't seem possible to use the recovery id.
connection = testbase.db.connect()
-
+
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name='user1')
transaction.prepare()
-
+
connection.close()
connection2 = testbase.db.connect()
-
+
self.assertEquals(
connection2.execute(select([users.c.user_id]).order_by(users.c.user_id)).fetchall(),
[]
)
-
+
recoverables = connection2.recover_twophase()
self.assertTrue(
transaction.xid in recoverables
)
-
+
connection2.commit_prepared(transaction.xid, recover=True)
self.assertEquals(
[(1,)]
)
connection2.close()
-
- @testing.supported('postgres', 'mysql')
+
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
@testing.exclude('mysql', '<', (5, 0, 3))
def testmultipletwophase(self):
conn = testbase.db.connect()
-
+
xa = conn.begin_twophase()
conn.execute(users.insert(), user_id=1, user_name='user1')
xa.prepare()
xa.commit()
-
+
xa = conn.begin_twophase()
conn.execute(users.insert(), user_id=2, user_name='user2')
xa.prepare()
xa.rollback()
-
+
xa = conn.begin_twophase()
conn.execute(users.insert(), user_id=3, user_name='user3')
xa.rollback()
-
+
xa = conn.begin_twophase()
conn.execute(users.insert(), user_id=4, user_name='user4')
xa.prepare()
xa.commit()
-
+
result = conn.execute(select([users.c.user_name]).order_by(users.c.user_id))
self.assertEqual(result.fetchall(), [('user1',),('user4',)])
-
+
conn.close()
-
+
class AutoRollbackTest(PersistTest):
def setUpAll(self):
global metadata
metadata = MetaData()
-
+
def tearDownAll(self):
metadata.drop_all(testbase.db)
-
+
@testing.unsupported('sqlite')
def testrollback_deadlock(self):
"""test that returning connections to the pool clears any object locks."""
users.create(conn1)
conn1.execute("select * from deadlock_users")
conn1.close()
- # without auto-rollback in the connection pool's return() logic, this deadlocks in Postgres,
- # because conn1 is returned to the pool but still has a lock on "deadlock_users"
+
+ # without auto-rollback in the connection pool's return() logic, this
+ # deadlocks in Postgres, because conn1 is returned to the pool but
+ # still has a lock on "deadlock_users".
# comment out the rollback in pool/ConnectionFairy._close() to see !
users.drop(conn2)
conn2.close()
def tearDownAll(self):
users.drop(tlengine)
tlengine.dispose()
-
+
def test_connection_close(self):
"""test that when connections are closed for real, transactions are rolled back and disposed."""
-
+
c = tlengine.contextual_connect()
c.begin()
assert tlengine.session.in_transaction()
assert len(result.fetchall()) == 0
finally:
external_connection.close()
-
+
def testrollback(self):
"""test a basic rollback"""
tlengine.begin()
def testcommits(self):
assert tlengine.connect().execute("select count(1) from query_users").scalar() == 0
-
+
connection = tlengine.contextual_connect()
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name='user1')
assert len(result.fetchall()) == 3
finally:
external_connection.close()
-
+
@testing.unsupported('sqlite')
@testing.exclude('mysql', '<', (5, 0, 3))
def testnesting(self):
@testing.exclude('mysql', '<', (5, 0, 3))
def testmixednesting(self):
- """tests nesting of transactions off the TLEngine directly inside of
+ """tests nesting of transactions off the TLEngine directly inside of
tranasctions off the connection from the TLEngine"""
external_connection = tlengine.connect()
self.assert_(external_connection.connection is not tlengine.contextual_connect().connection)
finally:
clear_mappers()
-
+
def testconnections(self):
"""tests that contextual_connect is threadlocal"""
c1 = tlengine.contextual_connect()
c2.close()
assert c1.connection.connection is not None
+
class ForUpdateTest(PersistTest):
def setUpAll(self):
global counters, metadata
con = testbase.db.connect()
sel = counters.select(for_update=update_style,
whereclause=counters.c.counter_id==1)
-
+
for i in xrange(count):
trans = con.begin()
try:
break
con.close()
- @testing.supported('mysql', 'oracle', 'postgres', 'maxdb')
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access')
def testqueued_update(self):
"""Test SELECT FOR UPDATE with concurrent modifications.
thread.join()
return errors
-
- @testing.supported('mysql', 'oracle', 'postgres', 'maxdb')
+
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access')
def testqueued_select(self):
"""Simple SELECT FOR UPDATE conflict test"""
sys.stderr.write("Failure: %s\n" % e)
self.assert_(len(errors) == 0)
- @testing.supported('oracle', 'postgres', 'maxdb')
+ @testing.unsupported('sqlite', 'mysql', 'mssql', 'firebird',
+ 'sybase', 'access')
def testnowait_select(self):
"""Simple SELECT FOR UPDATE NOWAIT conflict test"""
errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)],
update_style='nowait')
self.assert_(len(errors) != 0)
-
+
if __name__ == "__main__":
- testbase.main()
+ testbase.main()
assert list(query[-5:]) == orig[-5:]
assert query[10:20][5] == orig[10:20][5]
- @testing.supported('mssql')
- def test_slice_mssql(self):
- sess = create_session(bind=testbase.db)
- query = sess.query(Foo)
- orig = query.all()
- assert list(query[:10]) == orig[:10]
- assert list(query[:10]) == orig[:10]
-
def test_aggregate(self):
sess = create_session(bind=testbase.db)
query = sess.query(Foo)
assert query.filter(foo.c.bar<30).apply_max(foo.c.bar).first() == 29
assert query.filter(foo.c.bar<30).apply_max(foo.c.bar).one() == 29
- @testing.unsupported('mysql')
def test_aggregate_1(self):
- # this one fails in mysql as the result comes back as a string
+ if (testing.against('mysql') and
+ testbase.db.dialect.dbapi.version_info[:4] == (1, 2, 1, 'gamma')):
+ return
+
query = create_session(bind=testbase.db).query(Foo)
assert query.filter(foo.c.bar<30).sum(foo.c.bar) == 435
- @testing.unsupported('postgres', 'mysql', 'firebird', 'mssql')
+ @testing.fails_on('postgres', 'mysql', 'firebird', 'mssql')
def test_aggregate_2(self):
query = create_session(bind=testbase.db).query(Foo)
assert query.filter(foo.c.bar<30).avg(foo.c.bar) == 14.5
- @testing.supported('postgres', 'mysql', 'firebird', 'mssql')
+ @testing.fails_on_everything_except('sqlite', 'postgres', 'mysql',
+ 'firebird', 'mssql')
def test_aggregate_2_int(self):
query = create_session(bind=testbase.db).query(Foo)
assert int(query.filter(foo.c.bar<30).avg(foo.c.bar)) == 14
- @testing.unsupported('postgres', 'mysql', 'firebird', 'mssql')
+ @testing.fails_on('postgres', 'mysql', 'firebird', 'mssql')
def test_aggregate_3(self):
query = create_session(bind=testbase.db).query(Foo)
assert query.filter(foo.c.bar<30).apply_avg(foo.c.bar).first() == 14.5
})
session = create_session(bind=testbase.db)
query = session.query(tables.User)
- x = query.select_from([tables.users.outerjoin(tables.orders).outerjoin(tables.orderitems)]).\
+ x = query.select_from(tables.users.outerjoin(tables.orders).outerjoin(tables.orderitems)).\
filter(or_(tables.Order.c.order_id==None,tables.Item.c.item_id==2))
print x.compile()
self.assert_result(list(x), tables.User, *tables.user_result[1:3])
l = sess.query(Bar).select()
print l[0]
print l[0].foos
- self.assert_result(l, Bar,
+ self.assert_unordered_result(l, Bar,
# {'id':1, 'data':'barfoo', 'bid':1, 'foos':(Foo, [{'id':2,'data':'subfoo1'}, {'id':3,'data':'subfoo2'}])},
{'id':b.id, 'data':'barfoo', 'foos':(Foo, [{'id':f1.id,'data':'subfoo1'}, {'id':f2.id,'data':'subfoo2'}])},
)
b.foos.append(Foo("foo #1"))
b.foos.append(Foo("foo #2"))
sess.flush()
- compare = repr(b) + repr(b.foos)
+ compare = repr(b) + repr(sorted([repr(o) for o in b.foos]))
sess.clear()
l = sess.query(Bar).select()
print repr(l[0]) + repr(l[0].foos)
- self.assert_(repr(l[0]) + repr(l[0].foos) == compare)
+ found = repr(l[0]) + repr(sorted([repr(o) for o in l[0].foos]))
+ self.assertEqual(found, compare)
@testing.fails_on('maxdb')
def testadvanced(self):
Place.mapper = mapper(Place, place, properties = {
'thingies':relation(mapper(PlaceThingy, place_thingy), lazy=False)
})
-
+
Transition.mapper = mapper(Transition, transition, properties = dict(
inputs = relation(Place.mapper, place_output, lazy=False),
outputs = relation(Place.mapper, place_input, lazy=False),
sess.clear()
r = sess.query(Transition).select()
- self.assert_result(r, Transition,
- {'name':'transition1',
- 'inputs' : (Place, [{'name':'place1'}]),
- 'outputs' : (Place, [{'name':'place2'}, {'name':'place3'}])
- }
- )
+ self.assert_unordered_result(r, Transition,
+ {'name': 'transition1',
+ 'inputs': (Place, [{'name':'place1'}]),
+ 'outputs': (Place, [{'name':'place2'}, {'name':'place3'}])
+ })
def testbidirectional(self):
"""tests a many-to-many backrefs"""
from testlib import *
class RelationTest(PersistTest):
- """this is essentially an extension of the "dependency.py" topological sort test.
- in this test, a table is dependent on two other tables that are otherwise unrelated to each other.
- the dependency sort must insure that this childmost table is below both parent tables in the outcome
- (a bug existed where this was not always the case).
- while the straight topological sort tests should expose this, since the sorting can be different due
- to subtle differences in program execution, this test case was exposing the bug whereas the simpler tests
- were not."""
+ """An extended topological sort test
+
+ This is essentially an extension of the "dependency.py" topological sort
+ test. In this test, a table is dependent on two other tables that are
+ otherwise unrelated to each other. The dependency sort must ensure that
+ this childmost table is below both parent tables in the outcome (a bug
+ existed where this was not always the case).
+
+ While the straight topological sort tests should expose this, since the
+ sorting can be different due to subtle differences in program execution,
+ this test case was exposing the bug whereas the simpler tests were not.
+ """
+
def setUpAll(self):
global metadata, tbl_a, tbl_b, tbl_c, tbl_d
d3 = D(); d3.name = "d3"; d3.b_row = b; d3.c_row = c
session.save_or_update(a)
session.save_or_update(b)
-
+
def tearDown(self):
conn = testbase.db.connect()
conn.drop(tbl_d)
def tearDownAll(self):
metadata.drop_all(testbase.db)
-
+
def testDeleteRootTable(self):
session.flush()
session.delete(a) # works as expected
session.flush()
-
+
def testDeleteMiddleTable(self):
session.flush()
session.delete(c) # fails
session.flush()
-
+
class RelationTest2(PersistTest):
- """this test tests a relationship on a column that is included in multiple foreign keys,
- as well as a self-referential relationship on a composite key where one column in the foreign key
- is 'joined to itself'."""
+ """Tests a relationship on a column included in multiple foreign keys.
+
+ This test tests a relationship on a column that is included in multiple
+ foreign keys, as well as a self-referential relationship on a composite
+ key where one column in the foreign key is 'joined to itself'.
+ """
+
def setUpAll(self):
global metadata, company_tbl, employee_tbl
metadata = MetaData(testbase.db)
-
+
company_tbl = Table('company', metadata,
Column('company_id', Integer, primary_key=True),
Column('name', Unicode(30)))
ForeignKeyConstraint(['company_id', 'reports_to_id'],
['employee.company_id', 'employee.emp_id']))
metadata.create_all()
-
+
def tearDownAll(self):
- metadata.drop_all()
+ metadata.drop_all()
def testexplicit(self):
"""test with mappers that have fairly explicit join conditions"""
self.company = company
self.emp_id = emp_id
self.reports_to = reports_to
-
+
mapper(Company, company_tbl)
mapper(Employee, employee_tbl, properties= {
'company':relation(Company, primaryjoin=employee_tbl.c.company_id==company_tbl.c.company_id, backref='employees'),
and_(
employee_tbl.c.emp_id==employee_tbl.c.reports_to_id,
employee_tbl.c.company_id==employee_tbl.c.company_id
- ),
+ ),
foreignkey=[employee_tbl.c.company_id, employee_tbl.c.emp_id],
backref='employees')
})
mapper(Company, company_tbl)
mapper(Employee, employee_tbl, properties= {
'company':relation(Company, backref='employees'),
- 'reports_to':relation(Employee,
+ 'reports_to':relation(Employee,
foreignkey=[employee_tbl.c.company_id, employee_tbl.c.emp_id],
backref='employees')
})
assert [x.name for x in test_e1.employees] == ['emp2', 'emp3']
assert sess.query(Employee).get([c1.company_id, 3]).reports_to.name == 'emp1'
assert sess.query(Employee).get([c2.company_id, 3]).reports_to.name == 'emp5'
-
+
class RelationTest3(PersistTest):
def setUpAll(self):
global jobs, pageversions, pages, metadata, Job, Page, PageVersion, PageComment
import datetime
- metadata = MetaData(testbase.db)
+ metadata = MetaData(testbase.db)
jobs = Table("jobs", metadata,
Column("jobno", Unicode(15), primary_key=True),
Column("created", DateTime, nullable=False, default=datetime.datetime.now),
def tearDownAll(self):
clear_mappers()
- metadata.drop_all()
+ metadata.drop_all()
def testbasic(self):
"""test the combination of complicated join conditions with post_update"""
"""test syncrules on foreign keys that are also primary"""
def define_tables(self, metadata):
global tableA, tableB
- tableA = Table("A", metadata,
+ tableA = Table("A", metadata,
Column("id",Integer,primary_key=True),
Column("foo",Integer,),
)
sess = create_session()
sess.save(a1)
sess.flush()
-
+
sess.delete(a1)
try:
sess.flush()
assert False
except exceptions.AssertionError, e:
assert str(e).startswith("Dependency rule tried to blank-out primary key column 'B.id' on instance ")
-
+
def test_no_nullPK_BtoA(self):
class A(object):pass
class B(object):pass
sess = create_session()
sess.save(b1)
try:
- # this raises an error as of r3695. in that rev, the attributes package was modified so that a
+ # this raises an error as of r3695. in that rev, the attributes package was modified so that a
# setting of "None" shows up as a change, which in turn fires off dependency.py and then triggers
# the rule.
sess.flush()
except exceptions.AssertionError, e:
assert str(e).startswith("Dependency rule tried to blank-out primary key column 'B.id' on instance ")
- @testing.supported('sqlite', 'mysql')
+ @testing.fails_on_everything_except('sqlite', 'mysql')
def test_nullPKsOK_BtoA(self):
# postgres cant handle a nullable PK column...?
- tableC = Table('tablec', tableA.metadata,
+ tableC = Table('tablec', tableA.metadata,
Column('id', Integer, primary_key=True),
Column('a_id', Integer, ForeignKey('A.id'), primary_key=True, autoincrement=False, nullable=True))
tableC.create()
-
+
class A(object):pass
class C(object):pass
mapper(C, tableC, properties={
sess.save(c1)
# test that no error is raised.
sess.flush()
-
+
def test_delete_cascade_BtoA(self):
- """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a
+ """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a
cascade"""
class A(object):pass
class B(object):pass
assert b1 not in sess
sess.clear()
clear_mappers()
-
+
def test_delete_cascade_AtoB(self):
- """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a
+ """test that the 'blank the PK' error doesnt get raised when the child is to be deleted as part of a
cascade"""
class A(object):pass
class B(object):pass
sess = create_session()
sess.save(a1)
sess.flush()
-
+
sess.delete(a1)
sess.flush()
assert a1 not in sess
assert b1 not in sess
sess.clear()
clear_mappers()
-
+
def test_delete_manual_AtoB(self):
class A(object):pass
class B(object):pass
sess.save(a1)
sess.save(b1)
sess.flush()
-
+
sess.delete(a1)
sess.delete(b1)
sess.flush()
assert b1 not in sess
class RelationTest5(ORMTest):
- """test a map to a select that relates to a map to the table"""
+ """Test a map to a select that relates to a map to the table."""
+
def define_tables(self, metadata):
global items
items = Table('items', metadata,
def test_basic(self):
class Container(object):pass
class LineItem(object):pass
-
+
container_select = select(
[items.c.policyNum, items.c.policyEffDate, items.c.type],
- distinct=True,
+ distinct=True,
).alias('container_select')
mapper(LineItem, items)
assert len(newcon.lineItems) == 10
for old, new in zip(con.lineItems, newcon.lineItems):
assert old.id == new.id
-
-
+
+
class TypeMatchTest(ORMTest):
"""test errors raised when trying to add items whose type is not handled by a relation"""
def define_tables(self, metadata):
global a, b, c, d
- a = Table("a", metadata,
+ a = Table("a", metadata,
Column('aid', Integer, primary_key=True),
Column('data', String(30)))
- b = Table("b", metadata,
+ b = Table("b", metadata,
Column('bid', Integer, primary_key=True),
Column("a_id", Integer, ForeignKey("a.aid")),
Column('data', String(30)))
- c = Table("c", metadata,
+ c = Table("c", metadata,
Column('cid', Integer, primary_key=True),
Column("b_id", Integer, ForeignKey("b.bid")),
Column('data', String(30)))
- d = Table("d", metadata,
+ d = Table("d", metadata,
Column('did', Integer, primary_key=True),
Column("a_id", Integer, ForeignKey("a.aid")),
Column('data', String(30)))
mapper(A, a, properties={'bs':relation(B)})
mapper(B, b)
mapper(C, c)
-
+
a1 = A()
b1 = B()
c1 = C()
mapper(A, a, properties={'bs':relation(B, cascade="none")})
mapper(B, b)
mapper(C, c)
-
+
a1 = A()
b1 = B()
c1 = C()
mapper(A, a, properties={'bs':relation(B, cascade="none")})
mapper(B, b)
mapper(C, c, inherits=B)
-
+
a1 = A()
b1 = B()
c1 = C()
assert False
except exceptions.FlushError, err:
assert str(err).startswith("Attempting to flush an item of type %s on collection 'A.bs (B)', which is handled by mapper 'Mapper|B|b' and does not load items of that type. Did you mean to use a polymorphic mapper for this relationship ?" % C)
-
+
def test_m2o_nopoly_onflush(self):
class A(object):pass
class B(A):pass
class TypedAssociationTable(ORMTest):
def define_tables(self, metadata):
global t1, t2, t3
-
+
class MySpecialType(types.TypeDecorator):
impl = String
def convert_bind_param(self, value, dialect):
return "lala" + value
def convert_result_value(self, value, dialect):
return value[4:]
-
- t1 = Table('t1', metadata,
+
+ t1 = Table('t1', metadata,
Column('col1', MySpecialType(30), primary_key=True),
Column('col2', String(30)))
- t2 = Table('t2', metadata,
+ t2 = Table('t2', metadata,
Column('col1', MySpecialType(30), primary_key=True),
Column('col2', String(30)))
t3 = Table('t3', metadata,
)
def testm2m(self):
"""test many-to-many tables with special types for candidate keys"""
-
+
class T1(object):pass
class T2(object):pass
mapper(T2, t2)
sess.flush()
assert t3.count().scalar() == 2
-
+
a.t2s.remove(c)
sess.flush()
-
+
assert t3.count().scalar() == 1
-
+
# TODO: move these tests to either attributes.py test or its own module
class CustomCollectionsTest(ORMTest):
def define_tables(self, metadata):
sometable = Table('sometable', metadata,
Column('col1',Integer, primary_key=True),
Column('data', String(30)))
- someothertable = Table('someothertable', metadata,
+ someothertable = Table('someothertable', metadata,
Column('col1', Integer, primary_key=True),
Column('scol1', Integer, ForeignKey(sometable.c.col1)),
Column('data', String(20))
f = sess.query(Foo).get(f.col1)
assert len(list(f.bars)) == 2
f.bars.clear()
-
+
def testdict(self):
"""test that a 'dict' can be used as a collection and can lazyload."""
def remove(self, item):
if id(item) in self:
del self[id(item)]
-
+
mapper(Foo, sometable, properties={
'bars':relation(Bar, collection_class=AppenderDict)
})
pass
class Bar(object):
def __init__(self, data): self.data = data
-
+
mapper(Foo, sometable, properties={
'bars':relation(Bar,
collection_class=collections.column_mapped_collection(someothertable.c.data))
p.children[4:] = o
assert control == p.children
assert control == list(p.children)
-
+
o = Child()
control.insert(0, o)
p.children.insert(0, o)
sess.save(p1)
sess.flush()
sess.clear()
-
+
p2 = sess.query(Parent).get(p1.col1)
o = list(p2.children)
assert len(o) == 3
Column('data', String(40)),
Column('t2id', Integer, ForeignKey('t2.id'))
)
-
+
def test_basic(self):
class C1(object):pass
class C2(object):pass
class C3(object):pass
-
+
mapper(C1, t1, properties={
't2s':relation(C2),
't2_view':relation(C2, viewonly=True, primaryjoin=and_(t1.c.id==t2.c.t1id, t3.c.t2id==t2.c.id, t3.c.data==t1.c.data))
mapper(C3, t3, properties={
't2':relation(C2)
})
-
+
c1 = C1()
c1.data = 'c1data'
c2a = C2()
sess.save(c3)
sess.flush()
sess.clear()
-
+
c1 = sess.query(C1).get(c1.id)
assert set([x.id for x in c1.t2s]) == set([c2a.id, c2b.id])
assert set([x.id for x in c1.t2_view]) == set([c2b.id])
c1 = sess.query(C1).get(c1.t1id)
assert set([x.t2id for x in c1.t2s]) == set([c2a.t2id, c2b.t2id])
assert set([x.t2id for x in c1.t2_view]) == set([c2b.t2id])
-
-
+
+
if __name__ == "__main__":
- testbase.main()
+ testbase.main()
class SessionTest(AssertMixin):
def setUpAll(self):
tables.create()
-
+
def tearDownAll(self):
tables.drop()
-
+
def tearDown(self):
SessionCls.close_all()
tables.delete()
clear_mappers()
-
+
def setUp(self):
pass
# then see if expunge fails
session.expunge(u)
-
+
@engines.close_open_connections
def test_binds_from_expression(self):
"""test that Session can extract Table objects from ClauseElements and match them to tables."""
sess = Session()
sess.execute(users.insert(), params=dict(user_id=1, user_name='ed'))
assert sess.execute(users.select()).fetchall() == [(1, 'ed')]
-
+
mapper(Address, addresses)
mapper(User, users, properties={
'addresses':relation(Address, backref=backref("user", cascade="all"), cascade="all")
sess.execute(users.insert(), params=dict(user_id=2, user_name='fred'))
assert sess.execute(users.select()).fetchall() == [(1, 'ed'), (2, 'fred')]
sess.close()
-
+
@testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang
@engines.close_open_connections
def test_transaction(self):
mapper(User, users)
conn1 = testbase.db.connect()
conn2 = testbase.db.connect()
-
+
sess = create_session(transactional=True, bind=conn1)
u = User()
sess.save(u)
assert conn1.execute("select count(1) from users").scalar() == 1
assert testbase.db.connect().execute("select count(1) from users").scalar() == 1
sess.close()
-
+
@testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang
@engines.close_open_connections
def test_autoflush(self):
mapper(User, users)
conn1 = testbase.db.connect()
conn2 = testbase.db.connect()
-
+
sess = create_session(bind=conn1, transactional=True, autoflush=True)
u = User()
u.user_name='ed'
assert conn1.execute("select count(1) from users").scalar() == 1
assert testbase.db.connect().execute("select count(1) from users").scalar() == 1
sess.close()
-
+
@testing.unsupported('sqlite', 'mssql') # TEMP: test causes mssql to hang
@engines.close_open_connections
def test_autoflush_unbound(self):
except:
sess.rollback()
raise
-
+
@engines.close_open_connections
def test_autoflush_2(self):
class User(object):pass
mapper(User, users)
conn1 = testbase.db.connect()
conn2 = testbase.db.connect()
-
+
sess = create_session(bind=conn1, transactional=True, autoflush=True)
u = User()
u.user_name='ed'
mapper(User, users, properties={
'addresses':relation(Address)
})
-
+
sess = create_session(transactional=True, autoflush=True)
u = sess.query(User).get(8)
newad = Address()
assert u.user_name == 'ed'
assert len(u.addresses) == 3
assert newad not in u.addresses
-
-
+
+
@engines.close_open_connections
def test_external_joined_transaction(self):
class User(object):pass
conn = testbase.db.connect()
trans = conn.begin()
sess = create_session(bind=conn, transactional=True, autoflush=True)
- sess.begin()
+ sess.begin()
u = User()
sess.save(u)
sess.flush()
assert len(sess.query(User).select()) == 0
sess.close()
- @testing.supported('postgres', 'mysql')
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
@engines.close_open_connections
def test_external_nested_transaction(self):
class User(object):pass
u1 = User()
sess.save(u1)
sess.flush()
-
- sess.begin_nested()
+
+ sess.begin_nested()
u2 = User()
sess.save(u2)
sess.flush()
sess.rollback()
-
- trans.commit()
+
+ trans.commit()
assert len(sess.query(User).select()) == 1
except:
conn.close()
raise
-
- @testing.supported('postgres', 'mysql')
+
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
@engines.close_open_connections
def test_heavy_nesting(self):
session = create_session(bind=testbase.db)
session.connection().execute("insert into users (user_name) values ('user1')")
session.begin()
-
+
session.begin_nested()
session.connection().execute("insert into users (user_name) values ('user2')")
session.commit()
assert session.connection().execute("select count(1) from users").scalar() == 2
-
-
- @testing.supported('postgres', 'mysql')
+
+
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
@testing.exclude('mysql', '<', (5, 0, 3))
def test_twophase(self):
# TODO: mock up a failure condition here
class Address(object):pass
mapper(User, users)
mapper(Address, addresses)
-
+
engine2 = create_engine(testbase.db.url)
sess = create_session(transactional=False, autoflush=False, twophase=True)
sess.bind_mapper(User, testbase.db)
engine2.dispose()
assert users.count().scalar() == 1
assert addresses.count().scalar() == 1
-
+
def test_joined_transaction(self):
class User(object):pass
mapper(User, users)
sess = create_session(transactional=True, autoflush=True)
- sess.begin()
+ sess.begin()
u = User()
sess.save(u)
sess.flush()
assert len(sess.query(User).select()) == 0
sess.close()
- @testing.supported('postgres', 'mysql')
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
@testing.exclude('mysql', '<', (5, 0, 3))
def test_nested_transaction(self):
class User(object):pass
sess.save(u2)
sess.flush()
- sess.rollback()
-
+ sess.rollback()
+
sess.commit()
assert len(sess.query(User).select()) == 1
sess.close()
- @testing.supported('postgres', 'mysql')
+ @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
+ 'oracle', 'maxdb')
@testing.exclude('mysql', '<', (5, 0, 3))
def test_nested_autotrans(self):
class User(object):pass
u2 = User()
sess.save(u2)
sess.flush()
-
- sess.rollback()
+
+ sess.rollback()
sess.commit()
assert len(sess.query(User).select()) == 1
sess.save(u)
sess.flush()
assert transaction.get_or_add(testbase.db) is transaction.get_or_add(c) is c
-
+
try:
transaction.add(testbase.db.connect())
assert False
- except exceptions.InvalidRequestError, e:
+ except exceptions.InvalidRequestError, e:
assert str(e) == "Session already has a Connection associated for the given Connection's Engine"
try:
transaction.get_or_add(testbase.db.connect())
assert False
- except exceptions.InvalidRequestError, e:
+ except exceptions.InvalidRequestError, e:
assert str(e) == "Session already has a Connection associated for the given Connection's Engine"
try:
transaction.add(testbase.db)
assert False
- except exceptions.InvalidRequestError, e:
+ except exceptions.InvalidRequestError, e:
assert str(e) == "Session already has a Connection associated for the given Engine"
-
+
transaction.rollback()
assert len(sess.query(User).select()) == 0
sess.close()
assert c.scalar("select count(1) from users") == 1
c.execute("delete from users")
assert c.scalar("select count(1) from users") == 0
-
+
c = testbase.db.connect()
trans = c.begin()
trans.commit()
assert not c.in_transaction()
assert c.scalar("select count(1) from users") == 1
-
-
+
+
@engines.close_open_connections
def test_save_update_delete(self):
-
+
s = create_session()
class User(object):
pass
mapper(User, users)
-
+
user = User()
try:
assert False
except exceptions.InvalidRequestError, e:
assert str(e) == "Instance 'User@%s' is not persisted" % hex(id(user))
-
+
s.save(user)
s.flush()
user = s.query(User).one()
s.expunge(user)
assert user not in s
-
+
# modify outside of session, assert changes remain/get saved
user.user_name = "fred"
s.update(user)
assert s.query(User).count() == 1
user = s.query(User).one()
assert user.user_name == 'fred'
-
+
# ensure its not dirty if no changes occur
s.clear()
assert user not in s
s.update(user)
assert user in s
assert user not in s.dirty
-
+
try:
s.save(user)
assert False
except exceptions.InvalidRequestError, e:
assert str(e) == "Instance 'User@%s' is already persistent" % hex(id(user))
-
+
s2 = create_session()
try:
s2.delete(user)
assert False
except exceptions.InvalidRequestError, e:
assert "is already attached to session" in str(e)
-
+
u2 = s2.query(User).get(user.user_id)
try:
s.delete(u2)
assert False
except exceptions.InvalidRequestError, e:
assert "already persisted with a different identity" in str(e)
-
+
s.delete(user)
s.flush()
assert user not in s
assert s.query(User).count() == 0
-
+
def test_is_modified(self):
s = create_session()
class User(object):pass
class Address(object):pass
-
+
mapper(User, users, properties={'addresses':relation(Address)})
mapper(Address, addresses)
-
+
# save user
u = User()
u.user_name = 'fred'
s.save(u)
s.flush()
s.clear()
-
+
user = s.query(User).one()
assert user not in s.dirty
assert not s.is_modified(user)
s.flush()
assert user not in s.dirty
assert not s.is_modified(user)
-
+
a = Address()
user.addresses.append(a)
assert user in s.dirty
assert s.is_modified(user)
assert not s.is_modified(user, include_collections=False)
-
-
+
+
def test_weak_ref(self):
"""test the weak-referencing identity map, which strongly-references modified items."""
-
+
s = create_session()
class User(object):pass
mapper(User, users)
-
+
# save user
s.save(User())
s.flush()
gc.collect()
assert len(s.identity_map) == 0
assert len(s.identity_map.data) == 0
-
+
user = s.query(User).one()
user.user_name = 'fred'
user = None
gc.collect()
assert len(s.identity_map) == 1
assert len(s.identity_map.data) == 1
-
+
s.flush()
gc.collect()
assert len(s.identity_map) == 0
assert len(s.identity_map.data) == 0
-
+
assert s.query(User).one().user_name == 'fred'
-
+
def test_strong_ref(self):
s = create_session(weak_identity_map=False)
class User(object):pass
mapper(User, users)
-
+
# save user
s.save(User())
s.flush()
s.flush()
self.assert_(s.prune() == 0)
self.assert_(len(s.identity_map) == 0)
-
+
def test_no_save_cascade(self):
mapper(Address, addresses)
mapper(User, users, properties=dict(
s.clear()
assert s.query(User).one().user_id == u.user_id
assert s.query(Address).first() is None
-
+
clear_mappers()
-
+
tables.delete()
mapper(Address, addresses)
mapper(User, users, properties=dict(
addresses=relation(Address, cascade="all", backref=backref("user", cascade="none"))
))
-
+
s = create_session()
u = User()
a = Address()
self._assert_key(key, (User, (1,), None))
key = s.identity_key(User, row=row, entity_name="en")
self._assert_key(key, (User, (1,), "en"))
-
+
def test_extension(self):
mapper(User, users)
log = []
u = User()
sess.save(u)
sess.flush()
-
+
assert log == ['before_flush', 'after_flush', 'before_commit', 'after_commit', 'after_flush_postexec']
-
+
log = []
sess = create_session(transactional=True, extension=MyExt())
u = User()
log = []
sess.commit()
assert log == ['before_commit', 'after_commit']
-
+
def test_pickled_update(self):
mapper(User, users)
sess1 = create_session()
sess2 = create_session()
-
+
u1 = User()
sess1.save(u1)
-
+
try:
sess2.save(u1)
assert False
except exceptions.InvalidRequestError, e:
assert "already attached to session" in str(e)
-
+
u2 = pickle.loads(pickle.dumps(u1))
-
+
sess2.save(u2)
-
+
def test_duplicate_update(self):
mapper(User, users)
Session = sessionmaker()
- sess = Session()
+ sess = Session()
u1 = User()
sess.save(u1)
sess.flush()
assert u1.user_id is not None
-
+
sess.expunge(u1)
-
+
assert u1 not in sess
-
+
u2 = sess.query(User).get(u1.user_id)
assert u2 is not None and u2 is not u1
assert u2 in sess
-
+
self.assertRaises(Exception, lambda: sess.update(u1))
sess.expunge(u2)
assert u2 not in sess
-
+
u1.user_name = "John"
u2.user_name = "Doe"
sess.update(u1)
assert u1 in sess
-
+
sess.flush()
-
+
sess.clear()
u3 = sess.query(User).get(u1.user_id)
def define_tables(self, metadata):
global table, table2
- table = Table('sometable', metadata,
+ table = Table('sometable', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30)))
- table2 = Table('someothertable', metadata,
+ table2 = Table('someothertable', metadata,
Column('id', Integer, primary_key=True),
Column('someid', None, ForeignKey('sometable.id'))
)
-
+
def test_basic(self):
Session = scoped_session(sessionmaker())
Session.save(s)
Session.commit()
Session.remove()
-
+
assert SomeObject(id=1, data="hello", options=[SomeOtherObject(someid=1)]) == Session.query(SomeObject).one()
-
-
+
+
class ScopedMapperTest(PersistTest):
def setUpAll(self):
global metadata, table, table2
metadata = MetaData(testbase.db)
- table = Table('sometable', metadata,
+ table = Table('sometable', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30)))
- table2 = Table('someothertable', metadata,
+ table2 = Table('someothertable', metadata,
Column('id', Integer, primary_key=True),
Column('someid', None, ForeignKey('sometable.id'))
)
global SomeObject, SomeOtherObject
class SomeObject(object):pass
class SomeOtherObject(object):pass
-
+
global Session
-
+
Session = scoped_session(create_session)
Session.mapper(SomeObject, table, properties={
'options':relation(SomeOtherObject)
def tearDownAll(self):
metadata.drop_all()
-
+
def tearDown(self):
for table in metadata.table_iterator(reverse=True):
table.delete().execute()
pass
Session.mapper(Bar, table2, extension=[ext])
assert hasattr(Bar, 'query')
-
+
class Baz(object):
pass
Session.mapper(Baz, table2, extension=ext)
assert hasattr(Baz, 'query')
-
+
def test_validating_constructor(self):
s2 = SomeObject(someid=12)
s3 = SomeOtherObject(someid=123, bogus=345)
class ScopedMapperTest2(ORMTest):
def define_tables(self, metadata):
global table, table2
- table = Table('sometable', metadata,
+ table = Table('sometable', metadata,
Column('id', Integer, primary_key=True),
Column('data', String(30)),
Column('type', String(30))
-
+
)
- table2 = Table('someothertable', metadata,
+ table2 = Table('someothertable', metadata,
Column('id', Integer, primary_key=True),
Column('someid', None, ForeignKey('sometable.id')),
Column('somedata', String(30)),
)
-
+
def test_inheritance(self):
def expunge_list(l):
for x in l:
Session.expunge(x)
return l
-
+
class BaseClass(fixtures.Base):
pass
class SubClass(BaseClass):
pass
-
+
Session = scoped_session(sessionmaker())
Session.mapper(BaseClass, table, polymorphic_identity='base', polymorphic_on=table.c.type)
Session.mapper(SubClass, table2, polymorphic_identity='sub', inherits=BaseClass)
-
+
b = BaseClass(data='b1')
s = SubClass(data='s1', somedata='somedata')
Session.commit()
Session.clear()
-
+
assert expunge_list([BaseClass(data='b1'), SubClass(data='s1', somedata='somedata')]) == BaseClass.query.all()
assert expunge_list([SubClass(data='s1', somedata='somedata')]) == SubClass.query.all()
-
-
-if __name__ == "__main__":
+
+
+if __name__ == "__main__":
testbase.main()
# coding: utf-8
+
+"""Tests unitofwork operations."""
+
import testbase
import pickleable
from sqlalchemy import *
from sqlalchemy.orm import *
from testlib import *
from testlib.tables import *
-from testlib import tables, fixtures
+from testlib import engines, tables, fixtures
-"""tests unitofwork operations"""
# TODO: convert suite to not use Session.mapper, use fixtures.Base
# with explicit session.save()
class UnitOfWorkTest(object):
pass
-
+
class HistoryTest(ORMTest):
metadata = tables.metadata
def define_tables(self, metadata):
pass
-
+
def test_backref(self):
s = Session()
class User(object):pass
m = mapper(User, users, properties = dict(
addresses = relation(am, backref='user', lazy=False))
)
-
+
u = User(_sa_session=s)
a = Address(_sa_session=s)
a.user = u
s.close()
u = s.query(m).select()[0]
print u.addresses[0].user
-
+
class VersioningTest(ORMTest):
def define_tables(self, metadata):
global version_table
f1 = Foo(value='f1', _sa_session=s)
f2 = Foo(value='f2', _sa_session=s)
s.commit()
-
+
f1.value='f1rev2'
s.commit()
s2 = Session()
# Only dialects with a sane rowcount can detect the ConcurrentModificationError
if testbase.db.dialect.supports_sane_rowcount:
assert success
-
+
s.close()
f1 = s.query(Foo).get(f1.id)
f2 = s.query(Foo).get(f2.id)
-
+
f1_s.value='f1rev4'
s2.commit()
success = True
if testbase.db.dialect.supports_sane_multi_rowcount:
assert success
-
+
@engines.close_open_connections
def test_versioncheck(self):
"""test that query.with_lockmode performs a 'version check' on an already loaded instance"""
s1.query(Foo).load(f1s1.id)
# now assert version OK
s1.query(Foo).with_lockmode('read').get(f1s1.id)
-
+
# assert brand new load is OK too
s1.close()
s1.query(Foo).with_lockmode('read').get(f1s1.id)
-
+
@engines.close_open_connections
def test_noversioncheck(self):
"""test that query.with_lockmode works OK when the mapper has no version id col"""
f1s2 = s2.query(Foo).with_lockmode('read').get(f1s1.id)
assert f1s2.id == f1s1.id
assert f1s2.value == f1s1.value
-
+
class UnicodeTest(ORMTest):
def define_tables(self, metadata):
global uni_table, uni_table2
self.assert_(t1.txt == txt)
Session.commit()
self.assert_(t1.txt == txt)
-
+
def test_relation(self):
class Test(object):
def __init__(self, txt):
self.txt = txt
class Test2(object):pass
-
+
mapper(Test, uni_table, properties={
't2s':relation(Test2)
})
mapper(Test2, uni_table2)
-
+
txt = u"\u0160\u0110\u0106\u010c\u017d"
t1 = Test(txt=txt)
t1.t2s.append(Test2())
assert len(t1.t2s) == 2
class UnicodeSchemaTest(ORMTest):
- @testing.supported('sqlite', 'postgres')
- def define_tables(self, metadata):
- global t1, t2, t3
+ __unsupported_on__ = ('oracle', 'mssql', 'firebird', 'sybase',
+ 'access', 'maxdb')
+ __excluded_on__ = (('mysql', '<', (4, 1, 1)),)
+
+ metadata = MetaData(engines.utf8_engine())
- #unicode_bind = utf8_engine()
+ def define_tables(self, metadata):
+ global t1, t2
t1 = Table('unitable1', metadata,
Column(u'méil', Integer, primary_key=True, key='a'),
Column(u'\u6e2c\u8a66_2', Integer, key="e"),
test_needs_fk=True,
)
-
- @testing.supported('sqlite', 'postgres')
+
def test_mapping(self):
class A(fixtures.Base):pass
class B(fixtures.Base):pass
assert new_a1.t2s[0].d == b1.d
Session.clear()
- @testing.supported('sqlite', 'postgres')
def test_inheritance_mapping(self):
class A(fixtures.Base):pass
class B(A):pass
# breaks the comparison ?????
l = Session.query(A).all()
assert [A(b=5), B(e=7)] == l
-
+
class MutableTypesTest(ORMTest):
def define_tables(self, metadata):
global table
{'mutabletest_id': f1.id, 'val': u'hi', 'data':f1.data}
),
])
-
+
def test_nocomparison(self):
"""test that types marked as MutableType get changes detected on them when the type has no __eq__ method"""
class Foo(object):pass
f1 = Foo()
f1.data = pickleable.BarWithoutCompare(4,5)
Session.commit()
-
+
def go():
Session.commit()
self.assert_sql_count(testbase.db, go, 0)
-
+
Session.close()
f2 = Session.query(Foo).get_by(id=f1.id)
def go():
Session.commit()
self.assert_sql_count(testbase.db, go, 1)
-
+
Session.close()
f3 = Session.query(Foo).get_by(id=f1.id)
print f2.data, f3.data
def go():
Session.commit()
self.assert_sql_count(testbase.db, go, 0)
-
+
def test_unicode(self):
"""test that two equivalent unicode values dont get flagged as changed.
-
+
apparently two equal unicode objects dont compare via "is" in all cases, so this
tests the compare_values() call on types.String and its usage via types.Unicode."""
class Foo(object):pass
Column('id', Integer, Sequence('mutableidseq', optional=True), primary_key=True),
Column('data', PickleType(comparator=operator.eq)),
)
-
+
def test_dicts(self):
"""dictionaries dont pickle the same way twice, sigh."""
def go():
Session.commit()
self.assert_sql_count(testbase.db, go, 1)
-
+
Session.clear()
f = Session.query(Foo).get(f1.id)
assert f.data == [{'personne': {'nom': u'Smith', 'pers_id': 1, 'prenom': u'john', 'civilite': u'Mr', \
'int_3': False, 'int_2': False, 'int_1': u'23', 'VenSoir': False, 'str_1': u'Test', \
'SamMidi': False, 'str_2': u'chien', 'DimMidi': False, 'SamSoir': True, 'SamAcc': False}}]
-
+
class PKTest(ORMTest):
def define_tables(self, metadata):
global table, table2, table3
table = Table(
- 'multipk', metadata,
+ 'multipk', metadata,
Column('multi_id', Integer, Sequence("multi_id_seq", optional=True), primary_key=True),
Column('multi_rev', Integer, primary_key=True),
Column('name', String(50), nullable=False),
Column('value', String(100))
)
-
+
table2 = Table('multipk2', metadata,
Column('pk_col_1', String(30), primary_key=True),
Column('pk_col_2', String(30), primary_key=True),
)
# not supported on sqlite since sqlite's auto-pk generation only works with
- # single column primary keys
- @testing.unsupported('sqlite')
+ # single column primary keys
+ @testing.fails_on('sqlite')
def test_primarykey(self):
class Entry(object):
pass
Session.close()
e2 = Query(Entry).get((e.multi_id, 2))
self.assert_(e is not e2 and e._instance_key == e2._instance_key)
-
+
# this one works with sqlite since we are manually setting up pk values
def test_manualpk(self):
class Entry(object):
e.pk_col_2 = 'pk1_related'
e.data = 'im the data'
Session.commit()
-
+
def test_keypks(self):
import datetime
class Entity(object):
class ForeignPKTest(ORMTest):
"""tests mapper detection of the relationship direction when parent/child tables are joined on their
primary keys"""
-
+
def define_tables(self, metadata):
global people, peoplesites
Column('firstname', String(10)),
Column('lastname', String(10)),
)
-
+
peoplesites = Table("peoplesites", metadata,
- Column('person', String(10), ForeignKey("people.person"),
+ Column('person', String(10), ForeignKey("people.person"),
primary_key=True),
Column('site', String(10)),
)
-
+
def test_basic(self):
class PersonSite(object):pass
class Person(object):pass
m2 = mapper(Person, people,
properties = {
- 'sites' : relation(PersonSite),
+ 'sites' : relation(PersonSite),
},
)
compile_mappers()
assert u.name == 'test2'
assert u.counter == 2
self.assert_sql_count(testbase.db, go, 1)
-
+
sess.clear()
u = sess.query(User).get(u.id)
assert u.name == 'test2'
assert u.counter == 2
-
+
@testing.unsupported('mssql')
def test_insert(self):
class User(object):
pass
class MyOtherClass(object):
pass
-
+
mapper(MyOtherClass, myothertable)
mapper(MyClass, mytable, properties={
ForeignKeyConstraint(['parent_id'],['mytable.id']), # no CASCADE, the same as ON DELETE RESTRICT
test_needs_fk=True,
)
-
+
def test_assertions(self):
class MyClass(object):
pass
class MyOtherClass(object):
pass
-
+
mapper(MyOtherClass, myothertable)
-
+
try:
mapper(MyClass, mytable, properties={
'children':relation(MyOtherClass, passive_deletes='all', cascade="all")
assert False
except exceptions.ArgumentError, e:
assert str(e) == "Can't set passive_deletes='all' in conjunction with 'delete' or 'delete-orphan' cascade"
-
+
@testing.unsupported('sqlite')
def test_extra_passive(self):
class MyClass(object):
pass
class MyOtherClass(object):
pass
-
+
mapper(MyOtherClass, myothertable)
mapper(MyClass, mytable, properties={
except exceptions.DBAPIError:
assert True
-
+
class DefaultTest(ORMTest):
"""tests that when saving objects whose table contains DefaultGenerators, either python-side, preexec or database-side,
- the newly saved instances receive all the default values either through a post-fetch or getting the pre-exec'ed
+ the newly saved instances receive all the default values either through a post-fetch or getting the pre-exec'ed
defaults back from the engine."""
-
+
def define_tables(self, metadata):
db = testbase.db
use_string_defaults = db.engine.__module__.endswith('postgres') or db.engine.__module__.endswith('oracle') or db.engine.__module__.endswith('sqlite')
hohotype = Integer
self.hohoval = 9
self.althohoval = 15
-
+
global default_table
default_table = Table('default_test', metadata,
Column('id', Integer, Sequence("dt_seq", optional=True), primary_key=True),
Column('foober', String(30), default="im foober", onupdate="im the update")
)
-
+
def test_insert(self):
class Hoho(object):pass
mapper(Hoho, default_table)
-
+
h1 = Hoho(hoho=self.althohoval)
h2 = Hoho(counter=12)
h3 = Hoho(hoho=self.althohoval, counter=12)
h4 = Hoho()
h5 = Hoho(foober='im the new foober')
Session.commit()
-
+
self.assert_(h1.hoho==self.althohoval)
self.assert_(h3.hoho==self.althohoval)
-
+
def go():
# test deferred load of attribues, one select per instance
self.assert_(h2.hoho==h4.hoho==h5.hoho==self.hohoval)
self.assert_sql_count(testbase.db, go, 3)
-
+
def go():
self.assert_(h1.counter == h4.counter==h5.counter==7)
self.assert_sql_count(testbase.db, go, 1)
-
+
def go():
self.assert_(h3.counter == h2.counter == 12)
self.assert_(h2.foober == h3.foober == h4.foober == 'im foober')
self.assert_(h5.foober=='im the new foober')
self.assert_sql_count(testbase.db, go, 0)
-
+
Session.close()
-
+
l = Hoho.query.all()
-
+
(h1, h2, h3, h4, h5) = l
-
+
self.assert_(h1.hoho==self.althohoval)
self.assert_(h3.hoho==self.althohoval)
self.assert_(h2.hoho==h4.hoho==h5.hoho==self.hohoval)
self.assert_(h1.counter == h4.counter==h5.counter==7)
self.assert_(h2.foober == h3.foober == h4.foober == 'im foober')
self.assert_(h5.foober=='im the new foober')
-
+
def test_insert_nopostfetch(self):
# populates the PassiveDefaults explicitly so there is no "post-update"
class Hoho(object):pass
mapper(Hoho, default_table)
-
+
h1 = Hoho(hoho="15", counter="15")
-
+
Session.commit()
def go():
self.assert_(h1.hoho=="15")
self.assert_(h1.counter=="15")
self.assert_(h1.foober=="im foober")
self.assert_sql_count(testbase.db, go, 0)
-
+
def test_update(self):
class Hoho(object):pass
mapper(Hoho, default_table)
class OneToManyTest(ORMTest):
metadata = tables.metadata
-
+
def define_tables(self, metadata):
pass
u2.user_name = 'user2modified'
u1.addresses.append(a3)
del u1.addresses[0]
- self.assert_sql(testbase.db, lambda: Session.commit(),
+ self.assert_sql(testbase.db, lambda: Session.commit(),
[
(
"UPDATE users SET user_name=:user_name WHERE users.user_id = :users_user_id",
m2 = mapper(Address, addresses)
m = mapper(User, users, properties={
'boston_addresses' : relation(m2, primaryjoin=
- and_(users.c.user_id==addresses.c.user_id,
+ and_(users.c.user_id==addresses.c.user_id,
addresses.c.email_address.like('%boston%'))),
'newyork_addresses' : relation(m2, primaryjoin=
- and_(users.c.user_id==addresses.c.user_id,
+ and_(users.c.user_id==addresses.c.user_id,
addresses.c.email_address.like('%newyork%'))),
})
u = User()
metadata = tables.metadata
def define_tables(self, metadata):
pass
-
+
def setUp(self):
super(SaveTest, self).setUp()
keywords.insert().execute(
u2.user_name = 'savetester2'
Session.save(u)
-
+
Session.flush([u])
Session.commit()
nu = Session.get(m, u.user_id)
print "U: " + repr(u) + "NU: " + repr(nu)
self.assert_(u is nu)
-
+
# clear out the identity map, so next get forces a SELECT
Session.close()
nu = Session.get(m, u.user_id)
self.assert_(u is not nu and u.user_id == nu.user_id and nu.user_name == 'savetester')
Session.close()
-
+
# change first users name and save
Session.update(u)
u.user_name = 'modifiedname'
print repr(u.user_id), repr(userlist[0].user_id), repr(userlist[0].user_name)
self.assert_(u.user_id == userlist[0].user_id and userlist[0].user_name == 'modifiedname')
self.assert_(u2.user_id == userlist[1].user_id and userlist[1].user_name == 'savetester2')
-
+
def test_synonym(self):
class User(object):
def _get_name(self):
def _set_name(self, name):
self.user_name = name + ":User"
name = property(_get_name, _set_name)
-
+
mapper(User, users, properties={
'name':synonym('user_name')
})
-
+
u = User()
u.name = "some name"
assert u.name == 'User:some name:User'
Session.clear()
u = Session.query(User).first()
assert u.name == 'User:some name:User'
-
+
def test_lazyattr_commit(self):
"""tests that when a lazy-loaded list is unloaded, and a commit occurs, that the
'passive' call on that list does not blow away its value"""
-
+
m1 = mapper(User, users, properties = {
'addresses': relation(mapper(Address, addresses))
})
-
+
u = User()
u.addresses.append(Address())
u.addresses.append(Address())
u1.user_name = 'newname'
Session.commit()
self.assert_(len(u1.addresses) == 4)
-
+
def test_inherits(self):
m1 = mapper(User, users)
-
+
class AddressUser(User):
"""a user object that also has the users mailing address."""
pass
AddressUser,
addresses, inherits=m1
)
-
+
au = AddressUser()
Session.commit()
Session.close()
l = Session.query(AddressUser).selectone()
self.assert_(l.user_id == au.user_id and l.address_id == au.address_id)
-
+
def test_deferred(self):
"""test deferred column operations"""
-
+
mapper(User, users, properties={
'user_name':deferred(users.c.user_name)
})
-
+
# dont set deferred attribute, commit session
u = User()
u.user_id=42
Session.commit()
assert list(Session.execute(users.select(), mapper=User)) == [(42, 'some name')]
Session.clear()
-
+
# assert that a set operation doesn't trigger a load operation
u = Session.query(User).filter(User.user_name=='some name').one()
def go():
self.assert_sql_count(testbase.db, go, 0)
Session.flush()
assert list(Session.execute(users.select(), mapper=User)) == [(42, 'some other name')]
-
+
Session.clear()
-
+
# test assigning None to an unloaded deferred also works
u = Session.query(User).filter(User.user_name=='some other name').one()
u.user_name = None
Session.flush()
assert list(Session.execute(users.select(), mapper=User)) == [(42, None)]
-
-
+
+
# why no support on oracle ? because oracle doesn't save
- # "blank" strings; it saves a single space character.
- @testing.unsupported('oracle')
+ # "blank" strings; it saves a single space character.
+ @testing.unsupported('oracle')
def test_dont_update_blanks(self):
mapper(User, users)
u = User()
"""tests a save of an object where each instance spans two tables. also tests
redefinition of the keynames for the column properties."""
usersaddresses = sql.join(users, addresses, users.c.user_id == addresses.c.user_id)
- m = mapper(User, usersaddresses,
+ m = mapper(User, usersaddresses,
properties = dict(
- email = addresses.c.email_address,
+ email = addresses.c.email_address,
foo_id = [users.c.user_id, addresses.c.user_id],
)
)
-
+
u = User()
u.user_name = 'multitester'
u.email = 'multi@test.org'
id = m.primary_key_from_instance(u)
Session.close()
-
+
u = Session.get(User, id)
assert u.user_name == 'multitester'
-
+
usertable = users.select(users.c.user_id.in_([u.foo_id])).execute().fetchall()
self.assertEqual(usertable[0].values(), [u.foo_id, 'multitester'])
addresstable = addresses.select(addresses.c.address_id.in_([u.address_id])).execute().fetchall()
Session.close()
u = Session.get(User, id)
assert u.user_name == 'imnew'
-
+
def test_history_get(self):
"""tests that the history properly lazy-fetches data when it wasnt otherwise loaded"""
mapper(User, users, properties={
'addresses':relation(Address, cascade="all, delete-orphan")
})
mapper(Address, addresses)
-
+
u = User()
u.addresses.append(Address())
u.addresses.append(Address())
Session.commit()
assert users.count().scalar() == 0
assert addresses.count().scalar() == 0
-
-
-
+
+
+
def test_batchmode(self):
"""test the 'batch=False' flag on mapper()"""
-
+
class TestExtension(MapperExtension):
def before_insert(self, mapper, connection, instance):
self.current_instance = instance
u2 = User()
u2.username = 'user2'
Session.commit()
-
+
clear_mappers()
-
+
m = mapper(User, users, extension=TestExtension())
u1 = User()
u1.username = 'user1'
assert False
except AssertionError:
assert True
-
-
+
+
class ManyToOneTest(ORMTest):
metadata = tables.metadata
-
+
def define_tables(self, metadata):
pass
-
+
def test_m2o_onetoone(self):
# TODO: put assertion in here !!!
m = mapper(Address, addresses, properties = dict(
a.user = User()
a.user.user_name = elem['user_name']
objects.append(a)
-
+
Session.commit()
objects[2].email_address = 'imnew@foo.bar'
objects[3].user = User()
"UPDATE email_addresses SET email_address=:email_address WHERE email_addresses.address_id = :email_addresses_address_id":
lambda ctx: {'email_address': 'imnew@foo.bar', 'email_addresses_address_id': objects[2].address_id}
,
-
+
"UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id":
lambda ctx: {'user_id': objects[3].user.user_id, 'email_addresses_address_id': objects[3].address_id}
},
-
+
],
with_sequences=[
(
"UPDATE email_addresses SET email_address=:email_address WHERE email_addresses.address_id = :email_addresses_address_id":
lambda ctx: {'email_address': 'imnew@foo.bar', 'email_addresses_address_id': objects[2].address_id}
,
-
+
"UPDATE email_addresses SET user_id=:user_id WHERE email_addresses.address_id = :email_addresses_address_id":
lambda ctx: {'user_id': objects[3].user.user_id, 'email_addresses_address_id': objects[3].address_id}
},
-
+
])
l = sql.select([users, addresses], sql.and_(users.c.user_id==addresses.c.user_id, addresses.c.address_id==a.address_id)).execute()
assert l.fetchone().values() == [a.user.user_id, 'asdf8d', a.address_id, a.user_id, 'theater@foo.com']
a1.email_address = 'emailaddress1'
u1 = User()
u1.user_name='user1'
-
+
a1.user = u1
Session.commit()
Session.close()
u1 = Session.query(User).get(u1.user_id)
u2 = Session.query(User).get(u2.user_id)
assert a1.user is u1
-
+
a1.user = u2
Session.commit()
Session.close()
assert sess.query(Address).get(a1.address_id).user is None
assert sess.query(User).get(u1.user_id).addresses == []
-
+
class ManyToManyTest(ORMTest):
metadata = tables.metadata
-
+
def define_tables(self, metadata):
pass
-
+
def test_manytomany(self):
items = orderitems
item.keywords.append(k)
Session.commit()
-
+
l = Session.query(m).select(items.c.item_name.in_([e['item_name'] for e in data[1:]]), order_by=[items.c.item_name])
self.assert_result(l, *data)
lambda ctx: [{'item_id': objects[5].item_id, 'keyword_id': k.keyword_id}]
)
],
-
+
with_sequences = [
{
"UPDATE items SET item_name=:item_name WHERE items.item_id = :items_item_id":
"DELETE FROM itemkeywords WHERE itemkeywords.item_id = :item_id AND itemkeywords.keyword_id = :keyword_id",
[{'item_id': objects[5].item_id, 'keyword_id': dkid}]
),
- (
+ (
"INSERT INTO itemkeywords (item_id, keyword_id) VALUES (:item_id, :keyword_id)",
lambda ctx: [{'item_id': objects[2].item_id, 'keyword_id': k.keyword_id}]
)
])
-
+
Session.delete(objects[3])
Session.commit()
i.keywords.append(k1)
i.keywords.append(k2)
Session.commit()
-
+
assert itemkeywords.count().scalar() == 2
i.keywords = []
Session.commit()
def test_scalar(self):
"""test that dependency.py doesnt try to delete an m2m relation referencing None."""
-
+
mapper(Keyword, keywords)
mapper(Item, orderitems, properties = dict(
keyword = relation(Keyword, secondary=itemkeywords, uselist=False),
))
-
+
i = Item()
Session.commit()
Session.delete(i)
Session.commit()
-
-
+
+
def test_manytomany_update(self):
"""tests some history operations on a many to many"""
return other.__class__ == Keyword and other.name == self.name
def __repr__(self):
return "Keyword(%s, %s)" % (getattr(self, 'keyword_id', 'None'), self.name)
-
+
mapper(Keyword, keywords)
mapper(Item, orderitems, properties = dict(
keywords = relation(Keyword, secondary=itemkeywords, lazy=False, order_by=keywords.c.name),
item.keywords.append(k2)
item.keywords.append(k3)
Session.commit()
-
+
item.keywords = []
item.keywords.append(k1)
item.keywords.append(k2)
Session.commit()
-
+
Session.close()
item = Session.query(Item).get(item.item_id)
print [k1, k2]
print item.keywords
assert item.keywords == [k1, k2]
-
+
def test_association(self):
"""basic test of an association object"""
class IKAssociation(object):
))
data = [Item,
- {'item_name': 'a_item1', 'keywords' : (IKAssociation,
+ {'item_name': 'a_item1', 'keywords' : (IKAssociation,
[
{'keyword' : (Keyword, {'name': 'big'})},
- {'keyword' : (Keyword, {'name': 'green'})},
+ {'keyword' : (Keyword, {'name': 'green'})},
{'keyword' : (Keyword, {'name': 'purple'})},
{'keyword' : (Keyword, {'name': 'round'})}
]
- )
+ )
},
- {'item_name': 'a_item2', 'keywords' : (IKAssociation,
+ {'item_name': 'a_item2', 'keywords' : (IKAssociation,
[
{'keyword' : (Keyword, {'name': 'huge'})},
- {'keyword' : (Keyword, {'name': 'violet'})},
+ {'keyword' : (Keyword, {'name': 'violet'})},
{'keyword' : (Keyword, {'name': 'yellow'})}
]
- )
+ )
},
- {'item_name': 'a_item3', 'keywords' : (IKAssociation,
+ {'item_name': 'a_item3', 'keywords' : (IKAssociation,
[
{'keyword' : (Keyword, {'name': 'big'})},
- {'keyword' : (Keyword, {'name': 'blue'})},
+ {'keyword' : (Keyword, {'name': 'blue'})},
]
- )
+ )
}
]
for elem in data[1:]:
Session.close()
l = Item.query.filter(items.c.item_name.in_([e['item_name'] for e in data[1:]])).order_by(items.c.item_name).all()
self.assert_result(l, *data)
-
+
class SaveTest2(ORMTest):
-
+
def define_tables(self, metadata):
global users, addresses
users = Table('users', metadata,
Column('rel_user_id', Integer, ForeignKey(users.c.user_id)),
Column('email_address', String(20)),
)
-
+
def test_m2o_nonmatch(self):
m = mapper(Address, addresses, properties = dict(
user = relation(mapper(User, users), lazy = True, uselist = False)
{'rel_user_id': 2, 'email_address': 'thesdf@asdf.com'}
)
],
-
+
with_sequences = [
(
"INSERT INTO users (user_id, user_name) VALUES (:user_id, :user_name)",
class BooleanColTest(ORMTest):
def define_tables(self, metadata):
global t
- t =Table('t1', metadata,
+ t =Table('t1', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(30)),
Column('value', Boolean))
-
+
def test_boolean(self):
# use the regular mapper
from sqlalchemy.orm import mapper
-
+
class T(fixtures.Base):
pass
mapper(T, t)
-
+
sess = create_session()
t1 = T(value=True, name="t1")
t2 = T(value=False, name="t2")
sess.save(t1)
sess.save(t2)
sess.save(t3)
-
+
sess.flush()
-
+
for clear in (False, True):
if clear:
sess.clear()
if clear:
sess.clear()
self.assertEquals(sess.query(T).filter(T.value==False).all(), [T(value=False, name="t2")])
-
+
t2 = sess.query(T).get(t2.id)
t2.value = True
sess.flush()
t2.value = False
sess.flush()
self.assertEquals(sess.query(T).filter(T.value==True).all(), [T(value=True, name="t1"),T(value=True, name="t3")])
-
-
+
+
class RowSwitchTest(ORMTest):
def define_tables(self, metadata):
global t1, t2, t3, t1t3
-
+
global T1, T2, T3
-
+
Session.remove()
-
+
# parent
t1 = Table('t1', metadata,
Column('id', Integer, primary_key=True),
Column('t1id', Integer, ForeignKey('t1.id'),nullable=False),
Column('t3id', Integer, ForeignKey('t3.id'),nullable=False),
)
-
+
class T1(fixtures.Base):
pass
class T3(fixtures.Base):
pass
-
+
def tearDown(self):
Session.remove()
super(RowSwitchTest, self).tearDown()
-
+
def test_onetomany(self):
mapper(T1, t1, properties={
't2s':relation(T2, cascade="all, delete-orphan")
})
mapper(T2, t2)
-
+
sess = Session(autoflush=False)
-
+
o1 = T1(data='some t1', id=1)
o1.t2s.append(T2(data='some t2', id=1))
o1.t2s.append(T2(data='some other t2', id=2))
-
+
sess.save(o1)
sess.flush()
-
+
assert list(sess.execute(t1.select(), mapper=T1)) == [(1, 'some t1')]
assert list(sess.execute(t2.select(), mapper=T1)) == [(1, 'some t2', 1), (2, 'some other t2', 1)]
-
+
o2 = T1(data='some other t1', id=o1.id, t2s=[
T2(data='third t2', id=3),
T2(data='fourth t2', id=4),
assert list(sess.execute(t3.select(), mapper=T1)) == [(3, 'third t3'), (4, 'fourth t3')]
def test_manytoone(self):
-
+
mapper(T2, t2, properties={
't1':relation(T1)
})
assert list(sess.execute(t1.select(), mapper=T1)) == [(2, 'some other t1')]
assert list(sess.execute(t2.select(), mapper=T1)) == [(1, 'some other t2', 2)]
-
-
-
+
+
+
if __name__ == "__main__":
- testbase.main()
+ testbase.main()
function calls made during the test. The count can vary between Python
2.4 and 2.5.
"""
-
- @testing.supported('postgres')
- @profiling.profiled('create', call_range=(1500, 1880), always=True)
+
+ __only_on__ = 'postgres'
+
+ @profiling.profiled('create', call_range=(1500, 1880), always=True)
def test_1_create_tables(self):
global metadata
metadata = MetaData(testbase.db)
-
+
Zoo = Table('Zoo', metadata,
Column('ID', Integer, Sequence('zoo_id_seq'), primary_key=True, index=True),
Column('Name', Unicode(255)),
Column('LastEscape', DateTime),
Column('Admission', Float),
)
-
+
Animal = Table('Animal', metadata,
Column('ID', Integer, Sequence('animal_id_seq'), primary_key=True),
Column('ZooID', Integer, ForeignKey('Zoo.ID'), index=True),
Column('AlternateFoodID', Integer),
)
metadata.create_all()
-
- @testing.supported('postgres')
+
@profiling.profiled('populate', call_range=(2700, 3700), always=True)
def test_1a_populate(self):
Zoo = metadata.tables['Zoo']
Animal = metadata.tables['Animal']
-
+
wap = Zoo.insert().execute(Name=u'Wild Animal Park',
Founded=datetime.date(2000, 1, 1),
# 59 can give rounding errors with divmod, which
LastEscape=datetime.datetime(2004, 7, 29, 5, 6, 7),
Admission=4.95,
).last_inserted_ids()[0]
-
+
sdz = Zoo.insert().execute(Name =u'San Diego Zoo',
Founded = datetime.date(1935, 9, 13),
Opens = datetime.time(9, 0, 0),
Admission = 0,
).last_inserted_ids()[0]
-
+
Zoo.insert().execute(
Name = u'Montr\xe9al Biod\xf4me',
Founded = datetime.date(1992, 6, 19),
Opens = datetime.time(9, 0, 0),
Admission = 11.75,
)
-
+
seaworld = Zoo.insert().execute(
Name =u'Sea_World', Admission = 60).last_inserted_ids()[0]
-
+
# Let's add a crazy futuristic Zoo to test large date values.
lp = Zoo.insert().execute(Name =u'Luna Park',
Founded = datetime.date(2072, 7, 17),
Opens = datetime.time(0, 0, 0),
Admission = 134.95,
).last_inserted_ids()[0]
-
+
# Animals
leopardid = Animal.insert().execute(Species=u'Leopard', Lifespan=73.5,
).last_inserted_ids()[0]
Animal.update(Animal.c.ID==leopardid).execute(ZooID=wap,
LastEscape=datetime.datetime(2004, 12, 21, 8, 15, 0, 999907))
-
+
lion = Animal.insert().execute(Species=u'Lion', ZooID=wap).last_inserted_ids()[0]
Animal.insert().execute(Species=u'Slug', Legs=1, Lifespan=.75)
-
+
tiger = Animal.insert().execute(Species=u'Tiger', ZooID=sdz
).last_inserted_ids()[0]
-
+
# Override Legs.default with itself just to make sure it works.
Animal.insert().execute(Species=u'Bear', Legs=4)
Animal.insert().execute(Species=u'Ostrich', Legs=2, Lifespan=103.2)
Animal.insert().execute(Species=u'Centipede', Legs=100)
-
+
emp = Animal.insert().execute(Species=u'Emperor Penguin', Legs=2,
ZooID=seaworld).last_inserted_ids()[0]
adelie = Animal.insert().execute(Species=u'Adelie Penguin', Legs=2,
ZooID=seaworld).last_inserted_ids()[0]
-
+
Animal.insert().execute(Species=u'Millipede', Legs=1000000, ZooID=sdz)
-
+
# Add a mother and child to test relationships
bai_yun = Animal.insert().execute(Species=u'Ape', Name=u'Bai Yun',
Legs=2).last_inserted_ids()[0]
Animal.insert().execute(Species=u'Ape', Name=u'Hua Mei', Legs=2,
MotherID=bai_yun)
-
- @testing.supported('postgres')
+
@profiling.profiled('insert', call_range=(150, 220), always=True)
def test_2_insert(self):
Animal = metadata.tables['Animal']
i = Animal.insert()
for x in xrange(ITERATIONS):
tick = i.execute(Species=u'Tick', Name=u'Tick %d' % x, Legs=8)
-
- @testing.supported('postgres')
+
@profiling.profiled('properties', call_range=(2300, 3030), always=True)
def test_3_properties(self):
Zoo = metadata.tables['Zoo']
Animal = metadata.tables['Animal']
-
+
def fullobject(select):
"""Iterate over the full result row."""
return list(select.execute().fetchone())
-
+
for x in xrange(ITERATIONS):
# Zoos
WAP = fullobject(Zoo.select(Zoo.c.Name==u'Wild Animal Park'))
SDZ = fullobject(Zoo.select(Zoo.c.Founded==datetime.date(1935, 9, 13)))
Biodome = fullobject(Zoo.select(Zoo.c.Name==u'Montr\xe9al Biod\xf4me'))
seaworld = fullobject(Zoo.select(Zoo.c.Admission == float(60)))
-
+
# Animals
leopard = fullobject(Animal.select(Animal.c.Species ==u'Leopard'))
ostrich = fullobject(Animal.select(Animal.c.Species==u'Ostrich'))
millipede = fullobject(Animal.select(Animal.c.Legs==1000000))
ticks = fullobject(Animal.select(Animal.c.Species==u'Tick'))
-
- @testing.supported('postgres')
+
@profiling.profiled('expressions', call_range=(9200, 12050), always=True)
def test_4_expressions(self):
Zoo = metadata.tables['Zoo']
Animal = metadata.tables['Animal']
-
+
def fulltable(select):
"""Iterate over the full result table."""
return [list(row) for row in select.execute().fetchall()]
-
+
for x in xrange(ITERATIONS):
assert len(fulltable(Zoo.select())) == 5
assert len(fulltable(Animal.select())) == ITERATIONS + 12
assert len(fulltable(Animal.select(Animal.c.Lifespan > 70))) == 2
assert len(fulltable(Animal.select(Animal.c.Species.startswith(u'L')))) == 2
assert len(fulltable(Animal.select(Animal.c.Species.endswith(u'pede')))) == 2
-
+
assert len(fulltable(Animal.select(Animal.c.LastEscape != None))) == 1
assert len(fulltable(Animal.select(None == Animal.c.LastEscape
))) == ITERATIONS + 11
-
+
# In operator (containedby)
assert len(fulltable(Animal.select(Animal.c.Species.like(u'%pede%')))) == 2
assert len(fulltable(Animal.select(Animal.c.Species.in_([u'Lion', u'Tiger', u'Bear'])))) == 3
-
+
# Try In with cell references
class thing(object): pass
pet, pet2 = thing(), thing()
pet.Name, pet2.Name =u'Slug', u'Ostrich'
assert len(fulltable(Animal.select(Animal.c.Species.in_([pet.Name, pet2.Name])))) == 2
-
+
# logic and other functions
assert len(fulltable(Animal.select(Animal.c.Species.like(u'Slug')))) == 1
assert len(fulltable(Animal.select(Animal.c.Species.like(u'%pede%')))) == 2
name =u'Lion'
assert len(fulltable(Animal.select(func.length(Animal.c.Species) == len(name)
))) == ITERATIONS + 3
-
+
assert len(fulltable(Animal.select(Animal.c.Species.like(u'%i%')
))) == ITERATIONS + 7
-
+
# Test now(), today(), year(), month(), day()
assert len(fulltable(Zoo.select(Zoo.c.Founded != None
and Zoo.c.Founded < func.current_timestamp(_type=Date)))) == 3
assert len(fulltable(Animal.select(func.date_part('year', Animal.c.LastEscape) == 2004))) == 1
assert len(fulltable(Animal.select(func.date_part('month', Animal.c.LastEscape) == 12))) == 1
assert len(fulltable(Animal.select(func.date_part('day', Animal.c.LastEscape) == 21))) == 1
-
- @testing.supported('postgres')
+
@profiling.profiled('aggregates', call_range=(800, 1170), always=True)
def test_5_aggregates(self):
Animal = metadata.tables['Animal']
Zoo = metadata.tables['Zoo']
-
+
for x in xrange(ITERATIONS):
# views
view = select([Animal.c.Legs]).execute().fetchall()
legs = [x[0] for x in view]
legs.sort()
-
+
expected = {'Leopard': 73.5,
'Slug': .75,
'Tiger': None,
for species, lifespan in select([Animal.c.Species, Animal.c.Lifespan]
).execute().fetchall():
assert lifespan == expected[species]
-
+
expected = [u'Montr\xe9al Biod\xf4me', 'Wild Animal Park']
e = select([Zoo.c.Name],
and_(Zoo.c.Founded != None,
Zoo.c.Founded >= datetime.date(1990, 1, 1)))
values = [val[0] for val in e.execute().fetchall()]
assert set(values) == set(expected)
-
+
# distinct
legs = [x[0] for x in
select([Animal.c.Legs], distinct=True).execute().fetchall()]
legs.sort()
-
- @testing.supported('postgres')
+
@profiling.profiled('editing', call_range=(1050, 1180), always=True)
def test_6_editing(self):
Zoo = metadata.tables['Zoo']
-
+
for x in xrange(ITERATIONS):
# Edit
SDZ = Zoo.select(Zoo.c.Name==u'San Diego Zoo').execute().fetchone()
Founded = datetime.date(1900, 1, 1),
Opens = datetime.time(7, 30, 0),
Admission = "35.00")
-
+
# Test edits
SDZ = Zoo.select(Zoo.c.Name==u'The San Diego Zoo').execute().fetchone()
assert SDZ['Founded'] == datetime.date(1900, 1, 1), SDZ['Founded']
-
+
# Change it back
Zoo.update(Zoo.c.ID==SDZ['ID']).execute(
Name =u'San Diego Zoo',
Founded = datetime.date(1935, 9, 13),
Opens = datetime.time(9, 0, 0),
Admission = "0")
-
+
# Test re-edits
SDZ = Zoo.select(Zoo.c.Name==u'San Diego Zoo').execute().fetchone()
assert SDZ['Founded'] == datetime.date(1935, 9, 13)
-
- @testing.supported('postgres')
+
@profiling.profiled('multiview', call_range=(1900, 2300), always=True)
def test_7_multiview(self):
Zoo = metadata.tables['Zoo']
Animal = metadata.tables['Animal']
-
+
def fulltable(select):
"""Iterate over the full result table."""
return [list(row) for row in select.execute().fetchall()]
-
+
for x in xrange(ITERATIONS):
za = fulltable(select([Zoo.c.ID] + list(Animal.c),
Zoo.c.Name ==u'San Diego Zoo',
from_obj = [join(Zoo, Animal)]))
-
+
SDZ = Zoo.select(Zoo.c.Name==u'San Diego Zoo')
-
+
e = fulltable(select([Zoo.c.ID, Animal.c.ID],
and_(Zoo.c.Name==u'San Diego Zoo',
Animal.c.Species==u'Leopard'),
from_obj = [join(Zoo, Animal)]))
-
+
# Now try the same query with INNER, LEFT, and RIGHT JOINs.
e = fulltable(select([Zoo.c.Name, Animal.c.Species],
from_obj=[join(Zoo, Animal)]))
e = fulltable(select([Zoo.c.Name, Animal.c.Species],
from_obj=[outerjoin(Animal, Zoo)]))
- @testing.supported('postgres')
def test_8_drop(self):
metadata.drop_all()
from sqlalchemy.orm import mapper, create_session
from testlib import *
+
class DefaultTest(PersistTest):
def setUpAll(self):
db = testbase.db
metadata = MetaData(db)
default_generator = {'x':50}
-
+
def mydefault():
default_generator['x'] += 1
return default_generator['x']
def myupdate_with_ctx(ctx):
conn = ctx.connection
return conn.execute(select([text('13')])).scalar()
-
+
def mydefault_using_connection(ctx):
conn = ctx.connection
try:
# ensure a "close()" on this connection does nothing,
# since its a "branched" connection
conn.close()
-
+
use_function_defaults = testing.against('postgres', 'oracle')
is_oracle = testing.against('oracle')
-
+
# select "count(1)" returns different results on different DBs
# also correct for "current_date" compatible as column default, value differences
currenttime = func.current_date(type_=Date, bind=db)
def1 = def2 = "3"
ts = 3
deftype = Integer
-
+
t = Table('default_test1', metadata,
# python function
Column('col1', Integer, primary_key=True, default=mydefault),
-
+
# python literal
Column('col2', String(20), default="imthedefault", onupdate="im the update"),
-
+
# preexecute expression
Column('col3', Integer, default=func.length('abcdef'), onupdate=func.length('abcdefghijk')),
-
+
# SQL-side default from sql expression
Column('col4', deftype, PassiveDefault(def1)),
-
+
# SQL-side default from literal expression
Column('col5', deftype, PassiveDefault(def2)),
-
+
# preexecute + update timestamp
Column('col6', Date, default=currenttime, onupdate=currenttime),
-
+
Column('boolcol1', Boolean, default=True),
Column('boolcol2', Boolean, default=False),
-
+
# python function which uses ExecutionContext
Column('col7', Integer, default=mydefault_using_connection, onupdate=myupdate_with_ctx),
-
+
# python builtin
Column('col8', Date, default=datetime.date.today, onupdate=datetime.date.today)
)
def tearDownAll(self):
t.drop()
-
+
def tearDown(self):
default_generator['x'] = 50
t.delete().execute()
-
+
def testargsignature(self):
ex_msg = \
"ColumnDefault Python function takes zero or one positional arguments"
for fn in fn3, fn4, fn5, fn6, fn7:
c = ColumnDefault(fn)
-
+
def teststandalone(self):
c = testbase.db.engine.contextual_connect()
x = c.execute(t.c.col1.default)
self.assert_(y == 'imthedefault')
self.assert_(z == f)
self.assert_(f2==11)
-
+
def testinsert(self):
r = t.insert().execute()
assert r.lastrow_has_defaults()
r = t.insert(inline=True).execute()
assert r.lastrow_has_defaults()
assert util.Set(r.context.postfetch_cols) == util.Set([t.c.col3, t.c.col5, t.c.col4, t.c.col6])
-
+
t.insert().execute()
t.insert().execute()
l = t.select().execute()
today = datetime.date.today()
self.assert_(l.fetchall() == [
- (51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
- (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
+ (51, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
+ (52, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
(53, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
(54, 'imthedefault', f, ts, ts, ctexec, True, False, 12, today),
])
t.insert(values={'col3':50}).execute()
l = t.select().execute()
self.assert_(l.fetchone()['col3'] == 50)
-
+
def testupdatemany(self):
# MySQL-Python 1.2.2 breaks functions in execute_many :(
if (testing.against('mysql') and
t.update(t.c.col1==bindparam('pkval')).execute(
{'pkval':51,'col7':None, 'col8':None, 'boolcol1':False},
)
-
-
+
t.update(t.c.col1==bindparam('pkval')).execute(
{'pkval':51,},
{'pkval':52,},
ctexec = currenttime.scalar()
today = datetime.date.today()
self.assert_(l.fetchall() == [(51, 'im the update', f2, ts, ts, ctexec, False, False, 13, today), (52, 'im the update', f2, ts, ts, ctexec, True, False, 13, today), (53, 'im the update', f2, ts, ts, ctexec, True, False, 13, today)])
-
-
+
def testupdate(self):
r = t.insert().execute()
pk = r.last_inserted_ids()[0]
l = l.fetchone()
self.assert_(l == (pk, 'im the update', f2, None, None, ctexec, True, False, 13, datetime.date.today()))
self.assert_(f2==11)
-
+
def testupdatevalues(self):
r = t.insert().execute()
pk = r.last_inserted_ids()[0]
l = l.fetchone()
self.assert_(l['col3'] == 55)
- @testing.supported('postgres')
+ @testing.fails_on_everything_except('postgres')
def testpassiveoverride(self):
- """primarily for postgres, tests that when we get a primary key column back
+ """primarily for postgres, tests that when we get a primary key column back
from reflecting a table which has a default value on it, we pre-execute
- that PassiveDefault upon insert, even though PassiveDefault says
+ that PassiveDefault upon insert, even though PassiveDefault says
"let the database execute this", because in postgres we must have all the primary
key values in memory before insert; otherwise we cant locate the just inserted row."""
class PKDefaultTest(PersistTest):
def setUpAll(self):
global metadata, t1, t2
-
+
metadata = MetaData(testbase.db)
-
- t2 = Table('t2', metadata,
+
+ t2 = Table('t2', metadata,
Column('nextid', Integer))
-
+
t1 = Table('t1', metadata,
Column('id', Integer, primary_key=True, default=select([func.max(t2.c.nextid)]).as_scalar()),
Column('data', String(30)))
-
+
metadata.create_all()
-
+
def tearDownAll(self):
metadata.drop_all()
-
+
@testing.unsupported('mssql')
def test_basic(self):
t2.insert().execute(nextid=1)
t2.insert().execute(nextid=2)
r = t1.insert().execute(data='there')
assert r.last_inserted_ids() == [2]
-
-
+
+
class AutoIncrementTest(PersistTest):
def setUp(self):
global aitable, aimeta
-
+
aimeta = MetaData(testbase.db)
- aitable = Table("aitest", aimeta,
+ aitable = Table("aitest", aimeta,
Column('id', Integer, Sequence('ai_id_seq', optional=True),
primary_key=True),
Column('int1', Integer),
def tearDown(self):
aimeta.drop_all()
- @testing.supported('postgres', 'mysql', 'maxdb')
+ # should fail everywhere... was: @supported('postgres', 'mysql', 'maxdb')
+ @testing.fails_on('sqlite')
def testnonautoincrement(self):
+ # sqlite INT primary keys can be non-unique! (only for ints)
meta = MetaData(testbase.db)
- nonai_table = Table("nonaitest", meta,
+ nonai_table = Table("nonaitest", meta,
Column('id', Integer, autoincrement=False, primary_key=True),
Column('data', String(20)))
nonai_table.create(checkfirst=True)
try:
try:
- # postgres will fail on first row, mysql fails on second row
+ # postgres + mysql strict will fail on first row,
+ # mysql in legacy mode fails on second row
nonai_table.insert().execute(data='row 1')
nonai_table.insert().execute(data='row 2')
assert False
nonai_table.insert().execute(id=1, data='row 1')
finally:
- nonai_table.drop()
+ nonai_table.drop()
# TODO: add coverage for increment on a secondary column in a key
def _test_autoincrement(self, bind):
def test_autoincrement_fk(self):
if not testbase.db.dialect.supports_pk_autoincrement:
return True
-
+
metadata = MetaData(testbase.db)
# No optional sequence here.
metadata.drop_all()
-
class SequenceTest(PersistTest):
- @testing.supported('postgres', 'oracle', 'maxdb')
+ __unsupported_on__ = ('sqlite', 'mysql', 'mssql', 'firebird',
+ 'sybase', 'access')
+
def setUpAll(self):
global cartitems, sometable, metadata
metadata = MetaData(testbase.db)
- cartitems = Table("cartitems", metadata,
+ cartitems = Table("cartitems", metadata,
Column("cart_id", Integer, Sequence('cart_id_seq'), primary_key=True),
Column("description", String(40)),
Column("createdate", DateTime())
sometable = Table( 'Manager', metadata,
Column('obj_id', Integer, Sequence('obj_id_seq'), ),
Column('name', String, ),
- Column('id', Integer, Sequence('Manager_id_seq', optional=True), primary_key=True),
+ Column('id', Integer, Sequence('Manager_id_seq', optional=True),
+ primary_key=True),
)
-
+
metadata.create_all()
-
- @testing.supported('postgres', 'oracle', 'maxdb')
+
def testseqnonpk(self):
"""test sequences fire off as defaults on non-pk columns"""
(4, "name4", 4),
]
- @testing.supported('postgres', 'oracle', 'maxdb')
def testsequence(self):
cartitems.insert().execute(description='hi')
cartitems.insert().execute(description='there')
assert select([func.count(cartitems.c.cart_id)],
and_(cartitems.c.description == 'lala',
cartitems.c.cart_id == id_)).scalar() == 1
-
+
cartitems.select().execute().fetchall()
-
- @testing.supported('postgres', 'oracle')
+
+ @testing.fails_on('maxdb')
# maxdb db-api seems to double-execute NEXTVAL internally somewhere,
# throwing off the numbers for these tests...
def test_implicit_sequence_exec(self):
finally:
s.drop()
- @testing.supported('postgres', 'oracle')
+ @testing.fails_on('maxdb')
def teststandalone_explicit(self):
s = Sequence("my_sequence")
s.create(bind=testbase.db)
self.assert_(x == 1)
finally:
s.drop(testbase.db)
-
- @testing.supported('postgres', 'oracle', 'maxdb')
+
def test_checkfirst(self):
s = Sequence("my_sequence")
s.create(testbase.db, checkfirst=False)
s.create(testbase.db, checkfirst=True)
s.drop(testbase.db, checkfirst=False)
s.drop(testbase.db, checkfirst=True)
-
- @testing.supported('postgres', 'oracle')
+
+ @testing.fails_on('maxdb')
def teststandalone2(self):
x = cartitems.c.cart_id.sequence.execute()
self.assert_(1 <= x <= 4)
-
- @testing.supported('postgres', 'oracle', 'maxdb')
- def tearDownAll(self):
+
+ def tearDownAll(self):
metadata.drop_all()
+
if __name__ == "__main__":
testbase.main()
import testbase
import datetime
from sqlalchemy import *
-from sqlalchemy import exceptions, sql
+from sqlalchemy import databases, exceptions, sql
from sqlalchemy.sql.compiler import BIND_TEMPLATES
from sqlalchemy.engine import default
from sqlalchemy import types as sqltypes
from testlib import *
-# TODO: add a helper function to testlib for this
-from sqlalchemy.databases import sqlite, postgres, mysql, oracle, firebird, mssql
-dialects = [x.dialect() for x in [sqlite, postgres, mysql, oracle, firebird, mssql]]
+from sqlalchemy.databases import *
+# every dialect in databases.__all__ is expected to pass these tests.
+dialects = [getattr(databases, mod).dialect()
+ for mod in databases.__all__
+ # fixme!
+ if mod not in ('access',)]
+
+# if the configured dialect is out-of-tree or not yet in __all__, include it
+# too.
+if testbase.db.name not in databases.__all__:
+ dialects.append(testbase.db.dialect)
+
class CompileTest(SQLCompileTest):
def test_compile(self):
else:
self.assert_compile(func.nosuchfunction(), "nosuchfunction()", dialect=dialect)
self.assert_compile(func.char_length('foo'), "char_length(%s)" % bindtemplate % {'name':'param_1', 'position':1}, dialect=dialect)
-
+
def test_constructor(self):
try:
func.current_timestamp('somearg')
assert False
except TypeError:
assert True
-
+
def test_typing(self):
assert isinstance(func.coalesce(datetime.date(2007, 10, 5), datetime.date(2005, 10, 15)).type, sqltypes.Date)
assert isinstance(func.coalesce(None, datetime.date(2005, 10, 15)).type, sqltypes.Date)
-
+
assert isinstance(func.concat("foo", "bar").type, sqltypes.String)
-
+
class ExecuteTest(PersistTest):
def test_standalone_execute(self):
t2.update(values={t2.c.value:func.length("asfdaasdf"), t2.c.stuff:"foo"}).execute()
print "HI", select([t2.c.value, t2.c.stuff]).execute().fetchone()
assert select([t2.c.value, t2.c.stuff]).execute().fetchone() == (9, "foo")
-
finally:
meta.drop_all()
- @testing.supported('postgres')
+ @testing.fails_on_everything_except('postgres')
def test_as_from(self):
# TODO: shouldnt this work on oracle too ?
x = testbase.db.func.current_date().execute().scalar()
if __name__ == '__main__':
testbase.main()
-
\ No newline at end of file
r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall()
self.assert_(r==[(6, 'ralph'), (7, 'fido')])
- @testing.supported('mssql')
- @testing.fails_on('maxdb')
- def test_select_limit_nooffset(self):
- try:
- r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall()
- assert False # InvalidRequestError should have been raised
- except exceptions.InvalidRequestError:
- pass
-
- @testing.unsupported('mysql')
+ @testing.exclude('mysql', '<', (5, 0, 0))
def test_scalar_select(self):
"""test that scalar subqueries with labels get their type propigated to the result set."""
- # mysql and/or mysqldb has a bug here, type isnt propigated for scalar subquery.
+ # mysql and/or mysqldb has a bug here, type isn't propagated for scalar
+ # subquery.
datetable = Table('datetable', metadata,
Column('id', Integer, primary_key=True),
Column('today', DateTime))
finally:
shadowed.drop(checkfirst=True)
- @testing.supported('mssql')
- def test_fetchid_trigger(self):
- meta = MetaData(testbase.db)
- t1 = Table('t1', meta,
- Column('id', Integer, Sequence('fred', 100, 1), primary_key=True),
- Column('descr', String(200)))
- t2 = Table('t2', meta,
- Column('id', Integer, Sequence('fred', 200, 1), primary_key=True),
- Column('descr', String(200)))
- meta.create_all()
- con = testbase.db.connect()
- con.execute("""create trigger paj on t1 for insert as
- insert into t2 (descr) select descr from inserted""")
-
- try:
- tr = con.begin()
- r = con.execute(t2.insert(), descr='hello')
- self.assert_(r.last_inserted_ids() == [200])
- r = con.execute(t1.insert(), descr='hello')
- self.assert_(r.last_inserted_ids() == [100])
-
- finally:
- tr.commit()
- con.execute("""drop trigger paj""")
- meta.drop_all()
-
- @testing.supported('mssql')
- def test_insertid_schema(self):
- meta = MetaData(testbase.db)
- con = testbase.db.connect()
- con.execute('create schema paj')
- tbl = Table('test', meta, Column('id', Integer, primary_key=True), schema='paj')
- tbl.create()
- try:
- tbl.insert().execute({'id':1})
- finally:
- tbl.drop()
- con.execute('drop schema paj')
-
- @testing.supported('mssql')
- def test_insertid_reserved(self):
- meta = MetaData(testbase.db)
- table = Table(
- 'select', meta,
- Column('col', Integer, primary_key=True)
- )
- table.create()
-
- meta2 = MetaData(testbase.db)
- try:
- table.insert().execute(col=7)
- finally:
- table.drop()
-
@testing.fails_on('maxdb')
def test_in_filtering(self):
"""test the behavior of the in_() function."""
'between': lambda val, pair: val >= pair[0] and val <= pair[1],
}
-def unsupported(*dbs):
- """Mark a test as unsupported by one or more database implementations"""
+def fails_on(*dbs):
+ """Mark a test as expected to fail on one or more database implementations.
+
+ Unlike ``unsupported``, tests marked as ``fails_on`` will be run
+ for the named databases. The test is expected to fail and the unit test
+ logic is inverted: if the test fails, a success is reported. If the test
+ succeeds, a failure is reported.
+ """
def decorate(fn):
fn_name = fn.__name__
def maybe(*args, **kw):
- if config.db.name in dbs:
- print "'%s' unsupported on DB implementation '%s'" % (
- fn_name, config.db.name)
- return True
- else:
+ if config.db.name not in dbs:
return fn(*args, **kw)
+ else:
+ try:
+ fn(*args, **kw)
+ except Exception, ex:
+ print ("'%s' failed as expected on DB implementation "
+ "'%s': %s" % (
+ fn_name, config.db.name, str(ex)))
+ return True
+ else:
+ raise AssertionError(
+ "Unexpected success for '%s' on DB implementation '%s'" %
+ (fn_name, config.db.name))
try:
maybe.__name__ = fn_name
except:
return maybe
return decorate
-def fails_on(*dbs):
- """Mark a test as expected to fail on one or more database implementations.
+def fails_on_everything_except(*dbs):
+ """Mark a test as expected to fail on most database implementations.
- Unlike ``unsupported``, tests marked as ``fails_on`` will be run
- for the named databases. The test is expected to fail and the unit test
- logic is inverted: if the test fails, a success is reported. If the test
- succeeds, a failure is reported.
+ Like ``fails_on``, except failure is the expected outcome on all
+ databases except those listed.
"""
def decorate(fn):
fn_name = fn.__name__
def maybe(*args, **kw):
- if config.db.name not in dbs:
+ if config.db.name in dbs:
return fn(*args, **kw)
else:
try:
return maybe
return decorate
-def supported(*dbs):
- """Mark a test as supported by one or more database implementations"""
+def unsupported(*dbs):
+ """Mark a test as unsupported by one or more database implementations.
+
+ 'unsupported' tests will be skipped unconditionally. Useful for feature
+ tests that cause deadlocks or other fatal problems.
+ """
def decorate(fn):
fn_name = fn.__name__
def maybe(*args, **kw):
if config.db.name in dbs:
- return fn(*args, **kw)
- else:
print "'%s' unsupported on DB implementation '%s'" % (
fn_name, config.db.name)
return True
+ else:
+ return fn(*args, **kw)
try:
maybe.__name__ = fn_name
except:
def exclude(db, op, spec):
"""Mark a test as unsupported by specific database server versions.
- Stackable, both with other excludes and supported/unsupported. Examples::
+ Stackable, both with other excludes and other decorators. Examples::
# Not supported by mydb versions less than 1, 0
@exclude('mydb', '<', (1,0))
# Other operators work too
def decorate(fn):
fn_name = fn.__name__
def maybe(*args, **kw):
- if config.db.name != db:
- return fn(*args, **kw)
-
- have = config.db.dialect.server_version_info(
- config.db.contextual_connect())
-
- oper = hasattr(op, '__call__') and op or _ops[op]
-
- if oper(have, spec):
+ if _is_excluded(db, op, spec):
print "'%s' unsupported on DB %s version '%s'" % (
- fn_name, config.db.name, have)
+ fn_name, config.db.name, _server_version())
return True
else:
return fn(*args, **kw)
return maybe
return decorate
+def _is_excluded(db, op, spec):
+ """Return True if the configured db matches an exclusion specification.
+
+ db:
+ A dialect name
+ op:
+ An operator or stringified operator, such as '=='
+ spec:
+ A value that will be compared to the dialect's server_version_info
+ using the supplied operator.
+
+ Examples::
+ # Not supported by mydb versions less than 1, 0
+ _is_excluded('mydb', '<', (1,0))
+ # Other operators work too
+ _is_excluded('bigdb', '==', (9,0,9))
+ _is_excluded('yikesdb', 'in', ((0, 3, 'alpha2'), (0, 3, 'alpha3')))
+ """
+
+ if config.db.name != db:
+ return False
+
+ version = _server_version()
+
+ oper = hasattr(op, '__call__') and op or _ops[op]
+ return oper(version, spec)
+
+def _server_version(bind=None):
+ """Return a server_version_info tuple."""
+
+ if bind is None:
+ bind = config.db
+ return bind.dialect.server_version_info(bind.contextual_connect())
+
+
def against(*queries):
"""Boolean predicate, compares to testing database configuration.
return query
class PersistTest(unittest.TestCase):
+ # A sequence of dialect names to exclude from the test class.
+ __unsupported_on__ = ()
+
+ # If present, test class is only runnable for the *single* specified
+ # dialect. If you need multiple, use __unsupported_on__ and invert.
+ __only_on__ = None
def __init__(self, *args, **params):
unittest.TestCase.__init__(self, *args, **params)
_otest_metadata = MetaData(config.db)
else:
_otest_metadata = self.metadata
- _otest_metadata.bind = config.db
+ if self.metadata.bind is None:
+ _otest_metadata.bind = config.db
self.define_tables(_otest_metadata)
_otest_metadata.create_all()
self.insert_data()
return self(result)
def __call__(self, result):
- try:
- if self._initTest is not None:
- self._initTest.setUpAll()
- except:
- # skip tests if global setup fails
- ex = self.__exc_info()
- for test in self._tests:
- result.addError(test, ex)
- return False
+ init = getattr(self, '_initTest', None)
+ if init is not None:
+ if (hasattr(init, '__unsupported_on__') and
+ config.db.name in init.__unsupported_on__):
+ print "'%s' unsupported on DB implementation '%s'" % (
+ init.__class__.__name__, config.db.name)
+ return True
+            if (getattr(init, '__only_on__', None) not in (None, config.db.name)):
+ print "'%s' unsupported on DB implementation '%s'" % (
+ init.__class__.__name__, config.db.name)
+ return True
+ for rule in getattr(init, '__excluded_on__', ()):
+ if _is_excluded(*rule):
+ print "'%s' unsupported on DB %s version %s" % (
+ init.__class__.__name__, config.db.name,
+ _server_version())
+ return True
+ try:
+ init.setUpAll()
+ except:
+ # skip tests if global setup fails
+ ex = self.__exc_info()
+ for test in self._tests:
+ result.addError(test, ex)
+ return False
try:
return self.do_run(result)
finally:
try:
- if self._initTest is not None:
- self._initTest.tearDownAll()
+ if init is not None:
+ init.tearDownAll()
except:
- result.addError(self._initTest, self.__exc_info())
+ result.addError(init, self.__exc_info())
pass
def __exc_info(self):