]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- rework assertsql system, fixes #3293
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 19 Jan 2015 01:57:26 +0000 (20:57 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 19 Jan 2015 01:57:26 +0000 (20:57 -0500)
lib/sqlalchemy/testing/assertions.py
lib/sqlalchemy/testing/assertsql.py
lib/sqlalchemy/testing/fixtures.py
test/dialect/mssql/test_query.py
test/dialect/postgresql/test_query.py
test/dialect/postgresql/test_types.py
test/orm/test_cycles.py
test/orm/test_query.py
test/sql/test_constraints.py

index 46fcd64b1cf9bd7cd965c0dd44ae0fef5a912a02..635f6c5399239c9711a73ad604a2cc8736158035 100644 (file)
@@ -419,21 +419,16 @@ class AssertsExecutionResults(object):
             callable_()
         asserter.assert_(*rules)
 
-    def assert_sql(self, db, callable_, list_, with_sequences=None):
-        if (with_sequences is not None and
-                config.db.dialect.supports_sequences):
-            rules = with_sequences
-        else:
-            rules = list_
+    def assert_sql(self, db, callable_, rules):
 
         newrules = []
         for rule in rules:
             if isinstance(rule, dict):
                 newrule = assertsql.AllOf(*[
-                    assertsql.ExactSQL(k, v) for k, v in rule.items()
+                    assertsql.CompiledSQL(k, v) for k, v in rule.items()
                 ])
             else:
-                newrule = assertsql.ExactSQL(*rule)
+                newrule = assertsql.CompiledSQL(*rule)
             newrules.append(newrule)
 
         self.assert_sql_execution(db, callable_, *newrules)
index 2ac0605a22408b9b423ad71c6989ee6427937527..5c746e8f105506fa8aa8c901d1615f9c9f275db5 100644 (file)
@@ -11,84 +11,138 @@ import re
 import collections
 import contextlib
 from .. import event
+from sqlalchemy.schema import _DDLCompiles
+from sqlalchemy.engine.util import _distill_params
 
 
 class AssertRule(object):
 
-    def process_execute(self, clauseelement, *multiparams, **params):
-        pass
+    is_consumed = False
+    errormessage = None
+    consume_statement = True
 
-    def process_cursor_execute(self, statement, parameters, context,
-                               executemany):
+    def process_statement(self, execute_observed):
         pass
 
-    def is_consumed(self):
-        """Return True if this rule has been consumed, False if not.
-
-        Should raise an AssertionError if this rule's condition has
-        definitely failed.
-
-        """
-
-        raise NotImplementedError()
+    def no_more_statements(self):
+        assert False, 'All statements are complete, but pending '\
+            'assertion rules remain'
 
-    def rule_passed(self):
-        """Return True if the last test of this rule passed, False if
-        failed, None if no test was applied."""
 
-        raise NotImplementedError()
-
-    def consume_final(self):
-        """Return True if this rule has been consumed.
-
-        Should raise an AssertionError if this rule's condition has not
-        been consumed or has failed.
+class SQLMatchRule(AssertRule):
+    pass
 
-        """
 
-        if self._result is None:
-            assert False, 'Rule has not been consumed'
-        return self.is_consumed()
+class CursorSQL(SQLMatchRule):
+    consume_statement = False
 
+    def __init__(self, statement, params=None):
+        self.statement = statement
+        self.params = params
 
-class SQLMatchRule(AssertRule):
-    def __init__(self):
-        self._result = None
-        self._errmsg = ""
+    def process_statement(self, execute_observed):
+        stmt = execute_observed.statements[0]
+        if self.statement != stmt.statement or (
+                self.params is not None and self.params != stmt.parameters):
+            self.errormessage = \
+                "Testing for exact SQL %s parameters %s received %s %s" % (
+                    self.statement, self.params,
+                    stmt.statement, stmt.parameters
+                )
+        else:
+            execute_observed.statements.pop(0)
+            self.is_consumed = True
+            if not execute_observed.statements:
+                self.consume_statement = True
 
-    def rule_passed(self):
-        return self._result
 
-    def is_consumed(self):
-        if self._result is None:
-            return False
+class CompiledSQL(SQLMatchRule):
 
-        assert self._result, self._errmsg
+    def __init__(self, statement, params=None):
+        self.statement = statement
+        self.params = params
 
-        return True
+    def _compare_sql(self, execute_observed, received_statement):
+        stmt = re.sub(r'[\n\t]', '', self.statement)
+        return received_statement == stmt
 
+    def _compile_dialect(self, execute_observed):
+        return DefaultDialect()
 
-class ExactSQL(SQLMatchRule):
+    def _received_statement(self, execute_observed):
+        """reconstruct the statement and params in terms
+        of a target dialect, which for CompiledSQL is just DefaultDialect."""
 
-    def __init__(self, sql, params=None):
-        SQLMatchRule.__init__(self)
-        self.sql = sql
-        self.params = params
+        context = execute_observed.context
+        compare_dialect = self._compile_dialect(execute_observed)
+        if isinstance(context.compiled.statement, _DDLCompiles):
+            compiled = \
+                context.compiled.statement.compile(dialect=compare_dialect)
+        else:
+            compiled = (
+                context.compiled.statement.compile(
+                    dialect=compare_dialect,
+                    column_keys=context.compiled.column_keys,
+                    inline=context.compiled.inline)
+            )
+        _received_statement = re.sub(r'[\n\t]', '', str(compiled))
+        parameters = execute_observed.parameters
 
-    def process_cursor_execute(self, statement, parameters, context,
-                               executemany):
-        if not context:
-            return
-        _received_statement = \
-            _process_engine_statement(context.unicode_statement,
-                                      context)
-        _received_parameters = context.compiled_parameters
+        if not parameters:
+            _received_parameters = [compiled.construct_params()]
+        else:
+            _received_parameters = [
+                compiled.construct_params(m) for m in parameters]
+
+        return _received_statement, _received_parameters
+
+    def process_statement(self, execute_observed):
+        context = execute_observed.context
+
+        _received_statement, _received_parameters = \
+            self._received_statement(execute_observed)
+        params = self._all_params(context)
+
+        equivalent = self._compare_sql(execute_observed, _received_statement)
+
+        if equivalent:
+            if params is not None:
+                all_params = list(params)
+                all_received = list(_received_parameters)
+                while all_params and all_received:
+                    param = dict(all_params.pop(0))
+
+                    for idx, received in enumerate(list(all_received)):
+                        # do a positive compare only
+                        for param_key in param:
+                            # a key in param did not match current
+                            # 'received'
+                            if param_key not in received or \
+                                    received[param_key] != param[param_key]:
+                                break
+                        else:
+                            # all keys in param matched 'received';
+                            # onto next param
+                            del all_received[idx]
+                            break
+                    else:
+                        # param did not match any entry
+                        # in all_received
+                        equivalent = False
+                        break
+                if all_params or all_received:
+                    equivalent = False
 
-        # TODO: remove this step once all unit tests are migrated, as
-        # ExactSQL should really be *exact* SQL
+        if equivalent:
+            self.is_consumed = True
+            self.errormessage = None
+        else:
+            self.errormessage = self._failure_message(params) % {
+                'received_statement': _received_statement,
+                'received_parameters': _received_parameters
+            }
 
-        sql = _process_assertion_statement(self.sql, context)
-        equivalent = _received_statement == sql
+    def _all_params(self, context):
         if self.params:
             if util.callable(self.params):
                 params = self.params(context)
@@ -96,127 +150,77 @@ class ExactSQL(SQLMatchRule):
                 params = self.params
             if not isinstance(params, list):
                 params = [params]
-            equivalent = equivalent and params \
-                == context.compiled_parameters
+            return params
         else:
-            params = {}
-        self._result = equivalent
-        if not self._result:
-            self._errmsg = (
-                'Testing for exact statement %r exact params %r, '
-                'received %r with params %r' %
-                (sql, params, _received_statement, _received_parameters))
-
+            return None
+
+    def _failure_message(self, expected_params):
+        return (
+            'Testing for compiled statement %r partial params %r, '
+            'received %%(received_statement)r with params '
+            '%%(received_parameters)r' % (
+                self.statement, expected_params
+            )
+        )
 
-class RegexSQL(SQLMatchRule):
 
+class RegexSQL(CompiledSQL):
     def __init__(self, regex, params=None):
         SQLMatchRule.__init__(self)
         self.regex = re.compile(regex)
         self.orig_regex = regex
         self.params = params
 
-    def process_cursor_execute(self, statement, parameters, context,
-                               executemany):
-        if not context:
-            return
-        _received_statement = \
-            _process_engine_statement(context.unicode_statement,
-                                      context)
-        _received_parameters = context.compiled_parameters
-        equivalent = bool(self.regex.match(_received_statement))
-        if self.params:
-            if util.callable(self.params):
-                params = self.params(context)
-            else:
-                params = self.params
-            if not isinstance(params, list):
-                params = [params]
-
-            # do a positive compare only
-
-            for param, received in zip(params, _received_parameters):
-                for k, v in param.items():
-                    if k not in received or received[k] != v:
-                        equivalent = False
-                        break
-        else:
-            params = {}
-        self._result = equivalent
-        if not self._result:
-            self._errmsg = \
-                'Testing for regex %r partial params %r, received %r '\
-                'with params %r' % (self.orig_regex, params,
-                                    _received_statement,
-                                    _received_parameters)
-
-
-class CompiledSQL(SQLMatchRule):
+    def _failure_message(self, expected_params):
+        return (
+            'Testing for compiled statement ~%r partial params %r, '
+            'received %%(received_statement)r with params '
+            '%%(received_parameters)r' % (
+                self.orig_regex, expected_params
+            )
+        )
 
-    def __init__(self, statement, params=None):
-        SQLMatchRule.__init__(self)
-        self.statement = statement
-        self.params = params
+    def _compare_sql(self, execute_observed, received_statement):
+        return bool(self.regex.match(received_statement))
 
-    def process_cursor_execute(self, statement, parameters, context,
-                               executemany):
-        if not context:
-            return
-        from sqlalchemy.schema import _DDLCompiles
-        _received_parameters = list(context.compiled_parameters)
 
-        # recompile from the context, using the default dialect
+class DialectSQL(CompiledSQL):
+    def _compile_dialect(self, execute_observed):
+        return execute_observed.context.dialect
 
-        if isinstance(context.compiled.statement, _DDLCompiles):
-            compiled = \
-                context.compiled.statement.compile(dialect=DefaultDialect())
+    def _received_statement(self, execute_observed):
+        received_stmt, received_params = super(DialectSQL, self).\
+            _received_statement(execute_observed)
+        for real_stmt in execute_observed.statements:
+            if real_stmt.statement == received_stmt:
+                break
         else:
-            compiled = (
-                context.compiled.statement.compile(
-                    dialect=DefaultDialect(),
-                    column_keys=context.compiled.column_keys)
-            )
-        _received_statement = re.sub(r'[\n\t]', '', str(compiled))
-        equivalent = self.statement == _received_statement
-        if self.params:
-            if util.callable(self.params):
-                params = self.params(context)
-            else:
-                params = self.params
-            if not isinstance(params, list):
-                params = [params]
-            else:
-                params = list(params)
-            all_params = list(params)
-            all_received = list(_received_parameters)
-            while params:
-                param = dict(params.pop(0))
-                for k, v in context.compiled.params.items():
-                    param.setdefault(k, v)
-                if param not in _received_parameters:
-                    equivalent = False
-                    break
-                else:
-                    _received_parameters.remove(param)
-            if _received_parameters:
-                equivalent = False
+            raise AssertionError(
+                "Can't locate compiled statement %r in list of "
+                "statements actually invoked" % received_stmt)
+        return received_stmt, execute_observed.context.compiled_parameters
+
+    def _compare_sql(self, execute_observed, received_statement):
+        stmt = re.sub(r'[\n\t]', '', self.statement)
+
+        # convert our comparison statement to have the
+        # paramstyle of the received
+        paramstyle = execute_observed.context.dialect.paramstyle
+        if paramstyle == 'pyformat':
+            stmt = re.sub(
+                r':([\w_]+)', r"%(\1)s", stmt)
         else:
-            params = {}
-            all_params = {}
-            all_received = []
-        self._result = equivalent
-        if not self._result:
-            print('Testing for compiled statement %r partial params '
-                  '%r, received %r with params %r' %
-                  (self.statement, all_params,
-                   _received_statement, all_received))
-            self._errmsg = (
-                'Testing for compiled statement %r partial params %r, '
-                'received %r with params %r' %
-                (self.statement, all_params,
-                 _received_statement, all_received))
-
-            # print self._errmsg
+            # positional params
+            repl = None
+            if paramstyle == 'qmark':
+                repl = "?"
+            elif paramstyle == 'format':
+                repl = r"%s"
+            elif paramstyle == 'numeric':
+                repl = None
+            stmt = re.sub(r':([\w_]+)', repl, stmt)
+
+        return received_statement == stmt
 
 
 class CountStatements(AssertRule):
@@ -225,21 +229,13 @@ class CountStatements(AssertRule):
         self.count = count
         self._statement_count = 0
 
-    def process_execute(self, clauseelement, *multiparams, **params):
+    def process_statement(self, execute_observed):
         self._statement_count += 1
 
-    def process_cursor_execute(self, statement, parameters, context,
-                               executemany):
-        pass
-
-    def is_consumed(self):
-        return False
-
-    def consume_final(self):
-        assert self.count == self._statement_count, \
-            'desired statement count %d does not match %d' \
-            % (self.count, self._statement_count)
-        return True
+    def no_more_statements(self):
+        if self.count != self._statement_count:
+            assert False, 'desired statement count %d does not match %d' \
+                % (self.count, self._statement_count)
 
 
 class AllOf(AssertRule):
@@ -247,98 +243,41 @@ class AllOf(AssertRule):
     def __init__(self, *rules):
         self.rules = set(rules)
 
-    def process_execute(self, clauseelement, *multiparams, **params):
-        for rule in self.rules:
-            rule.process_execute(clauseelement, *multiparams, **params)
-
-    def process_cursor_execute(self, statement, parameters, context,
-                               executemany):
-        for rule in self.rules:
-            rule.process_cursor_execute(statement, parameters, context,
-                                        executemany)
-
-    def is_consumed(self):
-        if not self.rules:
-            return True
+    def process_statement(self, execute_observed):
         for rule in list(self.rules):
-            if rule.rule_passed():  # a rule passed, move on
-                self.rules.remove(rule)
-                return len(self.rules) == 0
-        return False
-
-    def rule_passed(self):
-        return self.is_consumed()
-
-    def consume_final(self):
-        return len(self.rules) == 0
+            rule.errormessage = None
+            rule.process_statement(execute_observed)
+            if rule.is_consumed:
+                self.rules.discard(rule)
+                if not self.rules:
+                    self.is_consumed = True
+                break
+            elif not rule.errormessage:
+                # rule is not done yet
+                self.errormessage = None
+                break
+        else:
+            self.errormessage = list(self.rules)[0].errormessage
 
 
 class Or(AllOf):
-    def __init__(self, *rules):
-        self.rules = set(rules)
-        self._consume_final = False
-
-    def is_consumed(self):
-        if not self.rules:
-            return True
-        for rule in list(self.rules):
-            if rule.rule_passed():  # a rule passed
-                self._consume_final = True
-                return True
-        return False
-
-    def consume_final(self):
-        assert self._consume_final, "Unsatisified rules remain"
-
-
-def _process_engine_statement(query, context):
-    if util.jython:
-
-        # oracle+zxjdbc passes a PyStatement when returning into
-
-        query = str(query)
-    if context.engine.name == 'mssql' \
-            and query.endswith('; select scope_identity()'):
-        query = query[:-25]
-    query = re.sub(r'\n', '', query)
-    return query
 
+    def process_statement(self, execute_observed):
+        for rule in self.rules:
+            rule.process_statement(execute_observed)
+            if rule.is_consumed:
+                self.is_consumed = True
+                break
+        else:
+            self.errormessage = list(self.rules)[0].errormessage
 
-def _process_assertion_statement(query, context):
-    paramstyle = context.dialect.paramstyle
-    if paramstyle == 'named':
-        pass
-    elif paramstyle == 'pyformat':
-        query = re.sub(r':([\w_]+)', r"%(\1)s", query)
-    else:
-        # positional params
-        repl = None
-        if paramstyle == 'qmark':
-            repl = "?"
-        elif paramstyle == 'format':
-            repl = r"%s"
-        elif paramstyle == 'numeric':
-            repl = None
-        query = re.sub(r':([\w_]+)', repl, query)
 
-    return query
-
-
-class SQLExecuteObserved(
-    collections.namedtuple(
-        "SQLExecuteObserved", ["clauseelement", "multiparams", "params"])
-):
-    def process(self, rules):
-        if rules is not None:
-            if not rules:
-                assert False, \
-                    'All rules have been exhausted, but further '\
-                    'statements remain'
-            rule = rules[0]
-            rule.process_execute(
-                self.clauseelement, *self.multiparams, **self.params)
-            if rule.is_consumed():
-                rules.pop(0)
+class SQLExecuteObserved(object):
+    def __init__(self, context, clauseelement, multiparams, params):
+        self.context = context
+        self.clauseelement = clauseelement
+        self.parameters = _distill_params(multiparams, params)
+        self.statements = []
 
 
 class SQLCursorExecuteObserved(
@@ -346,12 +285,7 @@ class SQLCursorExecuteObserved(
         "SQLCursorExecuteObserved",
         ["statement", "parameters", "context", "executemany"])
 ):
-    def process(self, rules):
-        if rules:
-            rule = rules[0]
-            rule.process_cursor_execute(
-                self.statement, self.parameters,
-                self.context, self.executemany)
+    pass
 
 
 class SQLAsserter(object):
@@ -359,43 +293,63 @@ class SQLAsserter(object):
         self.accumulated = []
 
     def _close(self):
-        # safety feature in case event.remove
-        # goes haywire
         self._final = self.accumulated
         del self.accumulated
 
     def assert_(self, *rules):
         rules = list(rules)
-        for observed in self._final:
-            observed.process(rules)
+        observed = list(self._final)
+
+        while observed and rules:
+            rule = rules[0]
+            rule.process_statement(observed[0])
+            if rule.is_consumed:
+                rules.pop(0)
+            elif rule.errormessage:
+                assert False, rule.errormessage
 
-        for rule in rules:
-            if not rule.consume_final():
-                assert False, \
-                    'All statements are complete, but pending '\
-                    'assertion rules remain'
+            if rule.consume_statement:
+                observed.pop(0)
+
+        if not observed and rules:
+            rules[0].no_more_statements()
+        elif not rules and observed:
+            assert False, "Additional SQL statements remain"
 
 
 @contextlib.contextmanager
 def assert_engine(engine):
     asserter = SQLAsserter()
 
-    @event.listens_for(engine, "after_execute")
-    def execute(conn, clauseelement, multiparams, params, result):
-        asserter.accumulated.append(
-            SQLExecuteObserved(
-                clauseelement, multiparams, params))
+    orig = []
+
+    @event.listens_for(engine, "before_execute")
+    def connection_execute(conn, clauseelement, multiparams, params):
+        # grab the original statement + params before any cursor
+        # execution
+        orig[:] = clauseelement, multiparams, params
 
     @event.listens_for(engine, "after_cursor_execute")
     def cursor_execute(conn, cursor, statement, parameters,
                        context, executemany):
-        asserter.accumulated.append(
+        if not context:
+            return
+        # then grab real cursor statements and associate them all
+        # around a single context
+        if asserter.accumulated and \
+                asserter.accumulated[-1].context is context:
+            obs = asserter.accumulated[-1]
+        else:
+            obs = SQLExecuteObserved(context, orig[0], orig[1], orig[2])
+            asserter.accumulated.append(obs)
+        obs.statements.append(
             SQLCursorExecuteObserved(
-                statement, parameters, context, executemany))
+                statement, parameters, context, executemany)
+        )
 
     try:
         yield asserter
     finally:
-        asserter._close()
         event.remove(engine, "after_cursor_execute", cursor_execute)
-        event.remove(engine, "after_execute", execute)
+        event.remove(engine, "before_execute", connection_execute)
+        asserter._close()
index d86049da749591b6ff598e697c10c9246a926be2..48d4d9c9b665955767467e26747996f18c10a3be 100644 (file)
@@ -192,9 +192,8 @@ class TablesTest(TestBase):
     def sql_count_(self, count, fn):
         self.assert_sql_count(self.bind, fn, count)
 
-    def sql_eq_(self, callable_, statements, with_sequences=None):
-        self.assert_sql(self.bind,
-                        callable_, statements, with_sequences)
+    def sql_eq_(self, callable_, statements):
+        self.assert_sql(self.bind, callable_, statements)
 
     @classmethod
     def _load_fixtures(cls):
index 715eebb8489e2096c8d5b541c8bc0fa8aeed1f9e..e0affe831f3d70a19078cf9ac19ed5b04be010d6 100644 (file)
@@ -7,6 +7,7 @@ from sqlalchemy.testing import fixtures, AssertsCompiledSQL
 from sqlalchemy import testing
 from sqlalchemy.util import ue
 from sqlalchemy import util
+from sqlalchemy.testing.assertsql import CursorSQL
 
 
 
@@ -163,7 +164,6 @@ class QueryUnicodeTest(fixtures.TestBase):
         finally:
             meta.drop_all()
 
-from sqlalchemy.testing.assertsql import ExactSQL
 class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
     __only_on__ = 'mssql'
 
@@ -232,27 +232,73 @@ class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
             con.execute("""drop trigger paj""")
             meta.drop_all()
 
-    @testing.fails_on_everything_except('mssql+pyodbc', 'pyodbc-specific feature')
     @testing.provide_metadata
     def test_disable_scope_identity(self):
         engine = engines.testing_engine(options={"use_scope_identity": False})
         metadata = self.metadata
-        metadata.bind = engine
-        t1 = Table('t1', metadata,
-                Column('id', Integer, primary_key=True),
-                implicit_returning=False
+        t1 = Table(
+            't1', metadata,
+            Column('id', Integer, primary_key=True),
+            Column('data', String(50)),
+            implicit_returning=False
         )
-        metadata.create_all()
+        metadata.create_all(engine)
+
+        with self.sql_execution_asserter(engine) as asserter:
+            engine.execute(t1.insert(), {"data": "somedata"})
+
+        asserter.assert_(
+            CursorSQL(
+                "INSERT INTO t1 (data) VALUES (?)",
+                ("somedata", )
+            ),
+            CursorSQL("SELECT @@identity AS lastrowid"),
+        )
+
+    @testing.provide_metadata
+    def test_enable_scope_identity(self):
+        engine = engines.testing_engine(options={"use_scope_identity": True})
+        metadata = self.metadata
+        t1 = Table(
+            't1', metadata,
+            Column('id', Integer, primary_key=True),
+            implicit_returning=False
+        )
+        metadata.create_all(engine)
+
+        with self.sql_execution_asserter(engine) as asserter:
+            engine.execute(t1.insert())
+
+        # even with pyodbc, we don't embed the scope identity on a
+        # DEFAULT VALUES insert
+        asserter.assert_(
+            CursorSQL("INSERT INTO t1 DEFAULT VALUES"),
+            CursorSQL("SELECT scope_identity() AS lastrowid"),
+        )
+
+    @testing.only_on('mssql+pyodbc')
+    @testing.provide_metadata
+    def test_embedded_scope_identity(self):
+        engine = engines.testing_engine(options={"use_scope_identity": True})
+        metadata = self.metadata
+        t1 = Table(
+            't1', metadata,
+            Column('id', Integer, primary_key=True),
+            Column('data', String(50)),
+            implicit_returning=False
+        )
+        metadata.create_all(engine)
+
+        with self.sql_execution_asserter(engine) as asserter:
+            engine.execute(t1.insert(), {'data': 'somedata'})
 
-        self.assert_sql_execution(
-                testing.db,
-                lambda: engine.execute(t1.insert()),
-                ExactSQL("INSERT INTO t1 DEFAULT VALUES"),
-                # we don't have an event for
-                # "SELECT @@IDENTITY" part here.
-                # this will be in 0.8 with #2459
+        # pyodbc-specific system
+        asserter.assert_(
+            CursorSQL(
+                "INSERT INTO t1 (data) VALUES (?); select scope_identity()",
+                ("somedata", )
+            ),
         )
-        assert not engine.dialect.use_scope_identity
 
     def test_insertid_schema(self):
         meta = MetaData(testing.db)
index 6841f397a9c835452deb9c04a1663ce539ae5d2c..26ff5e93b1282adf15715859fc6720c73e280232 100644 (file)
@@ -6,6 +6,7 @@ from sqlalchemy import Table, Column, MetaData, Integer, String, bindparam, \
     Sequence, ForeignKey, text, select, func, extract, literal_column, \
     tuple_, DateTime, Time, literal, and_, Date, or_
 from sqlalchemy.testing import engines, fixtures
+from sqlalchemy.testing.assertsql import DialectSQL, CursorSQL
 from sqlalchemy import testing
 from sqlalchemy import exc
 from sqlalchemy.dialects import postgresql
@@ -170,7 +171,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
             engines.testing_engine(options={'implicit_returning': False})
         metadata.bind = self.engine
 
-        def go():
+        with self.sql_execution_asserter(self.engine) as asserter:
 
             # execute with explicit id
 
@@ -199,32 +200,41 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
 
             table.insert(inline=True).execute({'data': 'd8'})
 
-        # note that the test framework doesn't capture the "preexecute"
-        # of a seqeuence or default.  we just see it in the bind params.
+        asserter.assert_(
+            DialectSQL(
+                'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+                {'id': 30, 'data': 'd1'}),
+            DialectSQL(
+                'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+                {'id': 1, 'data': 'd2'}),
+            DialectSQL(
+                'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+                [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
+            DialectSQL(
+                'INSERT INTO testtable (data) VALUES (:data)',
+                [{'data': 'd5'}, {'data': 'd6'}]),
+            DialectSQL(
+                'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+                [{'id': 33, 'data': 'd7'}]),
+            DialectSQL(
+                'INSERT INTO testtable (data) VALUES (:data)',
+                [{'data': 'd8'}]),
+        )
+
+        eq_(
+            table.select().execute().fetchall(),
+            [
+                (30, 'd1'),
+                (1, 'd2'),
+                (31, 'd3'),
+                (32, 'd4'),
+                (2, 'd5'),
+                (3, 'd6'),
+                (33, 'd7'),
+                (4, 'd8'),
+            ]
+        )
 
-        self.assert_sql(self.engine, go, [], with_sequences=[
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
-             {'id': 30, 'data': 'd1'}),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
-             {'id': 1, 'data': 'd2'}),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
-             [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
-            ('INSERT INTO testtable (data) VALUES (:data)',
-             [{'data': 'd5'}, {'data': 'd6'}]),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
-             [{'id': 33, 'data': 'd7'}]),
-            ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
-        ])
-        assert table.select().execute().fetchall() == [
-            (30, 'd1'),
-            (1, 'd2'),
-            (31, 'd3'),
-            (32, 'd4'),
-            (2, 'd5'),
-            (3, 'd6'),
-            (33, 'd7'),
-            (4, 'd8'),
-        ]
         table.delete().execute()
 
         # test the same series of events using a reflected version of
@@ -233,7 +243,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
         m2 = MetaData(self.engine)
         table = Table(table.name, m2, autoload=True)
 
-        def go():
+        with self.sql_execution_asserter(self.engine) as asserter:
             table.insert().execute({'id': 30, 'data': 'd1'})
             r = table.insert().execute({'data': 'd2'})
             assert r.inserted_primary_key == [5]
@@ -243,29 +253,39 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
             table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
             table.insert(inline=True).execute({'data': 'd8'})
 
-        self.assert_sql(self.engine, go, [], with_sequences=[
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
-             {'id': 30, 'data': 'd1'}),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
-             {'id': 5, 'data': 'd2'}),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
-             [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
-            ('INSERT INTO testtable (data) VALUES (:data)',
-             [{'data': 'd5'}, {'data': 'd6'}]),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
-             [{'id': 33, 'data': 'd7'}]),
-            ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
-        ])
-        assert table.select().execute().fetchall() == [
-            (30, 'd1'),
-            (5, 'd2'),
-            (31, 'd3'),
-            (32, 'd4'),
-            (6, 'd5'),
-            (7, 'd6'),
-            (33, 'd7'),
-            (8, 'd8'),
-        ]
+        asserter.assert_(
+            DialectSQL(
+                'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+                {'id': 30, 'data': 'd1'}),
+            DialectSQL(
+                'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+                {'id': 5, 'data': 'd2'}),
+            DialectSQL(
+                'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+                [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
+            DialectSQL(
+                'INSERT INTO testtable (data) VALUES (:data)',
+                [{'data': 'd5'}, {'data': 'd6'}]),
+            DialectSQL(
+                'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+                [{'id': 33, 'data': 'd7'}]),
+            DialectSQL(
+                'INSERT INTO testtable (data) VALUES (:data)',
+                [{'data': 'd8'}]),
+        )
+        eq_(
+            table.select().execute().fetchall(),
+            [
+                (30, 'd1'),
+                (5, 'd2'),
+                (31, 'd3'),
+                (32, 'd4'),
+                (6, 'd5'),
+                (7, 'd6'),
+                (33, 'd7'),
+                (8, 'd8'),
+            ]
+        )
         table.delete().execute()
 
     def _assert_data_autoincrement_returning(self, table):
@@ -273,7 +293,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
             engines.testing_engine(options={'implicit_returning': True})
         metadata.bind = self.engine
 
-        def go():
+        with self.sql_execution_asserter(self.engine) as asserter:
 
             # execute with explicit id
 
@@ -302,29 +322,34 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
 
             table.insert(inline=True).execute({'data': 'd8'})
 
-        self.assert_sql(self.engine, go, [], with_sequences=[
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+        asserter.assert_(
+            DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
              {'id': 30, 'data': 'd1'}),
-            ('INSERT INTO testtable (data) VALUES (:data) RETURNING '
+            DialectSQL('INSERT INTO testtable (data) VALUES (:data) RETURNING '
              'testtable.id', {'data': 'd2'}),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+            DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
              [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
-            ('INSERT INTO testtable (data) VALUES (:data)',
+            DialectSQL('INSERT INTO testtable (data) VALUES (:data)',
              [{'data': 'd5'}, {'data': 'd6'}]),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+            DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
              [{'id': 33, 'data': 'd7'}]),
-            ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
-        ])
-        assert table.select().execute().fetchall() == [
-            (30, 'd1'),
-            (1, 'd2'),
-            (31, 'd3'),
-            (32, 'd4'),
-            (2, 'd5'),
-            (3, 'd6'),
-            (33, 'd7'),
-            (4, 'd8'),
-        ]
+            DialectSQL('INSERT INTO testtable (data) VALUES (:data)',
+                [{'data': 'd8'}]),
+        )
+
+        eq_(
+            table.select().execute().fetchall(),
+            [
+                (30, 'd1'),
+                (1, 'd2'),
+                (31, 'd3'),
+                (32, 'd4'),
+                (2, 'd5'),
+                (3, 'd6'),
+                (33, 'd7'),
+                (4, 'd8'),
+            ]
+        )
         table.delete().execute()
 
         # test the same series of events using a reflected version of
@@ -333,7 +358,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
         m2 = MetaData(self.engine)
         table = Table(table.name, m2, autoload=True)
 
-        def go():
+        with self.sql_execution_asserter(self.engine) as asserter:
             table.insert().execute({'id': 30, 'data': 'd1'})
             r = table.insert().execute({'data': 'd2'})
             assert r.inserted_primary_key == [5]
@@ -343,29 +368,32 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
             table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
             table.insert(inline=True).execute({'data': 'd8'})
 
-        self.assert_sql(self.engine, go, [], with_sequences=[
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+        asserter.assert_(
+            DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
              {'id': 30, 'data': 'd1'}),
-            ('INSERT INTO testtable (data) VALUES (:data) RETURNING '
+            DialectSQL('INSERT INTO testtable (data) VALUES (:data) RETURNING '
              'testtable.id', {'data': 'd2'}),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+            DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
              [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
-            ('INSERT INTO testtable (data) VALUES (:data)',
+            DialectSQL('INSERT INTO testtable (data) VALUES (:data)',
              [{'data': 'd5'}, {'data': 'd6'}]),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+            DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
              [{'id': 33, 'data': 'd7'}]),
-            ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
-        ])
-        assert table.select().execute().fetchall() == [
-            (30, 'd1'),
-            (5, 'd2'),
-            (31, 'd3'),
-            (32, 'd4'),
-            (6, 'd5'),
-            (7, 'd6'),
-            (33, 'd7'),
-            (8, 'd8'),
-        ]
+            DialectSQL('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
+        )
+        eq_(
+            table.select().execute().fetchall(),
+            [
+                (30, 'd1'),
+                (5, 'd2'),
+                (31, 'd3'),
+                (32, 'd4'),
+                (6, 'd5'),
+                (7, 'd6'),
+                (33, 'd7'),
+                (8, 'd8'),
+            ]
+        )
         table.delete().execute()
 
     def _assert_data_with_sequence(self, table, seqname):
@@ -373,7 +401,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
             engines.testing_engine(options={'implicit_returning': False})
         metadata.bind = self.engine
 
-        def go():
+        with self.sql_execution_asserter(self.engine) as asserter:
             table.insert().execute({'id': 30, 'data': 'd1'})
             table.insert().execute({'data': 'd2'})
             table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
@@ -382,30 +410,34 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
             table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
             table.insert(inline=True).execute({'data': 'd8'})
 
-        self.assert_sql(self.engine, go, [], with_sequences=[
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+        asserter.assert_(
+            DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
              {'id': 30, 'data': 'd1'}),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+            CursorSQL("select nextval('my_seq')"),
+            DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
              {'id': 1, 'data': 'd2'}),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+            DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
              [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
-            ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+            DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
              ":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+            DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
              [{'id': 33, 'data': 'd7'}]),
-            ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+            DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
              ":data)" % seqname, [{'data': 'd8'}]),
-        ])
-        assert table.select().execute().fetchall() == [
-            (30, 'd1'),
-            (1, 'd2'),
-            (31, 'd3'),
-            (32, 'd4'),
-            (2, 'd5'),
-            (3, 'd6'),
-            (33, 'd7'),
-            (4, 'd8'),
-        ]
+        )
+        eq_(
+            table.select().execute().fetchall(),
+            [
+                (30, 'd1'),
+                (1, 'd2'),
+                (31, 'd3'),
+                (32, 'd4'),
+                (2, 'd5'),
+                (3, 'd6'),
+                (33, 'd7'),
+                (4, 'd8'),
+            ]
+        )
 
         # cant test reflection here since the Sequence must be
         # explicitly specified
@@ -415,7 +447,7 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
             engines.testing_engine(options={'implicit_returning': True})
         metadata.bind = self.engine
 
-        def go():
+        with self.sql_execution_asserter(self.engine) as asserter:
             table.insert().execute({'id': 30, 'data': 'd1'})
             table.insert().execute({'data': 'd2'})
             table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
@@ -424,31 +456,35 @@ class InsertTest(fixtures.TestBase, AssertsExecutionResults):
             table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
             table.insert(inline=True).execute({'data': 'd8'})
 
-        self.assert_sql(self.engine, go, [], with_sequences=[
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+        asserter.assert_(
+            DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
              {'id': 30, 'data': 'd1'}),
-            ("INSERT INTO testtable (id, data) VALUES "
+            DialectSQL("INSERT INTO testtable (id, data) VALUES "
              "(nextval('my_seq'), :data) RETURNING testtable.id",
              {'data': 'd2'}),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+            DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
              [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
-            ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+            DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
              ":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]),
-            ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+            DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
              [{'id': 33, 'data': 'd7'}]),
-            ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+            DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
              ":data)" % seqname, [{'data': 'd8'}]),
-        ])
-        assert table.select().execute().fetchall() == [
-            (30, 'd1'),
-            (1, 'd2'),
-            (31, 'd3'),
-            (32, 'd4'),
-            (2, 'd5'),
-            (3, 'd6'),
-            (33, 'd7'),
-            (4, 'd8'),
-        ]
+        )
+
+        eq_(
+            table.select().execute().fetchall(),
+            [
+                (30, 'd1'),
+                (1, 'd2'),
+                (31, 'd3'),
+                (32, 'd4'),
+                (2, 'd5'),
+                (3, 'd6'),
+                (33, 'd7'),
+                (4, 'd8'),
+            ]
+        )
 
         # cant test reflection here since the Sequence must be
         # explicitly specified
index 5c5da59b13d107fe97768c4619b42fe7a16ba98e..c62ca79a89ec6d78242c92417ec53417506881ff 100644 (file)
@@ -189,7 +189,7 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults):
 
         try:
             self.assert_sql(
-                testing.db, go, [], with_sequences=[
+                testing.db, go, [
                     ("CREATE TABLE foo (\tbar "
                      "VARCHAR(5), \tCONSTRAINT myenum CHECK "
                      "(bar IN ('one', 'two', 'three')))", {})])
@@ -259,9 +259,9 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults):
 
         try:
             self.assert_sql(
-                engine, go, [], with_sequences=[
-                    ("CREATE TABLE foo (\tbar "
-                     "VARCHAR(5), \tCONSTRAINT myenum CHECK "
+                engine, go, [
+                    ("CREATE TABLE foo (bar "
+                     "VARCHAR(5), CONSTRAINT myenum CHECK "
                      "(bar IN ('one', 'two', 'three')))", {})])
         finally:
             metadata.drop_all(engine)
index fc7059dcb127a23405d7c6d0e5ce9d6eb6858a48..c95b8d152a65e251e5ab949cfb576c8542fff7be 100644 (file)
@@ -11,7 +11,7 @@ from sqlalchemy.testing.schema import Table, Column
 from sqlalchemy.orm import mapper, relationship, backref, \
                             create_session, sessionmaker
 from sqlalchemy.testing import eq_
-from sqlalchemy.testing.assertsql import RegexSQL, ExactSQL, CompiledSQL, AllOf
+from sqlalchemy.testing.assertsql import RegexSQL, CompiledSQL, AllOf
 from sqlalchemy.testing import fixtures
 
 
@@ -656,7 +656,7 @@ class OneToManyManyToOneTest(fixtures.MappedTest):
             RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}),
             RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}),
             RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}),
-            ExactSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id "
+            CompiledSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id "
                         "WHERE person.id = :person_id",
                         lambda ctx:{'favorite_ball_id':p.favorite.id, 'person_id':p.id}
              ),
@@ -667,11 +667,11 @@ class OneToManyManyToOneTest(fixtures.MappedTest):
         self.assert_sql_execution(
             testing.db,
             sess.flush,
-            ExactSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id "
+            CompiledSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id "
                 "WHERE person.id = :person_id",
                 lambda ctx: {'person_id': p.id, 'favorite_ball_id': None}),
-            ExactSQL("DELETE FROM ball WHERE ball.id = :id", None), # lambda ctx:[{'id': 1L}, {'id': 4L}, {'id': 3L}, {'id': 2L}])
-            ExactSQL("DELETE FROM person WHERE person.id = :id", lambda ctx:[{'id': p.id}])
+            CompiledSQL("DELETE FROM ball WHERE ball.id = :id", None), # lambda ctx:[{'id': 1L}, {'id': 4L}, {'id': 3L}, {'id': 2L}])
+            CompiledSQL("DELETE FROM person WHERE person.id = :id", lambda ctx:[{'id': p.id}])
         )
 
     def test_post_update_backref(self):
index 354bbe5b174a258acc52400f0e2666a8dfea9075..4c6e16bf2ca1312f11b4645121cc4429187901d4 100644 (file)
@@ -1484,7 +1484,6 @@ class SliceTest(QueryTest):
         assert create_session().query(User).filter(User.id == 27). \
             first() is None
 
-    @testing.only_on('sqlite', 'testing execution but db-specific syntax')
     def test_limit_offset_applies(self):
         """Test that the expected LIMIT/OFFSET is applied for slices.
 
@@ -1510,15 +1509,15 @@ class SliceTest(QueryTest):
             testing.db, lambda: q[:20], [
                 (
                     "SELECT users.id AS users_id, users.name "
-                    "AS users_name FROM users LIMIT :param_1 OFFSET :param_2",
-                    {'param_1': 20, 'param_2': 0})])
+                    "AS users_name FROM users LIMIT :param_1",
+                    {'param_1': 20})])
 
         self.assert_sql(
             testing.db, lambda: q[5:], [
                 (
                     "SELECT users.id AS users_id, users.name "
-                    "AS users_name FROM users LIMIT :param_1 OFFSET :param_2",
-                    {'param_1': -1, 'param_2': 5})])
+                    "AS users_name FROM users LIMIT -1 OFFSET :param_1",
+                    {'param_1': 5})])
 
         self.assert_sql(testing.db, lambda: q[2:2], [])
 
index 604b5efebd499388acf865c3a585b26158f7fc7e..2603f67a3bc2c415d37d74c892f88862124ce008 100644 (file)
@@ -9,7 +9,7 @@ from sqlalchemy import testing
 from sqlalchemy.engine import default
 from sqlalchemy.testing import engines
 from sqlalchemy.testing import eq_
-from sqlalchemy.testing.assertsql import AllOf, RegexSQL, ExactSQL, CompiledSQL
+from sqlalchemy.testing.assertsql import AllOf, RegexSQL, CompiledSQL
 from sqlalchemy.sql import table, column
 
 
@@ -417,13 +417,13 @@ class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults):
             lambda: events.create(testing.db),
             RegexSQL("^CREATE TABLE events"),
             AllOf(
-                ExactSQL('CREATE UNIQUE INDEX ix_events_name ON events '
+                CompiledSQL('CREATE UNIQUE INDEX ix_events_name ON events '
                          '(name)'),
-                ExactSQL('CREATE INDEX ix_events_location ON events '
+                CompiledSQL('CREATE INDEX ix_events_location ON events '
                          '(location)'),
-                ExactSQL('CREATE UNIQUE INDEX sport_announcer ON events '
+                CompiledSQL('CREATE UNIQUE INDEX sport_announcer ON events '
                          '(sport, announcer)'),
-                ExactSQL('CREATE INDEX idx_winners ON events (winner)')
+                CompiledSQL('CREATE INDEX idx_winners ON events (winner)'),
             )
         )
 
@@ -441,7 +441,7 @@ class ConstraintGenTest(fixtures.TestBase, AssertsExecutionResults):
             lambda: t.create(testing.db),
             CompiledSQL('CREATE TABLE sometable (id INTEGER NOT NULL, '
                         'data VARCHAR(50), PRIMARY KEY (id))'),
-            ExactSQL('CREATE INDEX myindex ON sometable (data DESC)')
+            CompiledSQL('CREATE INDEX myindex ON sometable (data DESC)')
         )