import unicodedata
import sqlalchemy as sa
-from sqlalchemy import exc as sa_exc
-from sqlalchemy import types as sql_types
from sqlalchemy import schema, events, event, inspect
from sqlalchemy import MetaData, Integer, String
-from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.testing import ComparesTables, \
engines, AssertsCompiledSQL, fixtures
from sqlalchemy.testing.schema import Table, Column
Column('test1', sa.CHAR(5), nullable=False),
Column('test2', sa.Float(5), nullable=False),
Column('test3', sa.Text),
- Column('test4', sa.Numeric(10, 2), nullable = False),
+ Column('test4', sa.Numeric(10, 2), nullable=False),
Column('test5', sa.Date),
Column('parent_user_id', sa.Integer,
sa.ForeignKey('engine_users.user_id')),
def test_extend_existing(self):
meta = self.metadata
- t1 = Table('t', meta,
+ Table('t', meta,
Column('id', Integer, primary_key=True),
Column('x', Integer),
Column('y', Integer),
establishes the FK not present in the DB.
"""
- a = Table('a', self.metadata, Column('id', Integer, primary_key=True))
- b = Table('b', self.metadata, Column('id', Integer, primary_key=True),
+ Table('a', self.metadata, Column('id', Integer, primary_key=True))
+ Table('b', self.metadata, Column('id', Integer, primary_key=True),
Column('a_id', Integer))
self.metadata.create_all()
the in-python one only.
"""
- a = Table('a', self.metadata, Column('id', Integer, primary_key=True))
- b = Table('b', self.metadata, Column('id', Integer, primary_key=True),
+ Table('a', self.metadata, Column('id', Integer, primary_key=True))
+ Table('b', self.metadata, Column('id', Integer, primary_key=True),
Column('a_id', Integer, sa.ForeignKey('a.id')))
self.metadata.create_all()
DB means the FK is skipped and doesn't get installed at all.
"""
- a = Table('a', self.metadata, Column('id', Integer, primary_key=True))
- b = Table('b', self.metadata, Column('id', Integer, primary_key=True),
+ Table('a', self.metadata, Column('id', Integer, primary_key=True))
+ Table('b', self.metadata, Column('id', Integer, primary_key=True),
Column('a_id', Integer, sa.ForeignKey('a.id')))
self.metadata.create_all()
@testing.provide_metadata
def test_autoload_replace_primary_key(self):
- a = Table('a', self.metadata, Column('id', Integer))
+ Table('a', self.metadata, Column('id', Integer))
self.metadata.create_all()
m2 = MetaData()
"""
meta = self.metadata
- t1 = Table('test', meta,
+ Table('test', meta,
Column('id', sa.Integer, primary_key=True),
Column('data', sa.String(50)),
mysql_engine='MyISAM'
)
- t2 = Table('test2', meta,
+ Table('test2', meta,
Column('id', sa.Integer, sa.ForeignKey('test.id'),
primary_key=True),
Column('id2', sa.Integer, primary_key=True),
a primary key column"""
meta = self.metadata
- users = Table('users', meta,
+ Table('users', meta,
Column('id', sa.Integer, primary_key=True),
Column('name', sa.String(30)))
- addresses = Table('addresses', meta,
+ Table('addresses', meta,
Column('id', sa.Integer, primary_key=True),
Column('street', sa.String(30)))
assert list(a2.primary_key) == [a2.c.id]
assert list(u2.primary_key) == [u2.c.id]
- assert u2.join(a2).onclause.compare(u2.c.id==a2.c.id)
+ assert u2.join(a2).onclause.compare(u2.c.id == a2.c.id)
meta3 = MetaData(testing.db)
u3 = Table('users', meta3, autoload=True)
assert list(a3.primary_key) == [a3.c.id]
assert list(u3.primary_key) == [u3.c.id]
- assert u3.join(a3).onclause.compare(u3.c.id==a3.c.id)
+ assert u3.join(a3).onclause.compare(u3.c.id == a3.c.id)
@testing.provide_metadata
def test_override_nonexistent_fk(self):
is common with MySQL MyISAM tables."""
meta = self.metadata
- users = Table('users', meta,
+ Table('users', meta,
Column('id', sa.Integer, primary_key=True),
Column('name', sa.String(30)))
- addresses = Table('addresses', meta,
+ Table('addresses', meta,
Column('id', sa.Integer, primary_key=True),
Column('street', sa.String(30)),
Column('user_id', sa.Integer))
meta.create_all()
meta2 = MetaData(testing.db)
a2 = Table('addresses', meta2,
- Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
+ Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
autoload=True)
u2 = Table('users', meta2, autoload=True)
assert len(a2.c.user_id.foreign_keys) == 1
metadata = self.metadata
- a = Table('a',
+ Table('a',
metadata,
Column('x', sa.Integer, primary_key=True),
Column('y', sa.Integer, primary_key=True),
)
- b = Table('b',
+ Table('b',
metadata,
Column('x', sa.Integer, primary_key=True),
Column('y', sa.Integer, primary_key=True),
and that ForeignKey targeting during reflection still works."""
meta = self.metadata
- a1 = Table('a', meta,
+ Table('a', meta,
Column('x', sa.Integer, primary_key=True),
Column('z', sa.Integer),
test_needs_fk=True
)
- b1 = Table('b', meta,
+ Table('b', meta,
Column('y', sa.Integer, sa.ForeignKey('a.x')),
test_needs_fk=True
)
"""
meta = self.metadata
- a1 = Table('a', meta,
+ Table('a', meta,
Column('x', sa.Integer, primary_key=True),
Column('z', sa.Integer),
test_needs_fk=True
)
- b1 = Table('b', meta,
+ Table('b', meta,
Column('y', sa.Integer, sa.ForeignKey('a.x')),
test_needs_fk=True
)
have that foreign key, and that the FK is not duped. """
meta = self.metadata
- users = Table('users', meta,
+ Table('users', meta,
Column('id', sa.Integer, primary_key=True),
Column('name', sa.String(30)),
test_needs_fk=True)
- addresses = Table('addresses', meta,
+ Table('addresses', meta,
Column('id', sa.Integer, primary_key=True),
Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
test_needs_fk=True)
meta.create_all()
meta2 = MetaData(testing.db)
a2 = Table('addresses', meta2,
- Column('user_id',sa.Integer, sa.ForeignKey('users.id')),
+ Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
autoload=True)
u2 = Table('users', meta2, autoload=True)
s = sa.select([a2])
def test_fk_error(self):
metadata = MetaData(testing.db)
- slots_table = Table('slots', metadata,
+ Table('slots', metadata,
Column('slot_id', sa.Integer, primary_key=True),
Column('pkg_id', sa.Integer, sa.ForeignKey('pkgs.pkg_id')),
Column('slot', sa.String(128)),
check_col = 'true'
quoter = meta.bind.dialect.identifier_preparer.quote_identifier
- table_b = Table('false', meta,
+ Table('false', meta,
Column('create', sa.Integer, primary_key=True),
- Column('true', sa.Integer,sa.ForeignKey('select.not')),
+ Column('true', sa.Integer, sa.ForeignKey('select.not')),
sa.CheckConstraint('%s <> 1'
% quoter(check_col), name='limit')
)
meta.create_all()
index_c.drop()
meta2 = MetaData(testing.db)
- table_a2 = Table('select', meta2, autoload=True)
- table_b2 = Table('false', meta2, autoload=True)
- table_c2 = Table('is', meta2, autoload=True)
+ Table('select', meta2, autoload=True)
+ Table('false', meta2, autoload=True)
+ Table('is', meta2, autoload=True)
@testing.provide_metadata
def _test_reflect_uses_bind(self, fn):
def test_reflect_all(self):
existing = testing.db.table_names()
- names = ['rt_%s' % name for name in ('a','b','c','d','e')]
+ names = ['rt_%s' % name for name in ('a', 'b', 'c', 'd', 'e')]
nameset = set(names)
for name in names:
# be sure our starting environment is sane
assert not c.closed
def test_inspector_conn_closing(self):
- m1 = MetaData()
c = testing.db.connect()
- i = inspect(testing.db)
+ inspect(c)
assert not c.closed
@testing.provide_metadata
Column('id', sa.Integer, nullable=False),
Column('name', sa.String(20), index=True)
)
- i1 = sa.Index('idx1', t1.c.id, unique=True)
- i2 = sa.Index('idx2', t1.c.name, t1.c.id, unique=False)
+ sa.Index('idx1', t1.c.id, unique=True)
+ sa.Index('idx2', t1.c.name, t1.c.id, unique=False)
m1.create_all()
m2 = MetaData(testing.db)
t2 = Table('party', m2, autoload=True)
Column('user_id', sa.Integer,
sa.Sequence('user_id_seq', optional=True),
primary_key=True),
- Column('user_name',sa.String(40)))
+ Column('user_name', sa.String(40)))
- addresses = Table('email_addresses', metadata,
+ Table('email_addresses', metadata,
Column('address_id', sa.Integer,
sa.Sequence('address_id_seq', optional=True),
primary_key=True),
sa.Integer, sa.ForeignKey(users.c.user_id)),
Column('email_address', sa.String(40)))
- orders = Table(
+ Table(
'orders',
metadata,
Column('order_id', sa.Integer, sa.Sequence('order_id_seq',
Column('description', sa.String(50)),
Column('isopen', sa.Integer),
)
- orderitems = Table('items', metadata, Column('item_id', sa.INT,
+ Table('items', metadata,
+ Column('item_id', sa.INT,
sa.Sequence('items_id_seq', optional=True),
- primary_key=True), Column('order_id',
+ primary_key=True),
+ Column('order_id',
sa.INT, sa.ForeignKey('orders')),
Column('item_name', sa.VARCHAR(50)))
- def test_sorter( self ):
+ def test_sorter(self):
tables = metadata.sorted_tables
table_names = [t.name for t in tables]
ua = [n for n in table_names if n in ('users', 'email_addresses')]
def test_createdrop(self):
metadata.create_all(bind=testing.db)
- eq_( testing.db.has_table('items'), True )
- eq_( testing.db.has_table('email_addresses'), True )
+ eq_(testing.db.has_table('items'), True)
+ eq_(testing.db.has_table('email_addresses'), True)
metadata.create_all(bind=testing.db)
- eq_( testing.db.has_table('items'), True )
+ eq_(testing.db.has_table('items'), True)
metadata.drop_all(bind=testing.db)
- eq_( testing.db.has_table('items'), False )
- eq_( testing.db.has_table('email_addresses'), False )
+ eq_(testing.db.has_table('items'), False)
+ eq_(testing.db.has_table('email_addresses'), False)
metadata.drop_all(bind=testing.db)
- eq_( testing.db.has_table('items'), False )
+ eq_(testing.db.has_table('items'), False)
def test_tablenames(self):
metadata.create_all(bind=testing.db)
Column('id', sa.Integer),
Column('user_id', sa.Integer))
- fk = sa.ForeignKeyConstraint(['user_id'],[users.c.id])
+ fk = sa.ForeignKeyConstraint(['user_id'], [users.c.id])
addresses.append_constraint(fk)
addresses.append_constraint(fk)
testing.db.connect().close()
cls.bind = bind = engines.utf8_engine(
- options={'convert_unicode' : True})
+ options={'convert_unicode': True})
cls.metadata = metadata = MetaData()
names = no_multibyte_period
# mysql can't handle casing usually
elif testing.against("mysql") and \
- not testing.requires._has_mysql_fully_case_sensitive():
+ not testing.requires._has_mysql_fully_case_sensitive():
names = no_multibyte_period.union(no_case_sensitivity)
# mssql + pyodbc + freetds can't compare multibyte names to
# information_schema.tables.table_name
names = no_multibyte_period.union(full)
for tname, cname, ixname in names:
- t = Table(tname, metadata, Column('id', sa.Integer,
- sa.Sequence(cname + '_id_seq'), primary_key=True),
- Column(cname, Integer)
+ t = Table(tname, metadata,
+ Column('id', sa.Integer,
+ sa.Sequence(cname + '_id_seq'),
+ primary_key=True),
+ Column(cname, Integer)
)
schema.Index(ixname, t.c[cname])
assert bool(schema)
metadata = MetaData(engine)
- table1 = Table('table1', metadata,
+ Table('table1', metadata,
Column('col1', sa.Integer, primary_key=True),
test_needs_fk=True,
schema=schema)
- table2 = Table('table2', metadata,
+ Table('table2', metadata,
Column('col1', sa.Integer, primary_key=True),
Column('col2', sa.Integer,
sa.ForeignKey('%s.table1.col1' % schema)),
assert len(metadata.tables) == 2
metadata.clear()
- table1 = Table('table1', metadata, autoload=True, schema=schema)
- table2 = Table('table2', metadata, autoload=True, schema=schema)
+ Table('table1', metadata, autoload=True, schema=schema)
+ Table('table2', metadata, autoload=True, schema=schema)
assert len(metadata.tables) == 2
finally:
metadata.drop_all()
assert bool(schema)
metadata = MetaData(engine, schema=schema)
- table1 = Table('table1', metadata,
+ Table('table1', metadata,
Column('col1', sa.Integer, primary_key=True),
test_needs_fk=True)
- table2 = Table('table2', metadata,
+ Table('table2', metadata,
Column('col1', sa.Integer, primary_key=True),
Column('col2', sa.Integer,
sa.ForeignKey('table1.col1')),
assert len(metadata.tables) == 2
metadata.clear()
- table1 = Table('table1', metadata, autoload=True)
- table2 = Table('table2', metadata, autoload=True)
+ Table('table1', metadata, autoload=True)
+ Table('table2', metadata, autoload=True)
assert len(metadata.tables) == 2
finally:
metadata.drop_all()
Column('test1', sa.CHAR(5), nullable=False),
Column('test2', sa.Float(5), nullable=False),
Column('test3', sa.Text),
- Column('test4', sa.Numeric(10, 2), nullable = False),
+ Column('test4', sa.Numeric(10, 2), nullable=False),
Column('test5', sa.Date),
Column('test5_1', sa.TIMESTAMP),
Column('parent_user_id', sa.Integer,
test_needs_fk=True,
)
dingalings = Table("dingalings", meta,
- Column('dingaling_id', sa.Integer, primary_key=True),
- Column('address_id', sa.Integer,
- sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)),
- Column('data', sa.String(30)),
- schema=schema,
- test_needs_fk=True,
- )
+ Column('dingaling_id', sa.Integer, primary_key=True),
+ Column('address_id', sa.Integer,
+ sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)),
+ Column('data', sa.String(30)),
+ schema=schema, test_needs_fk=True,
+ )
addresses = Table('email_addresses', meta,
Column('address_id', sa.Integer),
Column('remote_user_id', sa.Integer,
def test_override_key(self):
self._do_test(
- "x", {"key":"YXZ"},
+ "x", {"key": "YXZ"},
lambda table: eq_(table.c.YXZ.name, "x")
)
def assert_(table):
assert isinstance(table.c.x.type, sa.String)
self._do_test(
- "x", {"type":sa.String},
+ "x", {"type": sa.String},
assert_
)
def test_override_info(self):
self._do_test(
- "x", {"info":{"a":"b"}},
- lambda table: eq_(table.c.x.info, {"a":"b"})
+ "x", {"info": {"a": "b"}},
+ lambda table: eq_(table.c.x.info, {"a": "b"})
)