from sqlalchemy import testing
from .. import config
+from sqlalchemy.schema import DDL
+from sqlalchemy import event
+
metadata, users = None, None
False)
-def createTables(meta, schema=None):
- if schema:
- schema_prefix = schema + "."
- else:
- schema_prefix = ""
-
- users = Table('users', meta,
- Column('user_id', sa.INT, primary_key=True),
- Column('user_name', sa.VARCHAR(20), nullable=False),
- Column('test1', sa.CHAR(5), nullable=False),
- Column('test2', sa.Float(5), nullable=False),
- Column('test3', sa.Text),
- Column('test4', sa.Numeric(10, 2), nullable=False),
- Column('test5', sa.Date),
- Column('test5_1', sa.TIMESTAMP),
- Column('parent_user_id', sa.Integer,
- sa.ForeignKey('%susers.user_id' % schema_prefix)),
- Column('test6', sa.Date, nullable=False),
- Column('test7', sa.Text),
- Column('test8', sa.LargeBinary),
- Column('test_passivedefault2', sa.Integer, server_default='5'),
- Column('test9', sa.LargeBinary(100)),
- Column('test10', sa.Numeric(10, 2)),
- schema=schema,
- test_needs_fk=True,
- )
- dingalings = Table("dingalings", meta,
- Column('dingaling_id', sa.Integer, primary_key=True),
- Column('address_id', sa.Integer,
- sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)),
- Column('data', sa.String(30)),
- schema=schema,
- test_needs_fk=True,
- )
- addresses = Table('email_addresses', meta,
- Column('address_id', sa.Integer),
- Column('remote_user_id', sa.Integer,
- sa.ForeignKey(users.c.user_id)),
- Column('email_address', sa.String(20)),
- sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
- schema=schema,
- test_needs_fk=True,
- )
-
- return (users, addresses, dingalings)
-
-def createIndexes(con, schema=None):
- fullname = 'users'
- if schema:
- fullname = "%s.%s" % (schema, 'users')
- query = "CREATE INDEX users_t_idx ON %s (test1, test2)" % fullname
- con.execute(sa.sql.text(query))
-
-@testing.requires.views
-def _create_views(con, schema=None):
- for table_name in ('users', 'email_addresses'):
- fullname = table_name
- if schema:
- fullname = "%s.%s" % (schema, table_name)
- view_name = fullname + '_v'
- query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name,
- fullname)
- con.execute(sa.sql.text(query))
-
-@testing.requires.views
-def _drop_views(con, schema=None):
- for table_name in ('email_addresses', 'users'):
- fullname = table_name
- if schema:
- fullname = "%s.%s" % (schema, table_name)
- view_name = fullname + '_v'
- query = "DROP VIEW %s" % view_name
- con.execute(sa.sql.text(query))
-
-class ComponentReflectionTest(fixtures.TestBase):
- @testing.requires.schemas
+class ComponentReflectionTest(fixtures.TablesTest):
+
+ @classmethod
+ def define_tables(cls, metadata):
+ if testing.requires.schemas.enabled:
+ to_build = [
+ (None, ""),
+ ("test_schema", "test_schema."),
+ ]
+ else:
+ to_build = [(None, "")]
+
+
+ for schema, schema_prefix in to_build:
+ users = Table('users', metadata,
+ Column('user_id', sa.INT, primary_key=True),
+ Column('test1', sa.CHAR(5), nullable=False),
+ Column('test2', sa.Float(5), nullable=False),
+ Column('parent_user_id', sa.Integer,
+ sa.ForeignKey('%susers.user_id' % schema_prefix)),
+ schema=schema,
+ test_needs_fk=True,
+ )
+ Table("dingalings", metadata,
+ Column('dingaling_id', sa.Integer, primary_key=True),
+ Column('address_id', sa.Integer,
+ sa.ForeignKey('%semail_addresses.address_id' %
+ schema_prefix)),
+ Column('data', sa.String(30)),
+ schema=schema,
+ test_needs_fk=True,
+ )
+ Table('email_addresses', metadata,
+ Column('address_id', sa.Integer),
+ Column('remote_user_id', sa.Integer,
+ sa.ForeignKey(users.c.user_id)),
+ Column('email_address', sa.String(20)),
+ sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
+ schema=schema,
+ test_needs_fk=True,
+ )
+
+ fullname = 'users'
+ if schema:
+ fullname = "%s.%s" % (schema, 'users')
+ event.listen(
+ metadata,
+ "after_create",
+ DDL("CREATE INDEX users_t_idx ON %s (test1, test2)" % fullname)
+ )
+
+ for table_name in ('users', 'email_addresses'):
+ fullname = table_name
+ if schema:
+ fullname = "%s.%s" % (schema, table_name)
+ view_name = fullname + '_v'
+ query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name,
+ fullname)
+ event.listen(
+ metadata,
+ "after_create",
+ DDL(query)
+ )
+ event.listen(
+ metadata,
+ "before_drop",
+ DDL("DROP VIEW %s" % view_name)
+ )
+
+
+ @testing.requires.schema_reflection
def test_get_schema_names(self):
insp = inspect(testing.db)
def _test_get_table_names(self, schema=None, table_type='table',
order_by=None):
meta = self.metadata
- users, addresses, dingalings = createTables(meta, schema)
- meta.create_all()
- _create_views(meta.bind, schema)
- try:
- insp = inspect(meta.bind)
- if table_type == 'view':
- table_names = insp.get_view_names(schema)
- table_names.sort()
- answer = ['email_addresses_v', 'users_v']
+ users, addresses, dingalings = self.tables.users, \
+ self.tables.email_addresses, self.tables.dingalings
+ insp = inspect(meta.bind)
+ if table_type == 'view':
+ table_names = insp.get_view_names(schema)
+ table_names.sort()
+ answer = ['email_addresses_v', 'users_v']
+ else:
+ table_names = insp.get_table_names(schema,
+ order_by=order_by)
+ if order_by == 'foreign_key':
+ answer = ['dingalings', 'email_addresses', 'users']
+ eq_(table_names, answer)
else:
- table_names = insp.get_table_names(schema,
- order_by=order_by)
- if order_by == 'foreign_key':
- answer = ['dingalings', 'email_addresses', 'users']
- eq_(table_names, answer)
- else:
- answer = ['dingalings', 'email_addresses', 'users']
- eq_(sorted(table_names), answer)
- finally:
- _drop_views(meta.bind, schema)
+ answer = ['dingalings', 'email_addresses', 'users']
+ eq_(sorted(table_names), answer)
+ @testing.requires.table_reflection
def test_get_table_names(self):
self._test_get_table_names()
+ @testing.requires.table_reflection
def test_get_table_names_fks(self):
self._test_get_table_names(order_by='foreign_key')
+ @testing.requires.table_reflection
@testing.requires.schemas
def test_get_table_names_with_schema(self):
self._test_get_table_names('test_schema')
- @testing.requires.views
+ @testing.requires.view_reflection
def test_get_view_names(self):
self._test_get_table_names(table_type='view')
+ @testing.requires.view_reflection
@testing.requires.schemas
def test_get_view_names_with_schema(self):
self._test_get_table_names('test_schema', table_type='view')
def _test_get_columns(self, schema=None, table_type='table'):
meta = MetaData(testing.db)
- users, addresses, dingalings = createTables(meta, schema)
+ users, addresses, dingalings = self.tables.users, \
+ self.tables.email_addresses, self.tables.dingalings
table_names = ['users', 'email_addresses']
- meta.create_all()
if table_type == 'view':
- _create_views(meta.bind, schema)
table_names = ['users_v', 'email_addresses_v']
- try:
- insp = inspect(meta.bind)
- for table_name, table in zip(table_names, (users,
- addresses)):
- schema_name = schema
- cols = insp.get_columns(table_name, schema=schema_name)
- self.assert_(len(cols) > 0, len(cols))
-
- # should be in order
-
- for i, col in enumerate(table.columns):
- eq_(col.name, cols[i]['name'])
- ctype = cols[i]['type'].__class__
- ctype_def = col.type
- if isinstance(ctype_def, sa.types.TypeEngine):
- ctype_def = ctype_def.__class__
-
- # Oracle returns Date for DateTime.
-
- if testing.against('oracle') and ctype_def \
- in (sql_types.Date, sql_types.DateTime):
- ctype_def = sql_types.Date
-
- # assert that the desired type and return type share
- # a base within one of the generic types.
-
- self.assert_(len(set(ctype.__mro__).
- intersection(ctype_def.__mro__).intersection([
- sql_types.Integer,
- sql_types.Numeric,
- sql_types.DateTime,
- sql_types.Date,
- sql_types.Time,
- sql_types.String,
- sql_types._Binary,
- ])) > 0, '%s(%s), %s(%s)' % (col.name,
- col.type, cols[i]['name'], ctype))
- finally:
- if table_type == 'view':
- _drop_views(meta.bind, schema)
- meta.drop_all()
+ insp = inspect(meta.bind)
+ for table_name, table in zip(table_names, (users,
+ addresses)):
+ schema_name = schema
+ cols = insp.get_columns(table_name, schema=schema_name)
+ self.assert_(len(cols) > 0, len(cols))
+
+ # should be in order
+
+ for i, col in enumerate(table.columns):
+ eq_(col.name, cols[i]['name'])
+ ctype = cols[i]['type'].__class__
+ ctype_def = col.type
+ if isinstance(ctype_def, sa.types.TypeEngine):
+ ctype_def = ctype_def.__class__
+
+ # Oracle returns Date for DateTime.
+
+ if testing.against('oracle') and ctype_def \
+ in (sql_types.Date, sql_types.DateTime):
+ ctype_def = sql_types.Date
+
+ # assert that the desired type and return type share
+ # a base within one of the generic types.
+
+ self.assert_(len(set(ctype.__mro__).
+ intersection(ctype_def.__mro__).intersection([
+ sql_types.Integer,
+ sql_types.Numeric,
+ sql_types.DateTime,
+ sql_types.Date,
+ sql_types.Time,
+ sql_types.String,
+ sql_types._Binary,
+ ])) > 0, '%s(%s), %s(%s)' % (col.name,
+ col.type, cols[i]['name'], ctype))
def test_get_columns(self):
self._test_get_columns()
+ @testing.requires.table_reflection
@testing.requires.schemas
def test_get_columns_with_schema(self):
self._test_get_columns(schema='test_schema')
- @testing.requires.views
+ @testing.requires.view_reflection
def test_get_view_columns(self):
self._test_get_columns(table_type='view')
- @testing.requires.views
+ @testing.requires.view_reflection
@testing.requires.schemas
def test_get_view_columns_with_schema(self):
self._test_get_columns(schema='test_schema', table_type='view')
@testing.provide_metadata
def _test_get_pk_constraint(self, schema=None):
meta = self.metadata
- users, addresses, _ = createTables(meta, schema)
- meta.create_all()
+ users, addresses = self.tables.users, self.tables.email_addresses
insp = inspect(meta.bind)
users_cons = insp.get_pk_constraint(users.name, schema=schema)
def test_get_pk_constraint(self):
self._test_get_pk_constraint()
+ @testing.requires.table_reflection
@testing.fails_on('sqlite', 'no schemas')
def test_get_pk_constraint_with_schema(self):
self._test_get_pk_constraint(schema='test_schema')
+ @testing.requires.table_reflection
@testing.provide_metadata
def test_deprecated_get_primary_keys(self):
meta = self.metadata
- users, _, _ = createTables(meta, schema=None)
- meta.create_all()
+ users = self.tables.users
insp = Inspector(meta.bind)
assert_raises_message(
sa_exc.SADeprecationWarning,
@testing.provide_metadata
def _test_get_foreign_keys(self, schema=None):
meta = self.metadata
- users, addresses, dingalings = createTables(meta, schema)
- meta.create_all()
+ users, addresses, dingalings = self.tables.users, \
+ self.tables.email_addresses, self.tables.dingalings
insp = inspect(meta.bind)
expected_schema = schema
# users
eq_(fkey1['referred_columns'], ['user_id', ])
eq_(fkey1['constrained_columns'], ['remote_user_id'])
+ @testing.requires.constraint_reflection
def test_get_foreign_keys(self):
self._test_get_foreign_keys()
+ @testing.requires.constraint_reflection
@testing.requires.schemas
def test_get_foreign_keys_with_schema(self):
self._test_get_foreign_keys(schema='test_schema')
@testing.provide_metadata
def _test_get_indexes(self, schema=None):
meta = self.metadata
- users, addresses, dingalings = createTables(meta, schema)
- meta.create_all()
- createIndexes(meta.bind, schema)
+ users, addresses, dingalings = self.tables.users, \
+ self.tables.email_addresses, self.tables.dingalings
# The database may decide to create indexes for foreign keys, etc.
# so there may be more indexes than expected.
insp = inspect(meta.bind)
for key in e_index:
eq_(e_index[key], index[key])
+ @testing.requires.index_reflection
def test_get_indexes(self):
self._test_get_indexes()
+ @testing.requires.index_reflection
@testing.requires.schemas
def test_get_indexes_with_schema(self):
self._test_get_indexes(schema='test_schema')
finally:
_drop_views(meta.bind, schema)
- @testing.requires.views
+ @testing.requires.view_reflection
def test_get_view_definition(self):
self._test_get_view_definition()
- @testing.requires.views
+ @testing.requires.view_reflection
@testing.requires.schemas
def test_get_view_definition_with_schema(self):
self._test_get_view_definition(schema='test_schema')