From: Diana Clarke Date: Sat, 30 Mar 2013 04:33:07 +0000 (-0400) Subject: starting on the update tests next, pep8 pass first (see #2630) X-Git-Tag: rel_0_8_1~25^2^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=192bddc7208f2c7f4d5b23a1c5395c3673ce9bb6;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git starting on the update tests next, pep8 pass first (see #2630) --- diff --git a/test/sql/test_labels.py b/test/sql/test_labels.py index d7cb8db4a3..fd45d303f7 100644 --- a/test/sql/test_labels.py +++ b/test/sql/test_labels.py @@ -1,19 +1,15 @@ - -from sqlalchemy import exc as exceptions -from sqlalchemy import testing -from sqlalchemy.testing import engines -from sqlalchemy import select, MetaData, Integer, or_ +from sqlalchemy import exc as exceptions, select, MetaData, Integer, or_ from sqlalchemy.engine import default from sqlalchemy.sql import table, column -from sqlalchemy.testing import assert_raises, eq_ -from sqlalchemy.testing import fixtures, AssertsCompiledSQL -from sqlalchemy.testing.engines import testing_engine +from sqlalchemy.testing import AssertsCompiledSQL, assert_raises, engines,\ + fixtures from sqlalchemy.testing.schema import Table, Column IDENT_LENGTH = 29 class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = 'DefaultDialect' table1 = table('some_large_named_table', column('this_is_the_primarykey_column'), @@ -25,9 +21,6 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL): column('this_is_the_data_column') ) - __dialect__ = 'DefaultDialect' - - def _length_fixture(self, length=IDENT_LENGTH, positional=False): dialect = default.DefaultDialect() dialect.max_identifier_length = length @@ -60,7 +53,7 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL): ta = table2.alias() on = table1.c.this_is_the_data_column == ta.c.this_is_the_data_column self.assert_compile( - select([table1, ta]).select_from(table1.join(ta, on)).\ + select([table1, ta]).select_from(table1.join(ta, on)). where(ta.c.this_is_the_data_column == 'data3'), 'SELECT ' 'some_large_named_table.this_is_the_primarykey_column, ' @@ -87,16 +80,9 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL): t = Table('this_name_is_too_long_for_what_were_doing_in_this_test', m, Column('foo', Integer)) eng = self._engine_fixture() - for meth in ( - t.create, - t.drop, - m.create_all, - m.drop_all - ): - assert_raises( - exceptions.IdentifierError, - meth, eng - ) + methods = (t.create, t.drop, m.create_all, m.drop_all) + for meth in methods: + assert_raises(exceptions.IdentifierError, meth, eng) def _assert_labeled_table1_select(self, s): table1 = self.table1 @@ -263,7 +249,9 @@ class MaxIdentTest(fixtures.TestBase, AssertsCompiledSQL): dialect=self._length_fixture(positional=True) ) + class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = 'DefaultDialect' table1 = table('some_large_named_table', column('this_is_the_primarykey_column'), @@ -275,8 +263,6 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL): column('this_is_the_data_column') ) - __dialect__ = 'DefaultDialect' - def test_adjustable_1(self): table1 = self.table1 q = table1.select( @@ -404,27 +390,27 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL): 'AS _1', dialect=compile_dialect) - def test_adjustable_result_schema_column_1(self): table1 = self.table1 + q = table1.select( table1.c.this_is_the_primarykey_column == 4).apply_labels().\ alias('foo') - dialect = default.DefaultDialect(label_length=10) + dialect = default.DefaultDialect(label_length=10) compiled = q.compile(dialect=dialect) + assert set(compiled.result_map['some_2'][1]).issuperset([ - table1.c.this_is_the_data_column, - 'some_large_named_table_this_is_the_data_column', - 'some_2' + table1.c.this_is_the_data_column, + 'some_large_named_table_this_is_the_data_column', + 'some_2' + ]) - ]) assert set(compiled.result_map['some_1'][1]).issuperset([ - table1.c.this_is_the_primarykey_column, - 'some_large_named_table_this_is_the_primarykey_column', - 'some_1' - - ]) + table1.c.this_is_the_primarykey_column, + 'some_large_named_table_this_is_the_primarykey_column', + 'some_1' + ]) def test_adjustable_result_schema_column_2(self): table1 = self.table1 @@ -434,20 +420,17 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL): x = select([q]) dialect = default.DefaultDialect(label_length=10) - compiled = x.compile(dialect=dialect) + assert set(compiled.result_map['this_2'][1]).issuperset([ - q.corresponding_column(table1.c.this_is_the_data_column), - 'this_is_the_data_column', - 'this_2' + q.corresponding_column(table1.c.this_is_the_data_column), + 'this_is_the_data_column', + 'this_2']) - ]) assert set(compiled.result_map['this_1'][1]).issuperset([ - q.corresponding_column(table1.c.this_is_the_primarykey_column), - 'this_is_the_primarykey_column', - 'this_1' - - ]) + q.corresponding_column(table1.c.this_is_the_primarykey_column), + 'this_is_the_primarykey_column', + 'this_1']) def test_table_plus_column_exceeds_length(self): """test that the truncation only occurs when tablename + colname are @@ -490,7 +473,6 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL): 'other_thirty_characters_table_.thirty_characters_table_id', dialect=compile_dialect) - def test_colnames_longer_than_labels_lowercase(self): t1 = table('a', column('abcde')) self._test_colnames_longer_than_labels(t1) @@ -507,30 +489,18 @@ class LabelLengthTest(fixtures.TestBase, AssertsCompiledSQL): # 'abcde' is longer than 4, but rendered as itself # needs to have all characters s = select([a1]) - self.assert_compile( - select([a1]), - "SELECT asdf.abcde FROM a AS asdf", - dialect=dialect - ) + self.assert_compile(select([a1]), + 'SELECT asdf.abcde FROM a AS asdf', + dialect=dialect) compiled = s.compile(dialect=dialect) assert set(compiled.result_map['abcde'][1]).issuperset([ - 'abcde', - a1.c.abcde, - 'abcde' - ]) + 'abcde', a1.c.abcde, 'abcde']) # column still there, but short label s = select([a1]).apply_labels() - self.assert_compile( - s, - "SELECT asdf.abcde AS _1 FROM a AS asdf", - dialect=dialect - ) + self.assert_compile(s, + 'SELECT asdf.abcde AS _1 FROM a AS asdf', + dialect=dialect) compiled = s.compile(dialect=dialect) assert set(compiled.result_map['_1'][1]).issuperset([ - 'asdf_abcde', - a1.c.abcde, - '_1' - ]) - - + 'asdf_abcde', a1.c.abcde, '_1']) diff --git a/test/sql/test_update.py b/test/sql/test_update.py index b46489cd24..86fe3934c8 100644 --- a/test/sql/test_update.py +++ b/test/sql/test_update.py @@ -1,55 +1,48 @@ -from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, AssertsCompiledSQL -import datetime from sqlalchemy import * -from sqlalchemy import exc, sql, util -from sqlalchemy.engine import default, base from sqlalchemy import testing -from sqlalchemy.testing import fixtures -from sqlalchemy.testing.schema import Table, Column from sqlalchemy.dialects import mysql +from sqlalchemy.testing import AssertsCompiledSQL, eq_, fixtures +from sqlalchemy.testing.schema import Table, Column + class _UpdateFromTestBase(object): @classmethod def define_tables(cls, metadata): Table('users', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), - Column('name', String(30), nullable=False), - ) + test_needs_autoincrement=True), + Column('name', String(30), nullable=False)) Table('addresses', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + test_needs_autoincrement=True), Column('user_id', None, ForeignKey('users.id')), Column('name', String(30), nullable=False), - Column('email_address', String(50), nullable=False), - ) + Column('email_address', String(50), nullable=False)) - Table("dingalings", metadata, + Table('dingalings', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + test_needs_autoincrement=True), Column('address_id', None, ForeignKey('addresses.id')), - Column('data', String(30)), - ) + Column('data', String(30))) @classmethod def fixtures(cls): return dict( - users = ( + users=( ('id', 'name'), (7, 'jack'), (8, 'ed'), (9, 'fred'), (10, 'chuck') ), - addresses = ( ('id', 'user_id', 'name', 'email_address'), - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed@wood.com"), - (3, 8, 'x', "ed@bettyboop.com"), - (4, 8, 'x', "ed@lala.com"), - (5, 9, 'x', "fred@fred.com") + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed@wood.com'), + (3, 8, 'x', 'ed@bettyboop.com'), + (4, 8, 'x', 'ed@lala.com'), + (5, 9, 'x', 'fred@fred.com') ), dingalings = ( ('id', 'address_id', 'data'), @@ -59,288 +52,296 @@ class _UpdateFromTestBase(object): ) -class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, AssertsCompiledSQL): +class UpdateFromCompileTest(_UpdateFromTestBase, fixtures.TablesTest, + AssertsCompiledSQL): __dialect__ = 'default' run_create_tables = run_inserts = run_deletes = None def test_render_table(self): users, addresses = self.tables.users, self.tables.addresses + self.assert_compile( - users.update().\ - values(name='newname').\ - where(users.c.id==addresses.c.user_id).\ - where(addresses.c.email_address=='e1'), - "UPDATE users SET name=:name FROM addresses " - "WHERE users.id = addresses.user_id AND " - "addresses.email_address = :email_address_1", - checkparams={u'email_address_1': 'e1', 'name': 'newname'} - ) + users.update(). + values(name='newname'). + where(users.c.id == addresses.c.user_id). + where(addresses.c.email_address == 'e1'), + 'UPDATE users ' + 'SET name=:name FROM addresses ' + 'WHERE ' + 'users.id = addresses.user_id AND ' + 'addresses.email_address = :email_address_1', + checkparams={u'email_address_1': 'e1', 'name': 'newname'}) def test_render_multi_table(self): - users, addresses, dingalings = \ - self.tables.users, \ - self.tables.addresses, \ - self.tables.dingalings + users = self.tables.users + addresses = self.tables.addresses + dingalings = self.tables.dingalings + + checkparams = { + u'email_address_1': 'e1', + u'id_1': 2, + 'name': 'newname' + } + self.assert_compile( - users.update().\ - values(name='newname').\ - where(users.c.id==addresses.c.user_id).\ - where(addresses.c.email_address=='e1').\ - where(addresses.c.id==dingalings.c.address_id).\ - where(dingalings.c.id==2), - "UPDATE users SET name=:name FROM addresses, " - "dingalings WHERE users.id = addresses.user_id " - "AND addresses.email_address = :email_address_1 " - "AND addresses.id = dingalings.address_id AND " - "dingalings.id = :id_1", - checkparams={u'email_address_1': 'e1', u'id_1': 2, - 'name': 'newname'} - ) + users.update(). + values(name='newname'). + where(users.c.id == addresses.c.user_id). + where(addresses.c.email_address == 'e1'). + where(addresses.c.id == dingalings.c.address_id). + where(dingalings.c.id == 2), + 'UPDATE users ' + 'SET name=:name ' + 'FROM addresses, dingalings ' + 'WHERE ' + 'users.id = addresses.user_id AND ' + 'addresses.email_address = :email_address_1 AND ' + 'addresses.id = dingalings.address_id AND ' + 'dingalings.id = :id_1', + checkparams=checkparams) def test_render_table_mysql(self): users, addresses = self.tables.users, self.tables.addresses + self.assert_compile( - users.update().\ - values(name='newname').\ - where(users.c.id==addresses.c.user_id).\ - where(addresses.c.email_address=='e1'), - "UPDATE users, addresses SET users.name=%s " - "WHERE users.id = addresses.user_id AND " - "addresses.email_address = %s", + users.update(). + values(name='newname'). + where(users.c.id == addresses.c.user_id). + where(addresses.c.email_address == 'e1'), + 'UPDATE users, addresses ' + 'SET users.name=%s ' + 'WHERE ' + 'users.id = addresses.user_id AND ' + 'addresses.email_address = %s', checkparams={u'email_address_1': 'e1', 'name': 'newname'}, - dialect=mysql.dialect() - ) + dialect=mysql.dialect()) def test_render_subquery(self): users, addresses = self.tables.users, self.tables.addresses - subq = select([addresses.c.id, - addresses.c.user_id, - addresses.c.email_address]).\ - where(addresses.c.id==7).alias() + + checkparams = { + u'email_address_1': 'e1', + u'id_1': 7, + 'name': 'newname' + } + + cols = [ + addresses.c.id, + addresses.c.user_id, + addresses.c.email_address + ] + + subq = select(cols).where(addresses.c.id == 7).alias() self.assert_compile( - users.update().\ - values(name='newname').\ - where(users.c.id==subq.c.user_id).\ - where(subq.c.email_address=='e1'), - "UPDATE users SET name=:name FROM " - "(SELECT addresses.id AS id, addresses.user_id " - "AS user_id, addresses.email_address AS " - "email_address FROM addresses WHERE addresses.id = " - ":id_1) AS anon_1 WHERE users.id = anon_1.user_id " - "AND anon_1.email_address = :email_address_1", - checkparams={u'email_address_1': 'e1', - u'id_1': 7, 'name': 'newname'} - ) + users.update(). + values(name='newname'). + where(users.c.id == subq.c.user_id). + where(subq.c.email_address == 'e1'), + 'UPDATE users ' + 'SET name=:name FROM (' + 'SELECT ' + 'addresses.id AS id, ' + 'addresses.user_id AS user_id, ' + 'addresses.email_address AS email_address ' + 'FROM addresses ' + 'WHERE addresses.id = :id_1' + ') AS anon_1 ' + 'WHERE users.id = anon_1.user_id ' + 'AND anon_1.email_address = :email_address_1', + checkparams=checkparams) + class UpdateFromRoundTripTest(_UpdateFromTestBase, fixtures.TablesTest): @testing.requires.update_from def test_exec_two_table(self): users, addresses = self.tables.users, self.tables.addresses + testing.db.execute( - addresses.update().\ - values(email_address=users.c.name).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed') - ) - eq_( - testing.db.execute( - addresses.select().\ - order_by(addresses.c.id)).fetchall(), - [ - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed"), - (3, 8, 'x', "ed"), - (4, 8, 'x', "ed"), - (5, 9, 'x', "fred@fred.com") - ] - ) + addresses.update(). + values(email_address=users.c.name). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed')) + + expected = [ + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed'), + (3, 8, 'x', 'ed'), + (4, 8, 'x', 'ed'), + (5, 9, 'x', 'fred@fred.com')] + self._assert_addresses(addresses, expected) @testing.requires.update_from def test_exec_two_table_plus_alias(self): users, addresses = self.tables.users, self.tables.addresses - a1 = addresses.alias() + a1 = addresses.alias() testing.db.execute( - addresses.update().\ - values(email_address=users.c.name).\ - where(users.c.id==a1.c.user_id).\ - where(users.c.name=='ed').\ - where(a1.c.id==addresses.c.id) - ) - eq_( - testing.db.execute( - addresses.select().\ - order_by(addresses.c.id)).fetchall(), - [ - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed"), - (3, 8, 'x', "ed"), - (4, 8, 'x', "ed"), - (5, 9, 'x', "fred@fred.com") - ] + addresses.update(). + values(email_address=users.c.name). + where(users.c.id == a1.c.user_id). + where(users.c.name == 'ed'). + where(a1.c.id == addresses.c.id) ) + expected = [ + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed'), + (3, 8, 'x', 'ed'), + (4, 8, 'x', 'ed'), + (5, 9, 'x', 'fred@fred.com')] + self._assert_addresses(addresses, expected) + @testing.requires.update_from def test_exec_three_table(self): - users, addresses, dingalings = \ - self.tables.users, \ - self.tables.addresses, \ - self.tables.dingalings + users = self.tables.users + addresses = self.tables.addresses + dingalings = self.tables.dingalings + testing.db.execute( - addresses.update().\ - values(email_address=users.c.name).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed'). - where(addresses.c.id==dingalings.c.address_id).\ - where(dingalings.c.id==1), - ) - eq_( - testing.db.execute( - addresses.select().order_by(addresses.c.id) - ).fetchall(), - [ - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed"), - (3, 8, 'x', "ed@bettyboop.com"), - (4, 8, 'x', "ed@lala.com"), - (5, 9, 'x', "fred@fred.com") - ] - ) + addresses.update(). + values(email_address=users.c.name). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed'). + where(addresses.c.id == dingalings.c.address_id). + where(dingalings.c.id == 1)) + + expected = [ + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed'), + (3, 8, 'x', 'ed@bettyboop.com'), + (4, 8, 'x', 'ed@lala.com'), + (5, 9, 'x', 'fred@fred.com')] + self._assert_addresses(addresses, expected) @testing.only_on('mysql', 'Multi table update') def test_exec_multitable(self): users, addresses = self.tables.users, self.tables.addresses + + values = { + addresses.c.email_address: users.c.name, + users.c.name: 'ed2' + } + testing.db.execute( - addresses.update().\ - values({ - addresses.c.email_address:users.c.name, - users.c.name:'ed2' - }).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed') - ) - eq_( - testing.db.execute( - addresses.select().order_by(addresses.c.id)).fetchall(), - [ - (1, 7, 'x', "jack@bean.com"), - (2, 8, 'x', "ed"), - (3, 8, 'x', "ed"), - (4, 8, 'x', "ed"), - (5, 9, 'x', "fred@fred.com") - ] - ) - eq_( - testing.db.execute( - users.select().order_by(users.c.id)).fetchall(), - [ - (7, 'jack'), - (8, 'ed2'), - (9, 'fred'), - (10, 'chuck') - ] - ) + addresses.update(). + values(values). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed')) + + expected = [ + (1, 7, 'x', 'jack@bean.com'), + (2, 8, 'x', 'ed'), + (3, 8, 'x', 'ed'), + (4, 8, 'x', 'ed'), + (5, 9, 'x', 'fred@fred.com')] + self._assert_addresses(addresses, expected) + + expected = [ + (7, 'jack'), + (8, 'ed2'), + (9, 'fred'), + (10, 'chuck')] + self._assert_users(users, expected) + + def _assert_addresses(self, addresses, expected): + stmt = addresses.select().order_by(addresses.c.id) + eq_(testing.db.execute(stmt).fetchall(), expected) + + def _assert_users(self, users, expected): + stmt = users.select().order_by(users.c.id) + eq_(testing.db.execute(stmt).fetchall(), expected) + -class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, fixtures.TablesTest): +class UpdateFromMultiTableUpdateDefaultsTest(_UpdateFromTestBase, + fixtures.TablesTest): @classmethod def define_tables(cls, metadata): Table('users', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + test_needs_autoincrement=True), Column('name', String(30), nullable=False), - Column('some_update', String(30), onupdate="im the update") - ) + Column('some_update', String(30), onupdate='im the update')) Table('addresses', metadata, Column('id', Integer, primary_key=True, - test_needs_autoincrement=True), + test_needs_autoincrement=True), Column('user_id', None, ForeignKey('users.id')), - Column('email_address', String(50), nullable=False), - ) + Column('email_address', String(50), nullable=False)) @classmethod def fixtures(cls): return dict( - users = ( + users=( ('id', 'name', 'some_update'), (8, 'ed', 'value'), (9, 'fred', 'value'), ), - - addresses = ( + addresses=( ('id', 'user_id', 'email_address'), - (2, 8, "ed@wood.com"), - (3, 8, "ed@bettyboop.com"), - (4, 9, "fred@fred.com") + (2, 8, 'ed@wood.com'), + (3, 8, 'ed@bettyboop.com'), + (4, 9, 'fred@fred.com') ), ) @testing.only_on('mysql', 'Multi table update') def test_defaults_second_table(self): users, addresses = self.tables.users, self.tables.addresses + + values = { + addresses.c.email_address: users.c.name, + users.c.name: 'ed2' + } + ret = testing.db.execute( - addresses.update().\ - values({ - addresses.c.email_address:users.c.name, - users.c.name:'ed2' - }).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed') - ) - eq_( - set(ret.prefetch_cols()), - set([users.c.some_update]) - ) - eq_( - testing.db.execute( - addresses.select().order_by(addresses.c.id)).fetchall(), - [ - (2, 8, "ed"), - (3, 8, "ed"), - (4, 9, "fred@fred.com") - ] - ) - eq_( - testing.db.execute( - users.select().order_by(users.c.id)).fetchall(), - [ - (8, 'ed2', 'im the update'), - (9, 'fred', 'value'), - ] - ) + addresses.update(). + values(values). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed')) + + eq_(set(ret.prefetch_cols()), set([users.c.some_update])) + + expected = [ + (2, 8, 'ed'), + (3, 8, 'ed'), + (4, 9, 'fred@fred.com')] + self._assert_addresses(addresses, expected) + + expected = [ + (8, 'ed2', 'im the update'), + (9, 'fred', 'value')] + self._assert_users(users, expected) @testing.only_on('mysql', 'Multi table update') def test_no_defaults_second_table(self): users, addresses = self.tables.users, self.tables.addresses + ret = testing.db.execute( - addresses.update().\ - values({ - 'email_address':users.c.name, - }).\ - where(users.c.id==addresses.c.user_id).\ - where(users.c.name=='ed') - ) - eq_( - ret.prefetch_cols(),[] - ) - eq_( - testing.db.execute( - addresses.select().order_by(addresses.c.id)).fetchall(), - [ - (2, 8, "ed"), - (3, 8, "ed"), - (4, 9, "fred@fred.com") - ] - ) - # users table not actually updated, - # so no onupdate - eq_( - testing.db.execute( - users.select().order_by(users.c.id)).fetchall(), - [ - (8, 'ed', 'value'), - (9, 'fred', 'value'), - ] - ) + addresses.update(). + values({'email_address': users.c.name}). + where(users.c.id == addresses.c.user_id). + where(users.c.name == 'ed')) + + eq_(ret.prefetch_cols(), []) + + expected = [ + (2, 8, 'ed'), + (3, 8, 'ed'), + (4, 9, 'fred@fred.com')] + self._assert_addresses(addresses, expected) + + # users table not actually updated, so no onupdate + expected = [ + (8, 'ed', 'value'), + (9, 'fred', 'value')] + self._assert_users(users, expected) + + def _assert_addresses(self, addresses, expected): + stmt = addresses.select().order_by(addresses.c.id) + eq_(testing.db.execute(stmt).fetchall(), expected) + + def _assert_users(self, users, expected): + stmt = users.select().order_by(users.c.id) + eq_(testing.db.execute(stmt).fetchall(), expected)