git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
author    Mike Bayer <mike_mp@zzzcomputing.com>
          Sun, 14 Jun 2015 20:43:16 +0000 (16:43 -0400)
committer Mike Bayer <mike_mp@zzzcomputing.com>
          Sun, 14 Jun 2015 20:43:16 +0000 (16:43 -0400)

- add test cases for pullreq github:182, where we add a new
  "max_row_buffer" execution option for BufferedRowResultProxy
- also add documentation, changelog and version notes
- rework the max_row_buffer argument to be interpreted from
  the execution options upfront when the BufferedRowResultProxy
  is first initialized.

doc/build/changelog/changelog_10.rst
lib/sqlalchemy/dialects/postgresql/psycopg2.py
lib/sqlalchemy/engine/result.py
test/engine/test_execute.py
test/orm/test_query.py

index f448865591a997a8c66ab7875f1efd4752b4747b..155302388aed169e09f71a51029f507281dd0b28 100644 (file)
 .. changelog::
     :version: 1.0.6
 
+    .. change::
+        :tags: feature, postgresql
+        :pullreq: github:182
+
+        Added new execution option ``max_row_buffer`` which is interpreted
+        by the psycopg2 dialect when the ``stream_results`` option is
+        used, which sets a limit on the size of the row buffer that may be
+        allocated.  This value is also provided based on the integer
+        value sent to :meth:`.Query.yield_per`.  Pull request courtesy
+        mcclurem.
+
     .. change::
         :tags: bug, orm
         :tickets: 3448
index 35de41fef815383365aaaffe192ad32fb4f28c69..36a9d7bf781e9752b891c5739a03b5ae2131db56 100644 (file)
@@ -74,6 +74,8 @@ See also:
 `PQconnectdbParams <http://www.postgresql.org/docs/9.1/static/\
 libpq-connect.html#LIBPQ-PQCONNECTDBPARAMS>`_
 
+.. _psycopg2_execution_options:
+
 Per-Statement/Connection Execution Options
 -------------------------------------------
 
@@ -81,16 +83,23 @@ The following DBAPI-specific options are respected when used with
 :meth:`.Connection.execution_options`, :meth:`.Executable.execution_options`,
 :meth:`.Query.execution_options`, in addition to those not specific to DBAPIs:
 
-* isolation_level - Set the transaction isolation level for the lifespan of a
+* ``isolation_level`` - Set the transaction isolation level for the lifespan of a
   :class:`.Connection` (can only be set on a connection, not a statement
   or query).   See :ref:`psycopg2_isolation_level`.
 
-* stream_results - Enable or disable usage of psycopg2 server side cursors -
+* ``stream_results`` - Enable or disable usage of psycopg2 server side cursors -
   this feature makes use of "named" cursors in combination with special
   result handling methods so that result rows are not fully buffered.
   If ``None`` or not set, the ``server_side_cursors`` option of the
   :class:`.Engine` is used.
 
+* ``max_row_buffer`` - when using ``stream_results``, an integer value that
+  specifies the maximum number of rows to buffer at a time.  This is
+  interpreted by the :class:`.BufferedRowResultProxy`, and if omitted the
+  buffer will grow to ultimately store 1000 rows at a time.
+
+  .. versionadded:: 1.0.6
+
 .. _psycopg2_unicode:
 
 Unicode with Psycopg2
index 41b30c98311b2090a1967b8284f57119db44c1b4..b2b78dee83e64cae9bd8a6579176adcb1f30fafb 100644 (file)
@@ -1068,9 +1068,26 @@ class BufferedRowResultProxy(ResultProxy):
     The pre-fetching behavior fetches only one row initially, and then
     grows its buffer size by a fixed amount with each successive need
     for additional rows up to a size of 1000.
+
+    The size argument is configurable using the ``max_row_buffer``
+    execution option::
+
+        with psycopg2_engine.connect() as conn:
+
+            result = conn.execution_options(
+                stream_results=True, max_row_buffer=50
+                ).execute("select * from table")
+
+    .. versionadded:: 1.0.6 Added the ``max_row_buffer`` option.
+
+    .. seealso::
+
+        :ref:`psycopg2_execution_options`
     """
 
     def _init_metadata(self):
+        self._max_row_buffer = self.context.execution_options.get(
+            'max_row_buffer', None)
         self.__buffer_rows()
         super(BufferedRowResultProxy, self)._init_metadata()
 
@@ -1095,8 +1112,8 @@ class BufferedRowResultProxy(ResultProxy):
         size = getattr(self, '_bufsize', 1)
         self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
         self._bufsize = self.size_growth.get(size, size)
-        if self.context.execution_options.get('max_row_buffer') is not None:
-            self._bufsize = min(self.context.execution_options['max_row_buffer'], self._bufsize)
+        if self._max_row_buffer is not None:
+            self._bufsize = min(self._max_row_buffer, self._bufsize)
 
     def _soft_close(self, **kw):
         self.__rowbuffer.clear()
index c7b524335c41f4859684b4cbf075653e64b08bd5..5ddf4ad094fb9296828fb2446bbed541742cfe27 100644 (file)
@@ -1,7 +1,7 @@
 # coding: utf-8
 
 from sqlalchemy.testing import eq_, assert_raises, assert_raises_message, \
-    config, is_, is_not_
+    config, is_, is_not_, le_
 import re
 from sqlalchemy.testing.util import picklers
 from sqlalchemy.interfaces import ConnectionProxy
@@ -1047,76 +1047,91 @@ class ExecutionOptionsTest(fixtures.TestBase):
         )
 
 
-class AlternateResultProxyTest(fixtures.TestBase):
+class AlternateResultProxyTest(fixtures.TablesTest):
     __requires__ = ('sqlite', )
 
     @classmethod
-    def setup_class(cls):
+    def setup_bind(cls):
         cls.engine = engine = testing_engine('sqlite://')
-        m = MetaData()
-        cls.table = t = Table('test', m,
-                              Column('x', Integer, primary_key=True),
-                              Column('y', String(50, convert_unicode='force'))
-                              )
-        m.create_all(engine)
-        engine.execute(t.insert(), [
+        return engine
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            'test', metadata,
+            Column('x', Integer, primary_key=True),
+            Column('y', String(50, convert_unicode='force'))
+        )
+
+    @classmethod
+    def insert_data(cls):
+        cls.engine.execute(cls.tables.test.insert(), [
             {'x': i, 'y': "t_%d" % i} for i in range(1, 12)
         ])
 
-    def _test_proxy(self, cls):
+    @contextmanager
+    def _proxy_fixture(self, cls):
+        self.table = self.tables.test
+
         class ExcCtx(default.DefaultExecutionContext):
 
             def get_result_proxy(self):
                 return cls(self)
-        self.engine.dialect.execution_ctx_cls = ExcCtx
-        rows = []
-        r = self.engine.execute(select([self.table]))
-        assert isinstance(r, cls)
-        for i in range(5):
-            rows.append(r.fetchone())
-        eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+        self.patcher = patch.object(
+            self.engine.dialect, "execution_ctx_cls", ExcCtx)
+        with self.patcher:
+            yield
 
-        rows = r.fetchmany(3)
-        eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
+    def _test_proxy(self, cls):
+        with self._proxy_fixture(cls):
+            rows = []
+            r = self.engine.execute(select([self.table]))
+            assert isinstance(r, cls)
+            for i in range(5):
+                rows.append(r.fetchone())
+            eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
 
-        rows = r.fetchall()
-        eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
+            rows = r.fetchmany(3)
+            eq_(rows, [(i, "t_%d" % i) for i in range(6, 9)])
 
-        r = self.engine.execute(select([self.table]))
-        rows = r.fetchmany(None)
-        eq_(rows[0], (1, "t_1"))
-        # number of rows here could be one, or the whole thing
-        assert len(rows) == 1 or len(rows) == 11
+            rows = r.fetchall()
+            eq_(rows, [(i, "t_%d" % i) for i in range(9, 12)])
 
-        r = self.engine.execute(select([self.table]).limit(1))
-        r.fetchone()
-        eq_(r.fetchone(), None)
+            r = self.engine.execute(select([self.table]))
+            rows = r.fetchmany(None)
+            eq_(rows[0], (1, "t_1"))
+            # number of rows here could be one, or the whole thing
+            assert len(rows) == 1 or len(rows) == 11
 
-        r = self.engine.execute(select([self.table]).limit(5))
-        rows = r.fetchmany(6)
-        eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
+            r = self.engine.execute(select([self.table]).limit(1))
+            r.fetchone()
+            eq_(r.fetchone(), None)
 
-        # result keeps going just fine with blank results...
-        eq_(r.fetchmany(2), [])
+            r = self.engine.execute(select([self.table]).limit(5))
+            rows = r.fetchmany(6)
+            eq_(rows, [(i, "t_%d" % i) for i in range(1, 6)])
 
-        eq_(r.fetchmany(2), [])
+            # result keeps going just fine with blank results...
+            eq_(r.fetchmany(2), [])
 
-        eq_(r.fetchall(), [])
+            eq_(r.fetchmany(2), [])
 
-        eq_(r.fetchone(), None)
+            eq_(r.fetchall(), [])
 
-        # until we close
-        r.close()
+            eq_(r.fetchone(), None)
 
-        self._assert_result_closed(r)
+            # until we close
+            r.close()
 
-        r = self.engine.execute(select([self.table]).limit(5))
-        eq_(r.first(), (1, "t_1"))
-        self._assert_result_closed(r)
+            self._assert_result_closed(r)
 
-        r = self.engine.execute(select([self.table]).limit(5))
-        eq_(r.scalar(), 1)
-        self._assert_result_closed(r)
+            r = self.engine.execute(select([self.table]).limit(5))
+            eq_(r.first(), (1, "t_1"))
+            self._assert_result_closed(r)
+
+            r = self.engine.execute(select([self.table]).limit(5))
+            eq_(r.scalar(), 1)
+            self._assert_result_closed(r)
 
     def _assert_result_closed(self, r):
         assert_raises_message(
@@ -1149,6 +1164,54 @@ class AlternateResultProxyTest(fixtures.TestBase):
     def test_buffered_column_result_proxy(self):
         self._test_proxy(_result.BufferedColumnResultProxy)
 
+    def test_buffered_row_growth(self):
+        with self._proxy_fixture(_result.BufferedRowResultProxy):
+            with self.engine.connect() as conn:
+                conn.execute(self.table.insert(), [
+                    {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
+                ])
+                result = conn.execute(self.table.select())
+                checks = {
+                    0: 5, 1: 10, 9: 20, 135: 250, 274: 500,
+                    1351: 1000
+                }
+                for idx, row in enumerate(result, 0):
+                    if idx in checks:
+                        eq_(result._bufsize, checks[idx])
+                    le_(
+                        len(result._BufferedRowResultProxy__rowbuffer),
+                        1000
+                    )
+
+    def test_max_row_buffer_option(self):
+        with self._proxy_fixture(_result.BufferedRowResultProxy):
+            with self.engine.connect() as conn:
+                conn.execute(self.table.insert(), [
+                    {'x': i, 'y': "t_%d" % i} for i in range(15, 1200)
+                ])
+                result = conn.execute(self.table.select())
+                checks = {
+                    0: 5, 1: 10, 9: 20,
+                }
+                for idx, row in enumerate(result, 0):
+                    if idx in checks:
+                        eq_(result._bufsize, checks[idx])
+                    le_(
+                        len(result._BufferedRowResultProxy__rowbuffer),
+                        1000
+                    )
+
+                result = conn.execution_options(max_row_buffer=27).execute(
+                    self.table.select()
+                )
+                for idx, row in enumerate(result, 0):
+                    if idx in (16, 70, 150, 250):
+                        eq_(result._bufsize, 27)
+                    le_(
+                        len(result._BufferedRowResultProxy__rowbuffer),
+                        27
+                    )
+
 
 class EngineEventsTest(fixtures.TestBase):
     __requires__ = 'ad_hoc_engines',
index 41c0e2a210731811addd0584bd6ae856434629ab..62c97ec90d0a7102624f88ad7167e7080fe7c98f 100644 (file)
@@ -2675,10 +2675,12 @@ class YieldTest(_fixtures.FixtureTest):
         User = self.classes.User
 
         sess = create_session()
-        q = sess.query(User).yield_per(1)
+        q = sess.query(User).yield_per(15)
         q = q.execution_options(foo='bar')
         assert q._yield_per
-        eq_(q._execution_options, {"stream_results": True, "foo": "bar"})
+        eq_(
+            q._execution_options,
+            {"stream_results": True, "foo": "bar", "max_row_buffer": 15})
 
     def test_no_joinedload_opt(self):
         self._eagerload_mappings()