callable_()
asserter.assert_(*rules)
- def assert_sql(self, db, callable_, list_, with_sequences=None):
- if (with_sequences is not None and
- config.db.dialect.supports_sequences):
- rules = with_sequences
- else:
- rules = list_
+ def assert_sql(self, db, callable_, rules):
newrules = []
for rule in rules:
if isinstance(rule, dict):
newrule = assertsql.AllOf(*[
- assertsql.ExactSQL(k, v) for k, v in rule.items()
+ assertsql.CompiledSQL(k, v) for k, v in rule.items()
])
else:
- newrule = assertsql.ExactSQL(*rule)
+ newrule = assertsql.CompiledSQL(*rule)
newrules.append(newrule)
self.assert_sql_execution(db, callable_, *newrules)
import collections
import contextlib
from .. import event
+from sqlalchemy.schema import _DDLCompiles
+from sqlalchemy.engine.util import _distill_params
class AssertRule(object):
- def process_execute(self, clauseelement, *multiparams, **params):
- pass
+ is_consumed = False
+ errormessage = None
+ consume_statement = True
- def process_cursor_execute(self, statement, parameters, context,
- executemany):
+ def process_statement(self, execute_observed):
pass
- def is_consumed(self):
- """Return True if this rule has been consumed, False if not.
-
- Should raise an AssertionError if this rule's condition has
- definitely failed.
-
- """
-
- raise NotImplementedError()
+ def no_more_statements(self):
+ assert False, 'All statements are complete, but pending '\
+ 'assertion rules remain'
- def rule_passed(self):
- """Return True if the last test of this rule passed, False if
- failed, None if no test was applied."""
- raise NotImplementedError()
-
- def consume_final(self):
- """Return True if this rule has been consumed.
-
- Should raise an AssertionError if this rule's condition has not
- been consumed or has failed.
+class SQLMatchRule(AssertRule):
+ pass
- """
- if self._result is None:
- assert False, 'Rule has not been consumed'
- return self.is_consumed()
+class CursorSQL(SQLMatchRule):
+ consume_statement = False
+ def __init__(self, statement, params=None):
+ self.statement = statement
+ self.params = params
-class SQLMatchRule(AssertRule):
- def __init__(self):
- self._result = None
- self._errmsg = ""
+ def process_statement(self, execute_observed):
+ stmt = execute_observed.statements[0]
+ if self.statement != stmt.statement or (
+ self.params is not None and self.params != stmt.parameters):
+ self.errormessage = \
+ "Testing for exact SQL %s parameters %s received %s %s" % (
+ self.statement, self.params,
+ stmt.statement, stmt.parameters
+ )
+ else:
+ execute_observed.statements.pop(0)
+ self.is_consumed = True
+ if not execute_observed.statements:
+ self.consume_statement = True
- def rule_passed(self):
- return self._result
- def is_consumed(self):
- if self._result is None:
- return False
+class CompiledSQL(SQLMatchRule):
- assert self._result, self._errmsg
+ def __init__(self, statement, params=None):
+ self.statement = statement
+ self.params = params
- return True
+ def _compare_sql(self, execute_observed, received_statement):
+ stmt = re.sub(r'[\n\t]', '', self.statement)
+ return received_statement == stmt
+ def _compile_dialect(self, execute_observed):
+ return DefaultDialect()
-class ExactSQL(SQLMatchRule):
+ def _received_statement(self, execute_observed):
+ """reconstruct the statement and params in terms
+ of a target dialect, which for CompiledSQL is just DefaultDialect."""
- def __init__(self, sql, params=None):
- SQLMatchRule.__init__(self)
- self.sql = sql
- self.params = params
+ context = execute_observed.context
+ compare_dialect = self._compile_dialect(execute_observed)
+ if isinstance(context.compiled.statement, _DDLCompiles):
+ compiled = \
+ context.compiled.statement.compile(dialect=compare_dialect)
+ else:
+ compiled = (
+ context.compiled.statement.compile(
+ dialect=compare_dialect,
+ column_keys=context.compiled.column_keys,
+ inline=context.compiled.inline)
+ )
+ _received_statement = re.sub(r'[\n\t]', '', str(compiled))
+ parameters = execute_observed.parameters
- def process_cursor_execute(self, statement, parameters, context,
- executemany):
- if not context:
- return
- _received_statement = \
- _process_engine_statement(context.unicode_statement,
- context)
- _received_parameters = context.compiled_parameters
+ if not parameters:
+ _received_parameters = [compiled.construct_params()]
+ else:
+ _received_parameters = [
+ compiled.construct_params(m) for m in parameters]
+
+ return _received_statement, _received_parameters
+
+ def process_statement(self, execute_observed):
+ context = execute_observed.context
+
+ _received_statement, _received_parameters = \
+ self._received_statement(execute_observed)
+ params = self._all_params(context)
+
+ equivalent = self._compare_sql(execute_observed, _received_statement)
+
+ if equivalent:
+ if params is not None:
+ all_params = list(params)
+ all_received = list(_received_parameters)
+ while all_params and all_received:
+ param = dict(all_params.pop(0))
+
+ for idx, received in enumerate(list(all_received)):
+ # do a positive compare only
+ for param_key in param:
+ # a key in param did not match current
+ # 'received'
+ if param_key not in received or \
+ received[param_key] != param[param_key]:
+ break
+ else:
+ # all keys in param matched 'received';
+ # onto next param
+ del all_received[idx]
+ break
+ else:
+ # param did not match any entry
+ # in all_received
+ equivalent = False
+ break
+ if all_params or all_received:
+ equivalent = False
- # TODO: remove this step once all unit tests are migrated, as
- # ExactSQL should really be *exact* SQL
+ if equivalent:
+ self.is_consumed = True
+ self.errormessage = None
+ else:
+ self.errormessage = self._failure_message(params) % {
+ 'received_statement': _received_statement,
+ 'received_parameters': _received_parameters
+ }
- sql = _process_assertion_statement(self.sql, context)
- equivalent = _received_statement == sql
+ def _all_params(self, context):
if self.params:
if util.callable(self.params):
params = self.params(context)
params = self.params
if not isinstance(params, list):
params = [params]
- equivalent = equivalent and params \
- == context.compiled_parameters
+ return params
else:
- params = {}
- self._result = equivalent
- if not self._result:
- self._errmsg = (
- 'Testing for exact statement %r exact params %r, '
- 'received %r with params %r' %
- (sql, params, _received_statement, _received_parameters))
-
+ return None
+
+ def _failure_message(self, expected_params):
+ return (
+ 'Testing for compiled statement %r partial params %r, '
+ 'received %%(received_statement)r with params '
+ '%%(received_parameters)r' % (
+ self.statement, expected_params
+ )
+ )
-class RegexSQL(SQLMatchRule):
+class RegexSQL(CompiledSQL):
def __init__(self, regex, params=None):
SQLMatchRule.__init__(self)
self.regex = re.compile(regex)
self.orig_regex = regex
self.params = params
- def process_cursor_execute(self, statement, parameters, context,
- executemany):
- if not context:
- return
- _received_statement = \
- _process_engine_statement(context.unicode_statement,
- context)
- _received_parameters = context.compiled_parameters
- equivalent = bool(self.regex.match(_received_statement))
- if self.params:
- if util.callable(self.params):
- params = self.params(context)
- else:
- params = self.params
- if not isinstance(params, list):
- params = [params]
-
- # do a positive compare only
-
- for param, received in zip(params, _received_parameters):
- for k, v in param.items():
- if k not in received or received[k] != v:
- equivalent = False
- break
- else:
- params = {}
- self._result = equivalent
- if not self._result:
- self._errmsg = \
- 'Testing for regex %r partial params %r, received %r '\
- 'with params %r' % (self.orig_regex, params,
- _received_statement,
- _received_parameters)
-
-
-class CompiledSQL(SQLMatchRule):
+ def _failure_message(self, expected_params):
+ return (
+ 'Testing for compiled statement ~%r partial params %r, '
+ 'received %%(received_statement)r with params '
+ '%%(received_parameters)r' % (
+ self.orig_regex, expected_params
+ )
+ )
- def __init__(self, statement, params=None):
- SQLMatchRule.__init__(self)
- self.statement = statement
- self.params = params
+ def _compare_sql(self, execute_observed, received_statement):
+ return bool(self.regex.match(received_statement))
- def process_cursor_execute(self, statement, parameters, context,
- executemany):
- if not context:
- return
- from sqlalchemy.schema import _DDLCompiles
- _received_parameters = list(context.compiled_parameters)
- # recompile from the context, using the default dialect
+class DialectSQL(CompiledSQL):
+ def _compile_dialect(self, execute_observed):
+ return execute_observed.context.dialect
- if isinstance(context.compiled.statement, _DDLCompiles):
- compiled = \
- context.compiled.statement.compile(dialect=DefaultDialect())
+ def _received_statement(self, execute_observed):
+ received_stmt, received_params = super(DialectSQL, self).\
+ _received_statement(execute_observed)
+ for real_stmt in execute_observed.statements:
+ if real_stmt.statement == received_stmt:
+ break
else:
- compiled = (
- context.compiled.statement.compile(
- dialect=DefaultDialect(),
- column_keys=context.compiled.column_keys)
- )
- _received_statement = re.sub(r'[\n\t]', '', str(compiled))
- equivalent = self.statement == _received_statement
- if self.params:
- if util.callable(self.params):
- params = self.params(context)
- else:
- params = self.params
- if not isinstance(params, list):
- params = [params]
- else:
- params = list(params)
- all_params = list(params)
- all_received = list(_received_parameters)
- while params:
- param = dict(params.pop(0))
- for k, v in context.compiled.params.items():
- param.setdefault(k, v)
- if param not in _received_parameters:
- equivalent = False
- break
- else:
- _received_parameters.remove(param)
- if _received_parameters:
- equivalent = False
+ raise AssertionError(
+ "Can't locate compiled statement %r in list of "
+ "statements actually invoked" % received_stmt)
+ return received_stmt, execute_observed.context.compiled_parameters
+
+ def _compare_sql(self, execute_observed, received_statement):
+ stmt = re.sub(r'[\n\t]', '', self.statement)
+
+ # convert our comparison statement to have the
+ # paramstyle of the received
+ paramstyle = execute_observed.context.dialect.paramstyle
+ if paramstyle == 'pyformat':
+ stmt = re.sub(
+ r':([\w_]+)', r"%(\1)s", stmt)
else:
- params = {}
- all_params = {}
- all_received = []
- self._result = equivalent
- if not self._result:
- print('Testing for compiled statement %r partial params '
- '%r, received %r with params %r' %
- (self.statement, all_params,
- _received_statement, all_received))
- self._errmsg = (
- 'Testing for compiled statement %r partial params %r, '
- 'received %r with params %r' %
- (self.statement, all_params,
- _received_statement, all_received))
-
- # print self._errmsg
+ # positional params
+ repl = None
+ if paramstyle == 'qmark':
+ repl = "?"
+ elif paramstyle == 'format':
+ repl = r"%s"
+ elif paramstyle == 'numeric':
+ repl = None
+ stmt = re.sub(r':([\w_]+)', repl, stmt)
+
+ return received_statement == stmt
class CountStatements(AssertRule):
self.count = count
self._statement_count = 0
- def process_execute(self, clauseelement, *multiparams, **params):
+ def process_statement(self, execute_observed):
self._statement_count += 1
- def process_cursor_execute(self, statement, parameters, context,
- executemany):
- pass
-
- def is_consumed(self):
- return False
-
- def consume_final(self):
- assert self.count == self._statement_count, \
- 'desired statement count %d does not match %d' \
- % (self.count, self._statement_count)
- return True
+ def no_more_statements(self):
+ if self.count != self._statement_count:
+ assert False, 'desired statement count %d does not match %d' \
+ % (self.count, self._statement_count)
class AllOf(AssertRule):
def __init__(self, *rules):
self.rules = set(rules)
- def process_execute(self, clauseelement, *multiparams, **params):
- for rule in self.rules:
- rule.process_execute(clauseelement, *multiparams, **params)
-
- def process_cursor_execute(self, statement, parameters, context,
- executemany):
- for rule in self.rules:
- rule.process_cursor_execute(statement, parameters, context,
- executemany)
-
- def is_consumed(self):
- if not self.rules:
- return True
+ def process_statement(self, execute_observed):
for rule in list(self.rules):
- if rule.rule_passed(): # a rule passed, move on
- self.rules.remove(rule)
- return len(self.rules) == 0
- return False
-
- def rule_passed(self):
- return self.is_consumed()
-
- def consume_final(self):
- return len(self.rules) == 0
+ rule.errormessage = None
+ rule.process_statement(execute_observed)
+ if rule.is_consumed:
+ self.rules.discard(rule)
+ if not self.rules:
+ self.is_consumed = True
+ break
+ elif not rule.errormessage:
+ # rule is not done yet
+ self.errormessage = None
+ break
+ else:
+ self.errormessage = list(self.rules)[0].errormessage
class Or(AllOf):
- def __init__(self, *rules):
- self.rules = set(rules)
- self._consume_final = False
-
- def is_consumed(self):
- if not self.rules:
- return True
- for rule in list(self.rules):
- if rule.rule_passed(): # a rule passed
- self._consume_final = True
- return True
- return False
-
- def consume_final(self):
- assert self._consume_final, "Unsatisified rules remain"
-
-
-def _process_engine_statement(query, context):
- if util.jython:
-
- # oracle+zxjdbc passes a PyStatement when returning into
-
- query = str(query)
- if context.engine.name == 'mssql' \
- and query.endswith('; select scope_identity()'):
- query = query[:-25]
- query = re.sub(r'\n', '', query)
- return query
+ def process_statement(self, execute_observed):
+ for rule in self.rules:
+ rule.process_statement(execute_observed)
+ if rule.is_consumed:
+ self.is_consumed = True
+ break
+ else:
+ self.errormessage = list(self.rules)[0].errormessage
-def _process_assertion_statement(query, context):
- paramstyle = context.dialect.paramstyle
- if paramstyle == 'named':
- pass
- elif paramstyle == 'pyformat':
- query = re.sub(r':([\w_]+)', r"%(\1)s", query)
- else:
- # positional params
- repl = None
- if paramstyle == 'qmark':
- repl = "?"
- elif paramstyle == 'format':
- repl = r"%s"
- elif paramstyle == 'numeric':
- repl = None
- query = re.sub(r':([\w_]+)', repl, query)
- return query
-
-
-class SQLExecuteObserved(
- collections.namedtuple(
- "SQLExecuteObserved", ["clauseelement", "multiparams", "params"])
-):
- def process(self, rules):
- if rules is not None:
- if not rules:
- assert False, \
- 'All rules have been exhausted, but further '\
- 'statements remain'
- rule = rules[0]
- rule.process_execute(
- self.clauseelement, *self.multiparams, **self.params)
- if rule.is_consumed():
- rules.pop(0)
+class SQLExecuteObserved(object):
+ def __init__(self, context, clauseelement, multiparams, params):
+ self.context = context
+ self.clauseelement = clauseelement
+ self.parameters = _distill_params(multiparams, params)
+ self.statements = []
class SQLCursorExecuteObserved(
"SQLCursorExecuteObserved",
["statement", "parameters", "context", "executemany"])
):
- def process(self, rules):
- if rules:
- rule = rules[0]
- rule.process_cursor_execute(
- self.statement, self.parameters,
- self.context, self.executemany)
+ pass
class SQLAsserter(object):
self.accumulated = []
def _close(self):
- # safety feature in case event.remove
- # goes haywire
self._final = self.accumulated
del self.accumulated
def assert_(self, *rules):
rules = list(rules)
- for observed in self._final:
- observed.process(rules)
+ observed = list(self._final)
+
+ while observed and rules:
+ rule = rules[0]
+ rule.process_statement(observed[0])
+ if rule.is_consumed:
+ rules.pop(0)
+ elif rule.errormessage:
+ assert False, rule.errormessage
- for rule in rules:
- if not rule.consume_final():
- assert False, \
- 'All statements are complete, but pending '\
- 'assertion rules remain'
+ if rule.consume_statement:
+ observed.pop(0)
+
+ if not observed and rules:
+ rules[0].no_more_statements()
+ elif not rules and observed:
+ assert False, "Additional SQL statements remain"
@contextlib.contextmanager
def assert_engine(engine):
asserter = SQLAsserter()
- @event.listens_for(engine, "after_execute")
- def execute(conn, clauseelement, multiparams, params, result):
- asserter.accumulated.append(
- SQLExecuteObserved(
- clauseelement, multiparams, params))
+ orig = []
+
+ @event.listens_for(engine, "before_execute")
+ def connection_execute(conn, clauseelement, multiparams, params):
+ # grab the original statement + params before any cursor
+ # execution
+ orig[:] = clauseelement, multiparams, params
@event.listens_for(engine, "after_cursor_execute")
def cursor_execute(conn, cursor, statement, parameters,
context, executemany):
- asserter.accumulated.append(
+ if not context:
+ return
+ # then grab real cursor statements and associate them all
+ # around a single context
+ if asserter.accumulated and \
+ asserter.accumulated[-1].context is context:
+ obs = asserter.accumulated[-1]
+ else:
+ obs = SQLExecuteObserved(context, orig[0], orig[1], orig[2])
+ asserter.accumulated.append(obs)
+ obs.statements.append(
SQLCursorExecuteObserved(
- statement, parameters, context, executemany))
+ statement, parameters, context, executemany)
+ )
try:
yield asserter
finally:
- asserter._close()
event.remove(engine, "after_cursor_execute", cursor_execute)
- event.remove(engine, "after_execute", execute)
+ event.remove(engine, "before_execute", connection_execute)
+ asserter._close()
def sql_count_(self, count, fn):
self.assert_sql_count(self.bind, fn, count)
- def sql_eq_(self, callable_, statements, with_sequences=None):
- self.assert_sql(self.bind,
- callable_, statements, with_sequences)
+ def sql_eq_(self, callable_, statements):
+ self.assert_sql(self.bind, callable_, statements)
@classmethod
def _load_fixtures(cls):
from sqlalchemy import testing
from sqlalchemy.util import ue
from sqlalchemy import util
+from sqlalchemy.testing.assertsql import CursorSQL
finally:
meta.drop_all()
-from sqlalchemy.testing.assertsql import ExactSQL
class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
__only_on__ = 'mssql'
con.execute("""drop trigger paj""")
meta.drop_all()
- @testing.fails_on_everything_except('mssql+pyodbc', 'pyodbc-specific feature')
@testing.provide_metadata
def test_disable_scope_identity(self):
engine = engines.testing_engine(options={"use_scope_identity": False})
metadata = self.metadata
- metadata.bind = engine
- t1 = Table('t1', metadata,
- Column('id', Integer, primary_key=True),
- implicit_returning=False
+ t1 = Table(
+ 't1', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50)),
+ implicit_returning=False
)
- metadata.create_all()
+ metadata.create_all(engine)
+
+ with self.sql_execution_asserter(engine) as asserter:
+ engine.execute(t1.insert(), {"data": "somedata"})
+
+ asserter.assert_(
+ CursorSQL(
+ "INSERT INTO t1 (data) VALUES (?)",
+ ("somedata", )
+ ),
+ CursorSQL("SELECT @@identity AS lastrowid"),
+ )
+
+ @testing.provide_metadata
+ def test_enable_scope_identity(self):
+ engine = engines.testing_engine(options={"use_scope_identity": True})
+ metadata = self.metadata
+ t1 = Table(
+ 't1', metadata,
+ Column('id', Integer, primary_key=True),
+ implicit_returning=False
+ )
+ metadata.create_all(engine)
+
+ with self.sql_execution_asserter(engine) as asserter:
+ engine.execute(t1.insert())
+
+ # even with pyodbc, we don't embed the scope identity on a
+ # DEFAULT VALUES insert
+ asserter.assert_(
+ CursorSQL("INSERT INTO t1 DEFAULT VALUES"),
+ CursorSQL("SELECT scope_identity() AS lastrowid"),
+ )
+
+ @testing.only_on('mssql+pyodbc')
+ @testing.provide_metadata
+ def test_embedded_scope_identity(self):
+ engine = engines.testing_engine(options={"use_scope_identity": True})
+ metadata = self.metadata
+ t1 = Table(
+ 't1', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('data', String(50)),
+ implicit_returning=False
+ )
+ metadata.create_all(engine)
+
+ with self.sql_execution_asserter(engine) as asserter:
+ engine.execute(t1.insert(), {'data': 'somedata'})
- self.assert_sql_execution(
- testing.db,
- lambda: engine.execute(t1.insert()),
- ExactSQL("INSERT INTO t1 DEFAULT VALUES"),
- # we don't have an event for
- # "SELECT @@IDENTITY" part here.
- # this will be in 0.8 with #2459
+ # pyodbc-specific system
+ asserter.assert_(
+ CursorSQL(
+ "INSERT INTO t1 (data) VALUES (?); select scope_identity()",
+ ("somedata", )
+ ),
)
- assert not engine.dialect.use_scope_identity
def test_insertid_schema(self):
meta = MetaData(testing.db)
Sequence, ForeignKey, text, select, func, extract, literal_column, \
tuple_, DateTime, Time, literal, and_, Date, or_
from sqlalchemy.testing import engines, fixtures
+from sqlalchemy.testing.assertsql import DialectSQL, CursorSQL
from sqlalchemy import testing
from sqlalchemy import exc
from sqlalchemy.dialects import postgresql
engines.testing_engine(options={'implicit_returning': False})
metadata.bind = self.engine
- def go():
+ with self.sql_execution_asserter(self.engine) as asserter:
# execute with explicit id
table.insert(inline=True).execute({'data': 'd8'})
- # note that the test framework doesn't capture the "preexecute"
- # of a seqeuence or default. we just see it in the bind params.
+ asserter.assert_(
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 30, 'data': 'd1'}),
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 1, 'data': 'd2'}),
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
+ DialectSQL(
+ 'INSERT INTO testtable (data) VALUES (:data)',
+ [{'data': 'd5'}, {'data': 'd6'}]),
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 33, 'data': 'd7'}]),
+ DialectSQL(
+ 'INSERT INTO testtable (data) VALUES (:data)',
+ [{'data': 'd8'}]),
+ )
+
+ eq_(
+ table.select().execute().fetchall(),
+ [
+ (30, 'd1'),
+ (1, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (2, 'd5'),
+ (3, 'd6'),
+ (33, 'd7'),
+ (4, 'd8'),
+ ]
+ )
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 1, 'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ('INSERT INTO testtable (data) VALUES (:data)',
- [{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 33, 'data': 'd7'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
table.delete().execute()
# test the same series of events using a reflected version of
m2 = MetaData(self.engine)
table = Table(table.name, m2, autoload=True)
- def go():
+ with self.sql_execution_asserter(self.engine) as asserter:
table.insert().execute({'id': 30, 'data': 'd1'})
r = table.insert().execute({'data': 'd2'})
assert r.inserted_primary_key == [5]
table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
table.insert(inline=True).execute({'data': 'd8'})
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- {'id': 5, 'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ('INSERT INTO testtable (data) VALUES (:data)',
- [{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
- [{'id': 33, 'data': 'd7'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (5, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (6, 'd5'),
- (7, 'd6'),
- (33, 'd7'),
- (8, 'd8'),
- ]
+ asserter.assert_(
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 30, 'data': 'd1'}),
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ {'id': 5, 'data': 'd2'}),
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
+ DialectSQL(
+ 'INSERT INTO testtable (data) VALUES (:data)',
+ [{'data': 'd5'}, {'data': 'd6'}]),
+ DialectSQL(
+ 'INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ [{'id': 33, 'data': 'd7'}]),
+ DialectSQL(
+ 'INSERT INTO testtable (data) VALUES (:data)',
+ [{'data': 'd8'}]),
+ )
+ eq_(
+ table.select().execute().fetchall(),
+ [
+ (30, 'd1'),
+ (5, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (6, 'd5'),
+ (7, 'd6'),
+ (33, 'd7'),
+ (8, 'd8'),
+ ]
+ )
table.delete().execute()
def _assert_data_autoincrement_returning(self, table):
engines.testing_engine(options={'implicit_returning': True})
metadata.bind = self.engine
- def go():
+ with self.sql_execution_asserter(self.engine) as asserter:
# execute with explicit id
table.insert(inline=True).execute({'data': 'd8'})
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ asserter.assert_(
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
{'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (data) VALUES (:data) RETURNING '
+ DialectSQL('INSERT INTO testtable (data) VALUES (:data) RETURNING '
'testtable.id', {'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ('INSERT INTO testtable (data) VALUES (:data)',
+ DialectSQL('INSERT INTO testtable (data) VALUES (:data)',
[{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 33, 'data': 'd7'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
+ DialectSQL('INSERT INTO testtable (data) VALUES (:data)',
+ [{'data': 'd8'}]),
+ )
+
+ eq_(
+ table.select().execute().fetchall(),
+ [
+ (30, 'd1'),
+ (1, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (2, 'd5'),
+ (3, 'd6'),
+ (33, 'd7'),
+ (4, 'd8'),
+ ]
+ )
table.delete().execute()
# test the same series of events using a reflected version of
m2 = MetaData(self.engine)
table = Table(table.name, m2, autoload=True)
- def go():
+ with self.sql_execution_asserter(self.engine) as asserter:
table.insert().execute({'id': 30, 'data': 'd1'})
r = table.insert().execute({'data': 'd2'})
assert r.inserted_primary_key == [5]
table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
table.insert(inline=True).execute({'data': 'd8'})
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ asserter.assert_(
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
{'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (data) VALUES (:data) RETURNING '
+ DialectSQL('INSERT INTO testtable (data) VALUES (:data) RETURNING '
'testtable.id', {'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ('INSERT INTO testtable (data) VALUES (:data)',
+ DialectSQL('INSERT INTO testtable (data) VALUES (:data)',
[{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 33, 'data': 'd7'}]),
- ('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (5, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (6, 'd5'),
- (7, 'd6'),
- (33, 'd7'),
- (8, 'd8'),
- ]
+ DialectSQL('INSERT INTO testtable (data) VALUES (:data)', [{'data': 'd8'}]),
+ )
+ eq_(
+ table.select().execute().fetchall(),
+ [
+ (30, 'd1'),
+ (5, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (6, 'd5'),
+ (7, 'd6'),
+ (33, 'd7'),
+ (8, 'd8'),
+ ]
+ )
table.delete().execute()
def _assert_data_with_sequence(self, table, seqname):
engines.testing_engine(options={'implicit_returning': False})
metadata.bind = self.engine
- def go():
+ with self.sql_execution_asserter(self.engine) as asserter:
table.insert().execute({'id': 30, 'data': 'd1'})
table.insert().execute({'data': 'd2'})
table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
table.insert(inline=True).execute({'data': 'd8'})
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ asserter.assert_(
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
{'id': 30, 'data': 'd1'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ CursorSQL("select nextval('my_seq')"),
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
{'id': 1, 'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+ DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 33, 'data': 'd7'}]),
- ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+ DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
":data)" % seqname, [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
+ )
+ eq_(
+ table.select().execute().fetchall(),
+ [
+ (30, 'd1'),
+ (1, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (2, 'd5'),
+ (3, 'd6'),
+ (33, 'd7'),
+ (4, 'd8'),
+ ]
+ )
# cant test reflection here since the Sequence must be
# explicitly specified
engines.testing_engine(options={'implicit_returning': True})
metadata.bind = self.engine
- def go():
+ with self.sql_execution_asserter(self.engine) as asserter:
table.insert().execute({'id': 30, 'data': 'd1'})
table.insert().execute({'data': 'd2'})
table.insert().execute({'id': 31, 'data': 'd3'}, {'id': 32,
table.insert(inline=True).execute({'id': 33, 'data': 'd7'})
table.insert(inline=True).execute({'data': 'd8'})
- self.assert_sql(self.engine, go, [], with_sequences=[
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ asserter.assert_(
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
{'id': 30, 'data': 'd1'}),
- ("INSERT INTO testtable (id, data) VALUES "
+ DialectSQL("INSERT INTO testtable (id, data) VALUES "
"(nextval('my_seq'), :data) RETURNING testtable.id",
{'data': 'd2'}),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 31, 'data': 'd3'}, {'id': 32, 'data': 'd4'}]),
- ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+ DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
":data)" % seqname, [{'data': 'd5'}, {'data': 'd6'}]),
- ('INSERT INTO testtable (id, data) VALUES (:id, :data)',
+ DialectSQL('INSERT INTO testtable (id, data) VALUES (:id, :data)',
[{'id': 33, 'data': 'd7'}]),
- ("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
+ DialectSQL("INSERT INTO testtable (id, data) VALUES (nextval('%s'), "
":data)" % seqname, [{'data': 'd8'}]),
- ])
- assert table.select().execute().fetchall() == [
- (30, 'd1'),
- (1, 'd2'),
- (31, 'd3'),
- (32, 'd4'),
- (2, 'd5'),
- (3, 'd6'),
- (33, 'd7'),
- (4, 'd8'),
- ]
+ )
+
+ eq_(
+ table.select().execute().fetchall(),
+ [
+ (30, 'd1'),
+ (1, 'd2'),
+ (31, 'd3'),
+ (32, 'd4'),
+ (2, 'd5'),
+ (3, 'd6'),
+ (33, 'd7'),
+ (4, 'd8'),
+ ]
+ )
# cant test reflection here since the Sequence must be
# explicitly specified
try:
self.assert_sql(
- testing.db, go, [], with_sequences=[
+ testing.db, go, [
("CREATE TABLE foo (\tbar "
"VARCHAR(5), \tCONSTRAINT myenum CHECK "
"(bar IN ('one', 'two', 'three')))", {})])
try:
self.assert_sql(
- engine, go, [], with_sequences=[
- ("CREATE TABLE foo (\tbar "
- "VARCHAR(5), \tCONSTRAINT myenum CHECK "
+ engine, go, [
+ ("CREATE TABLE foo (bar "
+ "VARCHAR(5), CONSTRAINT myenum CHECK "
"(bar IN ('one', 'two', 'three')))", {})])
finally:
metadata.drop_all(engine)
from sqlalchemy.orm import mapper, relationship, backref, \
create_session, sessionmaker
from sqlalchemy.testing import eq_
-from sqlalchemy.testing.assertsql import RegexSQL, ExactSQL, CompiledSQL, AllOf
+from sqlalchemy.testing.assertsql import RegexSQL, CompiledSQL, AllOf
from sqlalchemy.testing import fixtures
RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}),
RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}),
RegexSQL("^INSERT INTO ball", lambda c: {'person_id':p.id, 'data':'some data'}),
- ExactSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id "
+ CompiledSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id "
"WHERE person.id = :person_id",
lambda ctx:{'favorite_ball_id':p.favorite.id, 'person_id':p.id}
),
self.assert_sql_execution(
testing.db,
sess.flush,
- ExactSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id "
+ CompiledSQL("UPDATE person SET favorite_ball_id=:favorite_ball_id "
"WHERE person.id = :person_id",
lambda ctx: {'person_id': p.id, 'favorite_ball_id': None}),
- ExactSQL("DELETE FROM ball WHERE ball.id = :id", None), # lambda ctx:[{'id': 1L}, {'id': 4L}, {'id': 3L}, {'id': 2L}])
- ExactSQL("DELETE FROM person WHERE person.id = :id", lambda ctx:[{'id': p.id}])
+ CompiledSQL("DELETE FROM ball WHERE ball.id = :id", None), # lambda ctx:[{'id': 1L}, {'id': 4L}, {'id': 3L}, {'id': 2L}])
+ CompiledSQL("DELETE FROM person WHERE person.id = :id", lambda ctx:[{'id': p.id}])
)
def test_post_update_backref(self):
assert create_session().query(User).filter(User.id == 27). \
first() is None
- @testing.only_on('sqlite', 'testing execution but db-specific syntax')
def test_limit_offset_applies(self):
"""Test that the expected LIMIT/OFFSET is applied for slices.
testing.db, lambda: q[:20], [
(
"SELECT users.id AS users_id, users.name "
- "AS users_name FROM users LIMIT :param_1 OFFSET :param_2",
- {'param_1': 20, 'param_2': 0})])
+ "AS users_name FROM users LIMIT :param_1",
+ {'param_1': 20})])
self.assert_sql(
testing.db, lambda: q[5:], [
(
"SELECT users.id AS users_id, users.name "
- "AS users_name FROM users LIMIT :param_1 OFFSET :param_2",
- {'param_1': -1, 'param_2': 5})])
+ "AS users_name FROM users LIMIT -1 OFFSET :param_1",
+ {'param_1': 5})])
self.assert_sql(testing.db, lambda: q[2:2], [])
from sqlalchemy.engine import default
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
-from sqlalchemy.testing.assertsql import AllOf, RegexSQL, ExactSQL, CompiledSQL
+from sqlalchemy.testing.assertsql import AllOf, RegexSQL, CompiledSQL
from sqlalchemy.sql import table, column
lambda: events.create(testing.db),
RegexSQL("^CREATE TABLE events"),
AllOf(
- ExactSQL('CREATE UNIQUE INDEX ix_events_name ON events '
+ CompiledSQL('CREATE UNIQUE INDEX ix_events_name ON events '
'(name)'),
- ExactSQL('CREATE INDEX ix_events_location ON events '
+ CompiledSQL('CREATE INDEX ix_events_location ON events '
'(location)'),
- ExactSQL('CREATE UNIQUE INDEX sport_announcer ON events '
+ CompiledSQL('CREATE UNIQUE INDEX sport_announcer ON events '
'(sport, announcer)'),
- ExactSQL('CREATE INDEX idx_winners ON events (winner)')
+ CompiledSQL('CREATE INDEX idx_winners ON events (winner)'),
)
)
lambda: t.create(testing.db),
CompiledSQL('CREATE TABLE sometable (id INTEGER NOT NULL, '
'data VARCHAR(50), PRIMARY KEY (id))'),
- ExactSQL('CREATE INDEX myindex ON sometable (data DESC)')
+ CompiledSQL('CREATE INDEX myindex ON sometable (data DESC)')
)