git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- move all resultproxy tests into test_resultset
authorMike Bayer <mike_mp@zzzcomputing.com>
Wed, 30 Mar 2016 21:36:03 +0000 (17:36 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Wed, 30 Mar 2016 21:36:03 +0000 (17:36 -0400)
test/engine/test_execute.py
test/sql/test_resultset.py

index 66903bef3c10e818669b5828b44a06cd4db72357..76d60f207c7f656c68a96ba3623a8700c6e3288c 100644 (file)
@@ -874,184 +874,6 @@ class MockStrategyTest(fixtures.TestBase):
         )
 
 
-class ResultProxyTest(fixtures.TestBase):
-    __backend__ = True
-
-    def test_nontuple_row(self):
-        """ensure the C version of BaseRowProxy handles
-        duck-type-dependent rows."""
-
-        from sqlalchemy.engine import RowProxy
-
-        class MyList(object):
-
-            def __init__(self, l):
-                self.l = l
-
-            def __len__(self):
-                return len(self.l)
-
-            def __getitem__(self, i):
-                return list.__getitem__(self.l, i)
-
-        proxy = RowProxy(object(), MyList(['value']), [None], {
-                         'key': (None, None, 0), 0: (None, None, 0)})
-        eq_(list(proxy), ['value'])
-        eq_(proxy[0], 'value')
-        eq_(proxy['key'], 'value')
-
-    @testing.provide_metadata
-    def test_no_rowcount_on_selects_inserts(self):
-        """assert that rowcount is only called on deletes and updates.
-
-        This because cursor.rowcount may can be expensive on some dialects
-        such as Firebird, however many dialects require it be called
-        before the cursor is closed.
-
-        """
-
-        metadata = self.metadata
-
-        engine = engines.testing_engine()
-
-        t = Table('t1', metadata,
-                  Column('data', String(10))
-                  )
-        metadata.create_all(engine)
-
-        with patch.object(
-                engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount:
-            mock_rowcount.__get__ = Mock()
-            engine.execute(t.insert(),
-                           {'data': 'd1'},
-                           {'data': 'd2'},
-                           {'data': 'd3'})
-
-            eq_(len(mock_rowcount.__get__.mock_calls), 0)
-
-            eq_(
-                engine.execute(t.select()).fetchall(),
-                [('d1', ), ('d2', ), ('d3', )]
-            )
-            eq_(len(mock_rowcount.__get__.mock_calls), 0)
-
-            engine.execute(t.update(), {'data': 'd4'})
-
-            eq_(len(mock_rowcount.__get__.mock_calls), 1)
-
-            engine.execute(t.delete())
-            eq_(len(mock_rowcount.__get__.mock_calls), 2)
-
-    def test_rowproxy_is_sequence(self):
-        import collections
-        from sqlalchemy.engine import RowProxy
-
-        row = RowProxy(
-            object(), ['value'], [None],
-            {'key': (None, None, 0), 0: (None, None, 0)})
-        assert isinstance(row, collections.Sequence)
-
-    @testing.provide_metadata
-    def test_rowproxy_getitem_indexes_compiled(self):
-        values = Table('users', self.metadata,
-                       Column('key', String(10), primary_key=True),
-                       Column('value', String(10)))
-        values.create()
-
-        testing.db.execute(values.insert(), dict(key='One', value='Uno'))
-        row = testing.db.execute(values.select()).first()
-        eq_(row['key'], 'One')
-        eq_(row['value'], 'Uno')
-        eq_(row[0], 'One')
-        eq_(row[1], 'Uno')
-        eq_(row[-2], 'One')
-        eq_(row[-1], 'Uno')
-        eq_(row[1:0:-1], ('Uno',))
-
-    def test_rowproxy_getitem_indexes_raw(self):
-        row = testing.db.execute("select 'One' as key, 'Uno' as value").first()
-        eq_(row['key'], 'One')
-        eq_(row['value'], 'Uno')
-        eq_(row[0], 'One')
-        eq_(row[1], 'Uno')
-        eq_(row[-2], 'One')
-        eq_(row[-1], 'Uno')
-        eq_(row[1:0:-1], ('Uno',))
-
-    @testing.requires.cextensions
-    def test_row_c_sequence_check(self):
-        import csv
-
-        metadata = MetaData()
-        metadata.bind = 'sqlite://'
-        users = Table('users', metadata,
-                      Column('id', Integer, primary_key=True),
-                      Column('name', String(40)),
-                      )
-        users.create()
-
-        users.insert().execute(name='Test')
-        row = users.select().execute().fetchone()
-
-        s = util.StringIO()
-        writer = csv.writer(s)
-        # csv performs PySequenceCheck call
-        writer.writerow(row)
-        assert s.getvalue().strip() == '1,Test'
-
-    @testing.requires.selectone
-    def test_empty_accessors(self):
-        statements = [
-            (
-                "select 1",
-                [
-                    lambda r: r.last_inserted_params(),
-                    lambda r: r.last_updated_params(),
-                    lambda r: r.prefetch_cols(),
-                    lambda r: r.postfetch_cols(),
-                    lambda r: r.inserted_primary_key
-                ],
-                "Statement is not a compiled expression construct."
-            ),
-            (
-                select([1]),
-                [
-                    lambda r: r.last_inserted_params(),
-                    lambda r: r.inserted_primary_key
-                ],
-                r"Statement is not an insert\(\) expression construct."
-            ),
-            (
-                select([1]),
-                [
-                    lambda r: r.last_updated_params(),
-                ],
-                r"Statement is not an update\(\) expression construct."
-            ),
-            (
-                select([1]),
-                [
-                    lambda r: r.prefetch_cols(),
-                    lambda r: r.postfetch_cols()
-                ],
-                r"Statement is not an insert\(\) "
-                r"or update\(\) expression construct."
-            ),
-        ]
-
-        for stmt, meths, msg in statements:
-            r = testing.db.execute(stmt)
-            try:
-                for meth in meths:
-                    assert_raises_message(
-                        tsa.exc.InvalidRequestError,
-                        msg,
-                        meth, r
-                    )
-
-            finally:
-                r.close()
-
 class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
     __requires__ = 'schemas',
     __backend__ = True
@@ -1260,160 +1082,6 @@ class ExecutionOptionsTest(fixtures.TestBase):
         )
 
 
-class AlternateResultProxyTest(fixtures.TablesTest):
-    __requires__ = ('sqlite', )
-
-    @classmethod
-    def setup_bind(cls):
-        cls.engine = engine = testing_engine('sqlite://')
-        return engine
-
-    @classmethod
-    def define_tables(cls, metadata):
-        Table(
-            'test', metadata,
-            Column('x', Integer, primary_key=True),
-            Column('y', String(50, convert_unicode='force'))
-        )
-
-    @classmethod
-    def insert_data(cls):
-        cls.engine.execute(cls.tables.test.insert(), [
-            {'x': i, 'y': "t_%d" % i} for i in range(1, 12)
-        ])
-
-    @contextmanager
-    def _proxy_fixture(self, cls):
-        self.table = self.tables.test
-
-        class ExcCtx(default.DefaultExecutionContext):
-
-            def get_result_proxy(self):
-                return cls(self)
-        self.patcher = patch.object(
-            self.engine.dialect, "execution_ctx_cls", ExcCtx)
-        with self.patcher:
-            yield
-
-    def _test_proxy(self, cls):
-        with self._proxy_fixture(cls):
-            rows = []
-            r = self.engine.execute(select([self.table]))
-            assert isinstance(r, cls)
-            for i in range(5):
-                rows.append(r.fetchone())
-            eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
-
-            rows = r.fetchmany(3)
-            eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
-
-            rows = r.fetchall()
-            eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
-
-            r = self.engine.execute(select([self.table]))
-            rows = r.fetchmany(None)
-            eq_(rows[0], (1, "t_1"))
-            # number of rows here could be one, or the whole thing
-            assert len(rows) == 1 or len(rows) == 11
-
-            r = self.engine.execute(select([self.table]).limit(1))
-            r.fetchone()
-            eq_(r.fetchone(), None)
-
-            r = self.engine.execute(select([self.table]).limit(5))
-            rows = r.fetchmany(6)
-            eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
-
-            # result keeps going just fine with blank results...
-            eq_(r.fetchmany(2), [])
-
-            eq_(r.fetchmany(2), [])
-
-            eq_(r.fetchall(), [])
-
-            eq_(r.fetchone(), None)
-
-            # until we close
-            r.close()
-
-            self._assert_result_closed(r)
-
-            r = self.engine.execute(select([self.table]).limit(5))
-            eq_(r.first(), (1, "t_1"))
-            self._assert_result_closed(r)
-
-            r = self.engine.execute(select([self.table]).limit(5))
-            eq_(r.scalar(), 1)
-            self._assert_result_closed(r)
-
-    def _assert_result_closed(self, r):
-        assert_raises_message(
-            tsa.exc.ResourceClosedError,
-            "object is closed",
-            r.fetchone
-        )
-
-        assert_raises_message(
-            tsa.exc.ResourceClosedError,
-            "object is closed",
-            r.fetchmany, 2
-        )
-
-        assert_raises_message(
-            tsa.exc.ResourceClosedError,
-            "object is closed",
-            r.fetchall
-        )
-
-    def test_plain(self):
-        self._test_proxy(_result.ResultProxy)
-
-    def test_buffered_row_result_proxy(self):
-        self._test_proxy(_result.BufferedRowResultProxy)
-
-    def test_fully_buffered_result_proxy(self):
-        self._test_proxy(_result.FullyBufferedResultProxy)
-
-    def test_buffered_column_result_proxy(self):
-        self._test_proxy(_result.BufferedColumnResultProxy)
-
-    def test_buffered_row_growth(self):
-        with self._proxy_fixture(_result.BufferedRowResultProxy):
-            with self.engine.connect() as conn:
-                conn.execute(self.table.insert(), [
-                    {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
-                ])
-                result = conn.execute(self.table.select())
-                checks = {
-                    0: 5, 1: 10, 9: 20, 135: 250, 274: 500,
-                    1351: 1000
-                }
-                for idx, row in enumerate(result, 0):
-                    if idx in checks:
-                        eq_(result._bufsize, checks[idx])
-                    le_(
-                        len(result._BufferedRowResultProxy__rowbuffer),
-                        1000
-                    )
-
-    def test_max_row_buffer_option(self):
-        with self._proxy_fixture(_result.BufferedRowResultProxy):
-            with self.engine.connect() as conn:
-                conn.execute(self.table.insert(), [
-                    {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
-                ])
-                result = conn.execution_options(max_row_buffer=27).execute(
-                    self.table.select()
-                )
-                for idx, row in enumerate(result, 0):
-                    if idx in (16, 70, 150, 250):
-                        eq_(result._bufsize, 27)
-                    le_(
-                        len(result._BufferedRowResultProxy__rowbuffer),
-                        27
-                    )
-
-
 class EngineEventsTest(fixtures.TestBase):
     __requires__ = 'ad_hoc_engines',
     __backend__ = True
index aaeb82fa4c8a143e00980d0b5dbcd2b42ade84cf..3669390d5ab8e9f36aa3bf9fff1f462b8c1dca61 100644 (file)
@@ -1,5 +1,5 @@
 from sqlalchemy.testing import eq_, assert_raises_message, assert_raises, \
-    in_, not_in_, is_, ne_
+    in_, not_in_, is_, ne_, le_
 from sqlalchemy import testing
 from sqlalchemy.testing import fixtures, engines
 from sqlalchemy import util
@@ -11,6 +11,10 @@ from sqlalchemy.engine import result as _result
 from sqlalchemy.testing.schema import Table, Column
 import operator
 from sqlalchemy.testing import assertions
+from sqlalchemy import exc as sa_exc
+from sqlalchemy.testing.mock import patch, Mock
+from contextlib import contextmanager
+from sqlalchemy.engine import default
 
 
 class ResultProxyTest(fixtures.TablesTest):
@@ -915,6 +919,182 @@ class ResultProxyTest(fixtures.TablesTest):
         eq_(r['_parent'], 'Hidden parent')
         eq_(r['_row'], 'Hidden row')
 
+    def test_nontuple_row(self):
+        """ensure the C version of BaseRowProxy handles
+        duck-type-dependent rows."""
+
+        from sqlalchemy.engine import RowProxy
+
+        class MyList(object):
+
+            def __init__(self, l):
+                self.l = l
+
+            def __len__(self):
+                return len(self.l)
+
+            def __getitem__(self, i):
+                return list.__getitem__(self.l, i)
+
+        proxy = RowProxy(object(), MyList(['value']), [None], {
+                         'key': (None, None, 0), 0: (None, None, 0)})
+        eq_(list(proxy), ['value'])
+        eq_(proxy[0], 'value')
+        eq_(proxy['key'], 'value')
+
+    @testing.provide_metadata
+    def test_no_rowcount_on_selects_inserts(self):
+        """assert that rowcount is only called on deletes and updates.
+
+        This because cursor.rowcount may can be expensive on some dialects
+        such as Firebird, however many dialects require it be called
+        before the cursor is closed.
+
+        """
+
+        metadata = self.metadata
+
+        engine = engines.testing_engine()
+
+        t = Table('t1', metadata,
+                  Column('data', String(10))
+                  )
+        metadata.create_all(engine)
+
+        with patch.object(
+                engine.dialect.execution_ctx_cls, "rowcount") as mock_rowcount:
+            mock_rowcount.__get__ = Mock()
+            engine.execute(t.insert(),
+                           {'data': 'd1'},
+                           {'data': 'd2'},
+                           {'data': 'd3'})
+
+            eq_(len(mock_rowcount.__get__.mock_calls), 0)
+
+            eq_(
+                engine.execute(t.select()).fetchall(),
+                [('d1', ), ('d2', ), ('d3', )]
+            )
+            eq_(len(mock_rowcount.__get__.mock_calls), 0)
+
+            engine.execute(t.update(), {'data': 'd4'})
+
+            eq_(len(mock_rowcount.__get__.mock_calls), 1)
+
+            engine.execute(t.delete())
+            eq_(len(mock_rowcount.__get__.mock_calls), 2)
+
+    def test_rowproxy_is_sequence(self):
+        import collections
+        from sqlalchemy.engine import RowProxy
+
+        row = RowProxy(
+            object(), ['value'], [None],
+            {'key': (None, None, 0), 0: (None, None, 0)})
+        assert isinstance(row, collections.Sequence)
+
+    @testing.provide_metadata
+    def test_rowproxy_getitem_indexes_compiled(self):
+        values = Table('rp', self.metadata,
+                       Column('key', String(10), primary_key=True),
+                       Column('value', String(10)))
+        values.create()
+
+        testing.db.execute(values.insert(), dict(key='One', value='Uno'))
+        row = testing.db.execute(values.select()).first()
+        eq_(row['key'], 'One')
+        eq_(row['value'], 'Uno')
+        eq_(row[0], 'One')
+        eq_(row[1], 'Uno')
+        eq_(row[-2], 'One')
+        eq_(row[-1], 'Uno')
+        eq_(row[1:0:-1], ('Uno',))
+
+    def test_rowproxy_getitem_indexes_raw(self):
+        row = testing.db.execute("select 'One' as key, 'Uno' as value").first()
+        eq_(row['key'], 'One')
+        eq_(row['value'], 'Uno')
+        eq_(row[0], 'One')
+        eq_(row[1], 'Uno')
+        eq_(row[-2], 'One')
+        eq_(row[-1], 'Uno')
+        eq_(row[1:0:-1], ('Uno',))
+
+    @testing.requires.cextensions
+    def test_row_c_sequence_check(self):
+        import csv
+
+        metadata = MetaData()
+        metadata.bind = 'sqlite://'
+        users = Table('users', metadata,
+                      Column('id', Integer, primary_key=True),
+                      Column('name', String(40)),
+                      )
+        users.create()
+
+        users.insert().execute(name='Test')
+        row = users.select().execute().fetchone()
+
+        s = util.StringIO()
+        writer = csv.writer(s)
+        # csv performs PySequenceCheck call
+        writer.writerow(row)
+        assert s.getvalue().strip() == '1,Test'
+
+    @testing.requires.selectone
+    def test_empty_accessors(self):
+        statements = [
+            (
+                "select 1",
+                [
+                    lambda r: r.last_inserted_params(),
+                    lambda r: r.last_updated_params(),
+                    lambda r: r.prefetch_cols(),
+                    lambda r: r.postfetch_cols(),
+                    lambda r: r.inserted_primary_key
+                ],
+                "Statement is not a compiled expression construct."
+            ),
+            (
+                select([1]),
+                [
+                    lambda r: r.last_inserted_params(),
+                    lambda r: r.inserted_primary_key
+                ],
+                r"Statement is not an insert\(\) expression construct."
+            ),
+            (
+                select([1]),
+                [
+                    lambda r: r.last_updated_params(),
+                ],
+                r"Statement is not an update\(\) expression construct."
+            ),
+            (
+                select([1]),
+                [
+                    lambda r: r.prefetch_cols(),
+                    lambda r: r.postfetch_cols()
+                ],
+                r"Statement is not an insert\(\) "
+                r"or update\(\) expression construct."
+            ),
+        ]
+
+        for stmt, meths, msg in statements:
+            r = testing.db.execute(stmt)
+            try:
+                for meth in meths:
+                    assert_raises_message(
+                        sa_exc.InvalidRequestError,
+                        msg,
+                        meth, r
+                    )
+
+            finally:
+                r.close()
+
+
 
 class KeyTargetingTest(fixtures.TablesTest):
     run_inserts = 'once'
@@ -1317,3 +1497,158 @@ class PositionalTextTest(fixtures.TablesTest):
             "Could not locate column in row for column 'text1.a'",
             lambda: row[text1.c.a]
         )
+
+
+class AlternateResultProxyTest(fixtures.TablesTest):
+    __requires__ = ('sqlite', )
+
+    @classmethod
+    def setup_bind(cls):
+        cls.engine = engine = engines.testing_engine('sqlite://')
+        return engine
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            'test', metadata,
+            Column('x', Integer, primary_key=True),
+            Column('y', String(50, convert_unicode='force'))
+        )
+
+    @classmethod
+    def insert_data(cls):
+        cls.engine.execute(cls.tables.test.insert(), [
+            {'x': i, 'y': "t_%d" % i} for i in range(1, 12)
+        ])
+
+    @contextmanager
+    def _proxy_fixture(self, cls):
+        self.table = self.tables.test
+
+        class ExcCtx(default.DefaultExecutionContext):
+
+            def get_result_proxy(self):
+                return cls(self)
+        self.patcher = patch.object(
+            self.engine.dialect, "execution_ctx_cls", ExcCtx)
+        with self.patcher:
+            yield
+
+    def _test_proxy(self, cls):
+        with self._proxy_fixture(cls):
+            rows = []
+            r = self.engine.execute(select([self.table]))
+            assert isinstance(r, cls)
+            for i in range(5):
+                rows.append(r.fetchone())
+            eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+
+            rows = r.fetchmany(3)
+            eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
+
+            rows = r.fetchall()
+            eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
+
+            r = self.engine.execute(select([self.table]))
+            rows = r.fetchmany(None)
+            eq_(rows[0], (1, "t_1"))
+            # number of rows here could be one, or the whole thing
+            assert len(rows) == 1 or len(rows) == 11
+
+            r = self.engine.execute(select([self.table]).limit(1))
+            r.fetchone()
+            eq_(r.fetchone(), None)
+
+            r = self.engine.execute(select([self.table]).limit(5))
+            rows = r.fetchmany(6)
+            eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+
+            # result keeps going just fine with blank results...
+            eq_(r.fetchmany(2), [])
+
+            eq_(r.fetchmany(2), [])
+
+            eq_(r.fetchall(), [])
+
+            eq_(r.fetchone(), None)
+
+            # until we close
+            r.close()
+
+            self._assert_result_closed(r)
+
+            r = self.engine.execute(select([self.table]).limit(5))
+            eq_(r.first(), (1, "t_1"))
+            self._assert_result_closed(r)
+
+            r = self.engine.execute(select([self.table]).limit(5))
+            eq_(r.scalar(), 1)
+            self._assert_result_closed(r)
+
+    def _assert_result_closed(self, r):
+        assert_raises_message(
+            sa_exc.ResourceClosedError,
+            "object is closed",
+            r.fetchone
+        )
+
+        assert_raises_message(
+            sa_exc.ResourceClosedError,
+            "object is closed",
+            r.fetchmany, 2
+        )
+
+        assert_raises_message(
+            sa_exc.ResourceClosedError,
+            "object is closed",
+            r.fetchall
+        )
+
+    def test_plain(self):
+        self._test_proxy(_result.ResultProxy)
+
+    def test_buffered_row_result_proxy(self):
+        self._test_proxy(_result.BufferedRowResultProxy)
+
+    def test_fully_buffered_result_proxy(self):
+        self._test_proxy(_result.FullyBufferedResultProxy)
+
+    def test_buffered_column_result_proxy(self):
+        self._test_proxy(_result.BufferedColumnResultProxy)
+
+    def test_buffered_row_growth(self):
+        with self._proxy_fixture(_result.BufferedRowResultProxy):
+            with self.engine.connect() as conn:
+                conn.execute(self.table.insert(), [
+                    {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
+                ])
+                result = conn.execute(self.table.select())
+                checks = {
+                    0: 5, 1: 10, 9: 20, 135: 250, 274: 500,
+                    1351: 1000
+                }
+                for idx, row in enumerate(result, 0):
+                    if idx in checks:
+                        eq_(result._bufsize, checks[idx])
+                    le_(
+                        len(result._BufferedRowResultProxy__rowbuffer),
+                        1000
+                    )
+
+    def test_max_row_buffer_option(self):
+        with self._proxy_fixture(_result.BufferedRowResultProxy):
+            with self.engine.connect() as conn:
+                conn.execute(self.table.insert(), [
+                    {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
+                ])
+                result = conn.execution_options(max_row_buffer=27).execute(
+                    self.table.select()
+                )
+                for idx, row in enumerate(result, 0):
+                    if idx in (16, 70, 150, 250):
+                        eq_(result._bufsize, 27)
+                    le_(
+                        len(result._BufferedRowResultProxy__rowbuffer),
+                        27
+                    )
+