From: Mike Bayer Date: Wed, 30 Mar 2016 21:36:03 +0000 (-0400) Subject: - move all resultproxy tests intio test_resultset X-Git-Tag: rel_1_1_0b1~79 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=03d35a833cff68013781640b1192d079ab7e1154;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - move all resultproxy tests intio test_resultset --- diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index 66903bef3c..76d60f207c 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -874,184 +874,6 @@ class MockStrategyTest(fixtures.TestBase): ) -class ResultProxyTest(fixtures.TestBase): - __backend__ = True - - def test_nontuple_row(self): - """ensure the C version of BaseRowProxy handles - duck-type-dependent rows.""" - - from sqlalchemy.engine import RowProxy - - class MyList(object): - - def __init__(self, l): - self.l = l - - def __len__(self): - return len(self.l) - - def __getitem__(self, i): - return list.__getitem__(self.l, i) - - proxy = RowProxy(object(), MyList(['value']), [None], { - 'key': (None, None, 0), 0: (None, None, 0)}) - eq_(list(proxy), ['value']) - eq_(proxy[0], 'value') - eq_(proxy['key'], 'value') - - @testing.provide_metadata - def test_no_rowcount_on_selects_inserts(self): - """assert that rowcount is only called on deletes and updates. - - This because cursor.rowcount may can be expensive on some dialects - such as Firebird, however many dialects require it be called - before the cursor is closed. - - """ - - metadata = self.metadata - - engine = engines.testing_engine() - - t = Table('t1', metadata, - Column('data', String(10)) - ) - metadata.create_all(engine) - - with patch.object( - engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount: - mock_rowcount.__get__ = Mock() - engine.execute(t.insert(), - {'data': 'd1'}, - {'data': 'd2'}, - {'data': 'd3'}) - - eq_(len(mock_rowcount.__get__.mock_calls), 0) - - eq_( - engine.execute(t.select()).fetchall(), - [('d1', ), ('d2', ), ('d3', )] - ) - eq_(len(mock_rowcount.__get__.mock_calls), 0) - - engine.execute(t.update(), {'data': 'd4'}) - - eq_(len(mock_rowcount.__get__.mock_calls), 1) - - engine.execute(t.delete()) - eq_(len(mock_rowcount.__get__.mock_calls), 2) - - def test_rowproxy_is_sequence(self): - import collections - from sqlalchemy.engine import RowProxy - - row = RowProxy( - object(), ['value'], [None], - {'key': (None, None, 0), 0: (None, None, 0)}) - assert isinstance(row, collections.Sequence) - - @testing.provide_metadata - def test_rowproxy_getitem_indexes_compiled(self): - values = Table('users', self.metadata, - Column('key', String(10), primary_key=True), - Column('value', String(10))) - values.create() - - testing.db.execute(values.insert(), dict(key='One', value='Uno')) - row = testing.db.execute(values.select()).first() - eq_(row['key'], 'One') - eq_(row['value'], 'Uno') - eq_(row[0], 'One') - eq_(row[1], 'Uno') - eq_(row[-2], 'One') - eq_(row[-1], 'Uno') - eq_(row[1:0:-1], ('Uno',)) - - def test_rowproxy_getitem_indexes_raw(self): - row = testing.db.execute("select 'One' as key, 'Uno' as value").first() - eq_(row['key'], 'One') - eq_(row['value'], 'Uno') - eq_(row[0], 'One') - eq_(row[1], 'Uno') - eq_(row[-2], 'One') - eq_(row[-1], 'Uno') - eq_(row[1:0:-1], ('Uno',)) - - @testing.requires.cextensions - def test_row_c_sequence_check(self): - import csv - - metadata = MetaData() - metadata.bind = 'sqlite://' - users = Table('users', metadata, - Column('id', Integer, primary_key=True), - Column('name', String(40)), - ) - users.create() - - users.insert().execute(name='Test') - row = users.select().execute().fetchone() - - s = util.StringIO() - writer = csv.writer(s) - # csv performs PySequenceCheck call - writer.writerow(row) - assert s.getvalue().strip() == '1,Test' - - @testing.requires.selectone - def test_empty_accessors(self): - statements = [ - ( - "select 1", - [ - lambda r: r.last_inserted_params(), - lambda r: r.last_updated_params(), - lambda r: r.prefetch_cols(), - lambda r: r.postfetch_cols(), - lambda r: r.inserted_primary_key - ], - "Statement is not a compiled expression construct." - ), - ( - select([1]), - [ - lambda r: r.last_inserted_params(), - lambda r: r.inserted_primary_key - ], - r"Statement is not an insert\(\) expression construct." - ), - ( - select([1]), - [ - lambda r: r.last_updated_params(), - ], - r"Statement is not an update\(\) expression construct." - ), - ( - select([1]), - [ - lambda r: r.prefetch_cols(), - lambda r: r.postfetch_cols() - ], - r"Statement is not an insert\(\) " - r"or update\(\) expression construct." - ), - ] - - for stmt, meths, msg in statements: - r = testing.db.execute(stmt) - try: - for meth in meths: - assert_raises_message( - tsa.exc.InvalidRequestError, - msg, - meth, r - ) - - finally: - r.close() - class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults): __requires__ = 'schemas', __backend__ = True @@ -1260,160 +1082,6 @@ class ExecutionOptionsTest(fixtures.TestBase): ) -class AlternateResultProxyTest(fixtures.TablesTest): - __requires__ = ('sqlite', ) - - @classmethod - def setup_bind(cls): - cls.engine = engine = testing_engine('sqlite://') - return engine - - @classmethod - def define_tables(cls, metadata): - Table( - 'test', metadata, - Column('x', Integer, primary_key=True), - Column('y', String(50, convert_unicode='force')) - ) - - @classmethod - def insert_data(cls): - cls.engine.execute(cls.tables.test.insert(), [ - {'x': i, 'y': "t_%d" % i} for i in range(1, 12) - ]) - - @contextmanager - def _proxy_fixture(self, cls): - self.table = self.tables.test - - class ExcCtx(default.DefaultExecutionContext): - - def get_result_proxy(self): - return cls(self) - self.patcher = patch.object( - self.engine.dialect, "execution_ctx_cls", ExcCtx) - with self.patcher: - yield - - def _test_proxy(self, cls): - with self._proxy_fixture(cls): - rows = [] - r = self.engine.execute(select([self.table])) - assert isinstance(r, cls) - for i in range(5): - rows.append(r.fetchone()) - eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) - - rows = r.fetchmany(3) - eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)]) - - rows = r.fetchall() - eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)]) - - r = self.engine.execute(select([self.table])) - rows = r.fetchmany(None) - eq_(rows[0], (1, "t_1")) - # number of rows here could be one, or the whole thing - assert len(rows) == 1 or len(rows) == 11 - - r = self.engine.execute(select([self.table]).limit(1)) - r.fetchone() - eq_(r.fetchone(), None) - - r = self.engine.execute(select([self.table]).limit(5)) - rows = r.fetchmany(6) - eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) - - # result keeps going just fine with blank results... - eq_(r.fetchmany(2), []) - - eq_(r.fetchmany(2), []) - - eq_(r.fetchall(), []) - - eq_(r.fetchone(), None) - - # until we close - r.close() - - self._assert_result_closed(r) - - r = self.engine.execute(select([self.table]).limit(5)) - eq_(r.first(), (1, "t_1")) - self._assert_result_closed(r) - - r = self.engine.execute(select([self.table]).limit(5)) - eq_(r.scalar(), 1) - self._assert_result_closed(r) - - def _assert_result_closed(self, r): - assert_raises_message( - tsa.exc.ResourceClosedError, - "object is closed", - r.fetchone - ) - - assert_raises_message( - tsa.exc.ResourceClosedError, - "object is closed", - r.fetchmany, 2 - ) - - assert_raises_message( - tsa.exc.ResourceClosedError, - "object is closed", - r.fetchall - ) - - def test_plain(self): - self._test_proxy(_result.ResultProxy) - - def test_buffered_row_result_proxy(self): - self._test_proxy(_result.BufferedRowResultProxy) - - def test_fully_buffered_result_proxy(self): - self._test_proxy(_result.FullyBufferedResultProxy) - - def test_buffered_column_result_proxy(self): - self._test_proxy(_result.BufferedColumnResultProxy) - - def test_buffered_row_growth(self): - with self._proxy_fixture(_result.BufferedRowResultProxy): - with self.engine.connect() as conn: - conn.execute(self.table.insert(), [ - {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) - ]) - result = conn.execute(self.table.select()) - checks = { - 0: 5, 1: 10, 9: 20, 135: 250, 274: 500, - 1351: 1000 - } - for idx, row in enumerate(result, 0): - if idx in checks: - eq_(result._bufsize, checks[idx]) - le_( - len(result._BufferedRowResultProxy__rowbuffer), - 1000 - ) - - def test_max_row_buffer_option(self): - with self._proxy_fixture(_result.BufferedRowResultProxy): - with self.engine.connect() as conn: - conn.execute(self.table.insert(), [ - {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) - ]) - result = conn.execution_options(max_row_buffer=27).execute( - self.table.select() - ) - for idx, row in enumerate(result, 0): - if idx in (16, 70, 150, 250): - eq_(result._bufsize, 27) - le_( - len(result._BufferedRowResultProxy__rowbuffer), - 27 - ) - - class EngineEventsTest(fixtures.TestBase): __requires__ = 'ad_hoc_engines', __backend__ = True diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index aaeb82fa4c..3669390d5a 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -1,5 +1,5 @@ from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, \ - in_, not_in_, is_, ne_ + in_, not_in_, is_, ne_, le_ from sqlalchemy import testing from sqlalchemy.testing import fixtures, engines from sqlalchemy import util @@ -11,6 +11,10 @@ from sqlalchemy.engine import result as _result from sqlalchemy.testing.schema import Table, Column import operator from sqlalchemy.testing import assertions +from sqlalchemy import exc as sa_exc +from sqlalchemy.testing.mock import patch, Mock +from contextlib import contextmanager +from sqlalchemy.engine import default class ResultProxyTest(fixtures.TablesTest): @@ -915,6 +919,182 @@ class ResultProxyTest(fixtures.TablesTest): eq_(r['_parent'], 'Hidden parent') eq_(r['_row'], 'Hidden row') + def test_nontuple_row(self): + """ensure the C version of BaseRowProxy handles + duck-type-dependent rows.""" + + from sqlalchemy.engine import RowProxy + + class MyList(object): + + def __init__(self, l): + self.l = l + + def __len__(self): + return len(self.l) + + def __getitem__(self, i): + return list.__getitem__(self.l, i) + + proxy = RowProxy(object(), MyList(['value']), [None], { + 'key': (None, None, 0), 0: (None, None, 0)}) + eq_(list(proxy), ['value']) + eq_(proxy[0], 'value') + eq_(proxy['key'], 'value') + + @testing.provide_metadata + def test_no_rowcount_on_selects_inserts(self): + """assert that rowcount is only called on deletes and updates. + + This because cursor.rowcount may can be expensive on some dialects + such as Firebird, however many dialects require it be called + before the cursor is closed. + + """ + + metadata = self.metadata + + engine = engines.testing_engine() + + t = Table('t1', metadata, + Column('data', String(10)) + ) + metadata.create_all(engine) + + with patch.object( + engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount: + mock_rowcount.__get__ = Mock() + engine.execute(t.insert(), + {'data': 'd1'}, + {'data': 'd2'}, + {'data': 'd3'}) + + eq_(len(mock_rowcount.__get__.mock_calls), 0) + + eq_( + engine.execute(t.select()).fetchall(), + [('d1', ), ('d2', ), ('d3', )] + ) + eq_(len(mock_rowcount.__get__.mock_calls), 0) + + engine.execute(t.update(), {'data': 'd4'}) + + eq_(len(mock_rowcount.__get__.mock_calls), 1) + + engine.execute(t.delete()) + eq_(len(mock_rowcount.__get__.mock_calls), 2) + + def test_rowproxy_is_sequence(self): + import collections + from sqlalchemy.engine import RowProxy + + row = RowProxy( + object(), ['value'], [None], + {'key': (None, None, 0), 0: (None, None, 0)}) + assert isinstance(row, collections.Sequence) + + @testing.provide_metadata + def test_rowproxy_getitem_indexes_compiled(self): + values = Table('rp', self.metadata, + Column('key', String(10), primary_key=True), + Column('value', String(10))) + values.create() + + testing.db.execute(values.insert(), dict(key='One', value='Uno')) + row = testing.db.execute(values.select()).first() + eq_(row['key'], 'One') + eq_(row['value'], 'Uno') + eq_(row[0], 'One') + eq_(row[1], 'Uno') + eq_(row[-2], 'One') + eq_(row[-1], 'Uno') + eq_(row[1:0:-1], ('Uno',)) + + def test_rowproxy_getitem_indexes_raw(self): + row = testing.db.execute("select 'One' as key, 'Uno' as value").first() + eq_(row['key'], 'One') + eq_(row['value'], 'Uno') + eq_(row[0], 'One') + eq_(row[1], 'Uno') + eq_(row[-2], 'One') + eq_(row[-1], 'Uno') + eq_(row[1:0:-1], ('Uno',)) + + @testing.requires.cextensions + def test_row_c_sequence_check(self): + import csv + + metadata = MetaData() + metadata.bind = 'sqlite://' + users = Table('users', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(40)), + ) + users.create() + + users.insert().execute(name='Test') + row = users.select().execute().fetchone() + + s = util.StringIO() + writer = csv.writer(s) + # csv performs PySequenceCheck call + writer.writerow(row) + assert s.getvalue().strip() == '1,Test' + + @testing.requires.selectone + def test_empty_accessors(self): + statements = [ + ( + "select 1", + [ + lambda r: r.last_inserted_params(), + lambda r: r.last_updated_params(), + lambda r: r.prefetch_cols(), + lambda r: r.postfetch_cols(), + lambda r: r.inserted_primary_key + ], + "Statement is not a compiled expression construct." + ), + ( + select([1]), + [ + lambda r: r.last_inserted_params(), + lambda r: r.inserted_primary_key + ], + r"Statement is not an insert\(\) expression construct." + ), + ( + select([1]), + [ + lambda r: r.last_updated_params(), + ], + r"Statement is not an update\(\) expression construct." + ), + ( + select([1]), + [ + lambda r: r.prefetch_cols(), + lambda r: r.postfetch_cols() + ], + r"Statement is not an insert\(\) " + r"or update\(\) expression construct." + ), + ] + + for stmt, meths, msg in statements: + r = testing.db.execute(stmt) + try: + for meth in meths: + assert_raises_message( + sa_exc.InvalidRequestError, + msg, + meth, r + ) + + finally: + r.close() + + class KeyTargetingTest(fixtures.TablesTest): run_inserts = 'once' @@ -1317,3 +1497,158 @@ class PositionalTextTest(fixtures.TablesTest): "Could not locate column in row for column 'text1.a'", lambda: row[text1.c.a] ) + + +class AlternateResultProxyTest(fixtures.TablesTest): + __requires__ = ('sqlite', ) + + @classmethod + def setup_bind(cls): + cls.engine = engine = engines.testing_engine('sqlite://') + return engine + + @classmethod + def define_tables(cls, metadata): + Table( + 'test', metadata, + Column('x', Integer, primary_key=True), + Column('y', String(50, convert_unicode='force')) + ) + + @classmethod + def insert_data(cls): + cls.engine.execute(cls.tables.test.insert(), [ + {'x': i, 'y': "t_%d" % i} for i in range(1, 12) + ]) + + @contextmanager + def _proxy_fixture(self, cls): + self.table = self.tables.test + + class ExcCtx(default.DefaultExecutionContext): + + def get_result_proxy(self): + return cls(self) + self.patcher = patch.object( + self.engine.dialect, "execution_ctx_cls", ExcCtx) + with self.patcher: + yield + + def _test_proxy(self, cls): + with self._proxy_fixture(cls): + rows = [] + r = self.engine.execute(select([self.table])) + assert isinstance(r, cls) + for i in range(5): + rows.append(r.fetchone()) + eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) + + rows = r.fetchmany(3) + eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)]) + + rows = r.fetchall() + eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)]) + + r = self.engine.execute(select([self.table])) + rows = r.fetchmany(None) + eq_(rows[0], (1, "t_1")) + # number of rows here could be one, or the whole thing + assert len(rows) == 1 or len(rows) == 11 + + r = self.engine.execute(select([self.table]).limit(1)) + r.fetchone() + eq_(r.fetchone(), None) + + r = self.engine.execute(select([self.table]).limit(5)) + rows = r.fetchmany(6) + eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)]) + + # result keeps going just fine with blank results... + eq_(r.fetchmany(2), []) + + eq_(r.fetchmany(2), []) + + eq_(r.fetchall(), []) + + eq_(r.fetchone(), None) + + # until we close + r.close() + + self._assert_result_closed(r) + + r = self.engine.execute(select([self.table]).limit(5)) + eq_(r.first(), (1, "t_1")) + self._assert_result_closed(r) + + r = self.engine.execute(select([self.table]).limit(5)) + eq_(r.scalar(), 1) + self._assert_result_closed(r) + + def _assert_result_closed(self, r): + assert_raises_message( + sa_exc.ResourceClosedError, + "object is closed", + r.fetchone + ) + + assert_raises_message( + sa_exc.ResourceClosedError, + "object is closed", + r.fetchmany, 2 + ) + + assert_raises_message( + sa_exc.ResourceClosedError, + "object is closed", + r.fetchall + ) + + def test_plain(self): + self._test_proxy(_result.ResultProxy) + + def test_buffered_row_result_proxy(self): + self._test_proxy(_result.BufferedRowResultProxy) + + def test_fully_buffered_result_proxy(self): + self._test_proxy(_result.FullyBufferedResultProxy) + + def test_buffered_column_result_proxy(self): + self._test_proxy(_result.BufferedColumnResultProxy) + + def test_buffered_row_growth(self): + with self._proxy_fixture(_result.BufferedRowResultProxy): + with self.engine.connect() as conn: + conn.execute(self.table.insert(), [ + {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) + ]) + result = conn.execute(self.table.select()) + checks = { + 0: 5, 1: 10, 9: 20, 135: 250, 274: 500, + 1351: 1000 + } + for idx, row in enumerate(result, 0): + if idx in checks: + eq_(result._bufsize, checks[idx]) + le_( + len(result._BufferedRowResultProxy__rowbuffer), + 1000 + ) + + def test_max_row_buffer_option(self): + with self._proxy_fixture(_result.BufferedRowResultProxy): + with self.engine.connect() as conn: + conn.execute(self.table.insert(), [ + {'x': i, 'y': "t_%d" % i} for i in range(15, 1200) + ]) + result = conn.execution_options(max_row_buffer=27).execute( + self.table.select() + ) + for idx, row in enumerate(result, 0): + if idx in (16, 70, 150, 250): + eq_(result._bufsize, 27) + le_( + len(result._BufferedRowResultProxy__rowbuffer), + 27 + ) +