]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
dev
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 29 Sep 2012 19:17:08 +0000 (15:17 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 29 Sep 2012 19:17:08 +0000 (15:17 -0400)
lib/sqlalchemy/dialects/sybase/base.py
lib/sqlalchemy/engine/base.py
lib/sqlalchemy/testing/requirements.py
lib/sqlalchemy/testing/suite/test_reflection.py
test/requirements.py

index f551bff9915a8696318b8bb9685ef3451ba06d3c..9a1a5bdf381e2898c302f486e67da71d81b75c54 100644 (file)
@@ -245,10 +245,10 @@ class SybaseExecutionContext(default.DefaultExecutionContext):
 
 
     def post_exec(self):
-       if self.isddl:
+        if self.isddl:
             self.set_ddl_autocommit(self.root_connection, False)
 
-       if self._enable_identity_insert:
+        if self._enable_identity_insert:
             self.cursor.execute(
                         "SET IDENTITY_INSERT %s OFF" %
                             self.dialect.identifier_preparer.
index 47594407f2c09bf0b37818c6b11b6d020580fcaa..e8b22fdc645a090bc221d7855e6a171059139b96 100644 (file)
@@ -914,8 +914,8 @@ class Connection(Connectable):
             for fn in self.dispatch.before_cursor_execute:
                 statement, parameters = \
                             fn(self, cursor, statement, parameters,
-                                        context, 
-                                        context.executemany 
+                                        context,
+                                        context.executemany
                                            if context is not None else False)
 
         if self._echo:
index ea824244810a0ba5c2acfc6d2e60823ffba7446a..c6492a84aa25b34ec5e274ea1471c7da710dc265 100644 (file)
@@ -41,7 +41,20 @@ class SuiteRequirements(Requirements):
     def returning(self):
         """target platform supports RETURNING."""
 
-        return exclusions.closed()
+        return exclusions.only_if(
+                lambda: self.config.db.dialect.implicit_returning,
+                "'returning' not supported by database"
+            )
+
+    @property
+    def denormalized_names(self):
+        """Target database must have 'denormalized', i.e.
+        UPPERCASE as case insensitive names."""
+
+        return exclusions.skip_if(
+                    lambda: not self.db.dialect.requires_name_normalize,
+                    "Backend does not require denormalized names."
+                )
 
     @property
     def dbapi_lastrowid(self):
@@ -76,3 +89,22 @@ class SuiteRequirements(Requirements):
     def reflects_pk_names(self):
         return exclusions.closed()
 
+    @property
+    def table_reflection(self):
+        return exclusions.open()
+
+    @property
+    def view_reflection(self):
+        return self.views
+
+    @property
+    def schema_reflection(self):
+        return self.schemas
+
+    @property
+    def constraint_reflection(self):
+        return exclusions.open()
+
+    @property
+    def index_reflection(self):
+        return exclusions.open()
index f816895a46f160b28a1bcbe1effdf265b875291c..59efd8bbc8df574a11b7ceebb8faafe57ea6fd8c 100644 (file)
@@ -11,6 +11,9 @@ from sqlalchemy.testing import eq_, assert_raises_message
 from sqlalchemy import testing
 from .. import config
 
+from sqlalchemy.schema import DDL
+from sqlalchemy import event
+
 metadata, users = None, None
 
 
@@ -67,83 +70,78 @@ class HasSequenceTest(fixtures.TestBase):
             False)
 
 
-def createTables(meta, schema=None):
-    if schema:
-        schema_prefix = schema + "."
-    else:
-        schema_prefix = ""
-
-    users = Table('users', meta,
-        Column('user_id', sa.INT, primary_key=True),
-        Column('user_name', sa.VARCHAR(20), nullable=False),
-        Column('test1', sa.CHAR(5), nullable=False),
-        Column('test2', sa.Float(5), nullable=False),
-        Column('test3', sa.Text),
-        Column('test4', sa.Numeric(10, 2), nullable=False),
-        Column('test5', sa.Date),
-        Column('test5_1', sa.TIMESTAMP),
-        Column('parent_user_id', sa.Integer,
-                    sa.ForeignKey('%susers.user_id' % schema_prefix)),
-        Column('test6', sa.Date, nullable=False),
-        Column('test7', sa.Text),
-        Column('test8', sa.LargeBinary),
-        Column('test_passivedefault2', sa.Integer, server_default='5'),
-        Column('test9', sa.LargeBinary(100)),
-        Column('test10', sa.Numeric(10, 2)),
-        schema=schema,
-        test_needs_fk=True,
-    )
-    dingalings = Table("dingalings", meta,
-              Column('dingaling_id', sa.Integer, primary_key=True),
-              Column('address_id', sa.Integer,
-                    sa.ForeignKey('%semail_addresses.address_id' % schema_prefix)),
-              Column('data', sa.String(30)),
-              schema=schema,
-              test_needs_fk=True,
-        )
-    addresses = Table('email_addresses', meta,
-        Column('address_id', sa.Integer),
-        Column('remote_user_id', sa.Integer,
-               sa.ForeignKey(users.c.user_id)),
-        Column('email_address', sa.String(20)),
-        sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
-        schema=schema,
-        test_needs_fk=True,
-    )
-
-    return (users, addresses, dingalings)
-
-def createIndexes(con, schema=None):
-    fullname = 'users'
-    if schema:
-        fullname = "%s.%s" % (schema, 'users')
-    query = "CREATE INDEX users_t_idx ON %s (test1, test2)" % fullname
-    con.execute(sa.sql.text(query))
-
-@testing.requires.views
-def _create_views(con, schema=None):
-    for table_name in ('users', 'email_addresses'):
-        fullname = table_name
-        if schema:
-            fullname = "%s.%s" % (schema, table_name)
-        view_name = fullname + '_v'
-        query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name,
-                                                                   fullname)
-        con.execute(sa.sql.text(query))
-
-@testing.requires.views
-def _drop_views(con, schema=None):
-    for table_name in ('email_addresses', 'users'):
-        fullname = table_name
-        if schema:
-            fullname = "%s.%s" % (schema, table_name)
-        view_name = fullname + '_v'
-        query = "DROP VIEW %s" % view_name
-        con.execute(sa.sql.text(query))
-
-class ComponentReflectionTest(fixtures.TestBase):
 
-    @testing.requires.schemas
+class ComponentReflectionTest(fixtures.TablesTest):
+
+    @classmethod
+    def define_tables(cls, metadata):
+        if testing.requires.schemas.enabled:
+            to_build = [
+               (None, ""),
+                ("test_schema", "test_schema."),
+            ]
+        else:
+            to_build = [(None, "")]
+
+
+        for schema, schema_prefix in to_build:
+            users = Table('users', metadata,
+                Column('user_id', sa.INT, primary_key=True),
+                Column('test1', sa.CHAR(5), nullable=False),
+                Column('test2', sa.Float(5), nullable=False),
+                Column('parent_user_id', sa.Integer,
+                            sa.ForeignKey('%susers.user_id' % schema_prefix)),
+                schema=schema,
+                test_needs_fk=True,
+            )
+            Table("dingalings", metadata,
+                      Column('dingaling_id', sa.Integer, primary_key=True),
+                      Column('address_id', sa.Integer,
+                        sa.ForeignKey('%semail_addresses.address_id' %
+                                        schema_prefix)),
+                      Column('data', sa.String(30)),
+                      schema=schema,
+                      test_needs_fk=True,
+                )
+            Table('email_addresses', metadata,
+                Column('address_id', sa.Integer),
+                Column('remote_user_id', sa.Integer,
+                       sa.ForeignKey(users.c.user_id)),
+                Column('email_address', sa.String(20)),
+                sa.PrimaryKeyConstraint('address_id', name='email_ad_pk'),
+                schema=schema,
+                test_needs_fk=True,
+            )
+
+            fullname = 'users'
+            if schema:
+                fullname = "%s.%s" % (schema, 'users')
+            event.listen(
+                metadata,
+                "after_create",
+                DDL("CREATE INDEX users_t_idx ON %s (test1, test2)" % fullname)
+            )
+
+        for table_name in ('users', 'email_addresses'):
+            fullname = table_name
+            if schema:
+                fullname = "%s.%s" % (schema, table_name)
+            view_name = fullname + '_v'
+            query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name,
+                                                                       fullname)
+            event.listen(
+                metadata,
+                "after_create",
+                DDL(query)
+            )
+            event.listen(
+                metadata,
+                "before_drop",
+                DDL("DROP VIEW %s" % view_name)
+            )
+
+
+    @testing.requires.schema_reflection
     def test_get_schema_names(self):
         insp = inspect(testing.db)
 
@@ -163,107 +161,102 @@ class ComponentReflectionTest(fixtures.TestBase):
     def _test_get_table_names(self, schema=None, table_type='table',
                               order_by=None):
         meta = self.metadata
-        users, addresses, dingalings = createTables(meta, schema)
-        meta.create_all()
-        _create_views(meta.bind, schema)
-        try:
-            insp = inspect(meta.bind)
-            if table_type == 'view':
-                table_names = insp.get_view_names(schema)
-                table_names.sort()
-                answer = ['email_addresses_v', 'users_v']
+        users, addresses, dingalings = self.tables.users, \
+                self.tables.email_addresses, self.tables.dingalings
+        insp = inspect(meta.bind)
+        if table_type == 'view':
+            table_names = insp.get_view_names(schema)
+            table_names.sort()
+            answer = ['email_addresses_v', 'users_v']
+        else:
+            table_names = insp.get_table_names(schema,
+                                               order_by=order_by)
+            if order_by == 'foreign_key':
+                answer = ['dingalings', 'email_addresses', 'users']
+                eq_(table_names, answer)
             else:
-                table_names = insp.get_table_names(schema,
-                                                   order_by=order_by)
-                if order_by == 'foreign_key':
-                    answer = ['dingalings', 'email_addresses', 'users']
-                    eq_(table_names, answer)
-                else:
-                    answer = ['dingalings', 'email_addresses', 'users']
-                    eq_(sorted(table_names), answer)
-        finally:
-            _drop_views(meta.bind, schema)
+                answer = ['dingalings', 'email_addresses', 'users']
+                eq_(sorted(table_names), answer)
 
+    @testing.requires.table_reflection
     def test_get_table_names(self):
         self._test_get_table_names()
 
+    @testing.requires.table_reflection
     def test_get_table_names_fks(self):
         self._test_get_table_names(order_by='foreign_key')
 
+    @testing.requires.table_reflection
     @testing.requires.schemas
     def test_get_table_names_with_schema(self):
         self._test_get_table_names('test_schema')
 
-    @testing.requires.views
+    @testing.requires.view_reflection
     def test_get_view_names(self):
         self._test_get_table_names(table_type='view')
 
+    @testing.requires.view_reflection
     @testing.requires.schemas
     def test_get_view_names_with_schema(self):
         self._test_get_table_names('test_schema', table_type='view')
 
     def _test_get_columns(self, schema=None, table_type='table'):
         meta = MetaData(testing.db)
-        users, addresses, dingalings = createTables(meta, schema)
+        users, addresses, dingalings = self.tables.users, \
+                self.tables.email_addresses, self.tables.dingalings
         table_names = ['users', 'email_addresses']
-        meta.create_all()
         if table_type == 'view':
-            _create_views(meta.bind, schema)
             table_names = ['users_v', 'email_addresses_v']
-        try:
-            insp = inspect(meta.bind)
-            for table_name, table in zip(table_names, (users,
-                    addresses)):
-                schema_name = schema
-                cols = insp.get_columns(table_name, schema=schema_name)
-                self.assert_(len(cols) > 0, len(cols))
-
-                # should be in order
-
-                for i, col in enumerate(table.columns):
-                    eq_(col.name, cols[i]['name'])
-                    ctype = cols[i]['type'].__class__
-                    ctype_def = col.type
-                    if isinstance(ctype_def, sa.types.TypeEngine):
-                        ctype_def = ctype_def.__class__
-
-                    # Oracle returns Date for DateTime.
-
-                    if testing.against('oracle') and ctype_def \
-                        in (sql_types.Date, sql_types.DateTime):
-                        ctype_def = sql_types.Date
-
-                    # assert that the desired type and return type share
-                    # a base within one of the generic types.
-
-                    self.assert_(len(set(ctype.__mro__).
-                        intersection(ctype_def.__mro__).intersection([
-                        sql_types.Integer,
-                        sql_types.Numeric,
-                        sql_types.DateTime,
-                        sql_types.Date,
-                        sql_types.Time,
-                        sql_types.String,
-                        sql_types._Binary,
-                        ])) > 0, '%s(%s), %s(%s)' % (col.name,
-                                col.type, cols[i]['name'], ctype))
-        finally:
-            if table_type == 'view':
-                _drop_views(meta.bind, schema)
-            meta.drop_all()
+        insp = inspect(meta.bind)
+        for table_name, table in zip(table_names, (users,
+                addresses)):
+            schema_name = schema
+            cols = insp.get_columns(table_name, schema=schema_name)
+            self.assert_(len(cols) > 0, len(cols))
+
+            # should be in order
+
+            for i, col in enumerate(table.columns):
+                eq_(col.name, cols[i]['name'])
+                ctype = cols[i]['type'].__class__
+                ctype_def = col.type
+                if isinstance(ctype_def, sa.types.TypeEngine):
+                    ctype_def = ctype_def.__class__
+
+                # Oracle returns Date for DateTime.
+
+                if testing.against('oracle') and ctype_def \
+                    in (sql_types.Date, sql_types.DateTime):
+                    ctype_def = sql_types.Date
+
+                # assert that the desired type and return type share
+                # a base within one of the generic types.
+
+                self.assert_(len(set(ctype.__mro__).
+                    intersection(ctype_def.__mro__).intersection([
+                    sql_types.Integer,
+                    sql_types.Numeric,
+                    sql_types.DateTime,
+                    sql_types.Date,
+                    sql_types.Time,
+                    sql_types.String,
+                    sql_types._Binary,
+                    ])) > 0, '%s(%s), %s(%s)' % (col.name,
+                            col.type, cols[i]['name'], ctype))
 
     def test_get_columns(self):
         self._test_get_columns()
 
+    @testing.requires.table_reflection
     @testing.requires.schemas
     def test_get_columns_with_schema(self):
         self._test_get_columns(schema='test_schema')
 
-    @testing.requires.views
+    @testing.requires.view_reflection
     def test_get_view_columns(self):
         self._test_get_columns(table_type='view')
 
-    @testing.requires.views
+    @testing.requires.view_reflection
     @testing.requires.schemas
     def test_get_view_columns_with_schema(self):
         self._test_get_columns(schema='test_schema', table_type='view')
@@ -271,8 +264,7 @@ class ComponentReflectionTest(fixtures.TestBase):
     @testing.provide_metadata
     def _test_get_pk_constraint(self, schema=None):
         meta = self.metadata
-        users, addresses, _ = createTables(meta, schema)
-        meta.create_all()
+        users, addresses = self.tables.users, self.tables.email_addresses
         insp = inspect(meta.bind)
 
         users_cons = insp.get_pk_constraint(users.name, schema=schema)
@@ -291,15 +283,16 @@ class ComponentReflectionTest(fixtures.TestBase):
     def test_get_pk_constraint(self):
         self._test_get_pk_constraint()
 
+    @testing.requires.table_reflection
     @testing.fails_on('sqlite', 'no schemas')
     def test_get_pk_constraint_with_schema(self):
         self._test_get_pk_constraint(schema='test_schema')
 
+    @testing.requires.table_reflection
     @testing.provide_metadata
     def test_deprecated_get_primary_keys(self):
         meta = self.metadata
-        users, _, _ = createTables(meta, schema=None)
-        meta.create_all()
+        users = self.tables.users
         insp = Inspector(meta.bind)
         assert_raises_message(
             sa_exc.SADeprecationWarning,
@@ -311,8 +304,8 @@ class ComponentReflectionTest(fixtures.TestBase):
     @testing.provide_metadata
     def _test_get_foreign_keys(self, schema=None):
         meta = self.metadata
-        users, addresses, dingalings = createTables(meta, schema)
-        meta.create_all()
+        users, addresses, dingalings = self.tables.users, \
+                    self.tables.email_addresses, self.tables.dingalings
         insp = inspect(meta.bind)
         expected_schema = schema
         # users
@@ -342,9 +335,11 @@ class ComponentReflectionTest(fixtures.TestBase):
         eq_(fkey1['referred_columns'], ['user_id', ])
         eq_(fkey1['constrained_columns'], ['remote_user_id'])
 
+    @testing.requires.constraint_reflection
     def test_get_foreign_keys(self):
         self._test_get_foreign_keys()
 
+    @testing.requires.constraint_reflection
     @testing.requires.schemas
     def test_get_foreign_keys_with_schema(self):
         self._test_get_foreign_keys(schema='test_schema')
@@ -352,9 +347,8 @@ class ComponentReflectionTest(fixtures.TestBase):
     @testing.provide_metadata
     def _test_get_indexes(self, schema=None):
         meta = self.metadata
-        users, addresses, dingalings = createTables(meta, schema)
-        meta.create_all()
-        createIndexes(meta.bind, schema)
+        users, addresses, dingalings = self.tables.users, \
+                    self.tables.email_addresses, self.tables.dingalings
         # The database may decide to create indexes for foreign keys, etc.
         # so there may be more indexes than expected.
         insp = inspect(meta.bind)
@@ -370,9 +364,11 @@ class ComponentReflectionTest(fixtures.TestBase):
             for key in e_index:
                 eq_(e_index[key], index[key])
 
+    @testing.requires.index_reflection
     def test_get_indexes(self):
         self._test_get_indexes()
 
+    @testing.requires.index_reflection
     @testing.requires.schemas
     def test_get_indexes_with_schema(self):
         self._test_get_indexes(schema='test_schema')
@@ -394,11 +390,11 @@ class ComponentReflectionTest(fixtures.TestBase):
         finally:
             _drop_views(meta.bind, schema)
 
-    @testing.requires.views
+    @testing.requires.view_reflection
     def test_get_view_definition(self):
         self._test_get_view_definition()
 
-    @testing.requires.views
+    @testing.requires.view_reflection
     @testing.requires.schemas
     def test_get_view_definition_with_schema(self):
         self._test_get_view_definition(schema='test_schema')
index eee3106d76f1145a04fbf1830608128cbe281f5e..fb347187e51d22dbb66c33cb247834bc8ec7f481 100644 (file)
@@ -200,15 +200,6 @@ class DefaultRequirements(SuiteRequirements):
                     ("informix", "<", (11, 55, "xC3"))
                     ], "savepoints not supported")
 
-    @property
-    def denormalized_names(self):
-        """Target database must have 'denormalized', i.e.
-        UPPERCASE as case insensitive names."""
-
-        return skip_if(
-                    lambda: not self.db.dialect.requires_name_normalize,
-                    "Backend does not require denormalized names."
-                )
 
     @property
     def schemas(self):
@@ -263,12 +254,6 @@ class DefaultRequirements(SuiteRequirements):
                     "postgresql", "mssql", "oracle"
                 ], "Backend does not support window functions")
 
-    @property
-    def returning(self):
-        return only_if(["postgresql", "mssql", "oracle", "firebird"],
-                "'returning' not supported by database"
-            )
-
     @property
     def two_phase_transactions(self):
         """Target database must support two-phase transactions."""