From: sumau
Date: Mon, 28 Oct 2019 19:22:08 +0000 (-0400)
Subject: Use simple growth scale with any max size for BufferedRowResultProxy
X-Git-Tag: rel_1_4_0b1~647^2
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=d36b1f7f03841b9b346a6fd3395dd29333dce588;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git

Use simple growth scale with any max size for BufferedRowResultProxy

The maximum buffer size for the :class:`.BufferedRowResultProxy`, which
is used by dialects such as PostgreSQL when ``stream_results=True``, can
now be set to a number greater than 1000 and the buffer will grow to
that size. Previously, the buffer would not go beyond 1000 even if the
value were set larger. The growth of the buffer is also now based on a
simple multiplying factor, currently set to 5. Pull request courtesy
Soumaya Mauthoor.

Fixes: #4914
Closes: #4930
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/4930
Pull-request-sha: 66841f56e967c784f7078a787cec5129462006c8

Change-Id: I6286220bd9d488027fadc444039421a410e19a19
---

diff --git a/doc/build/changelog/unreleased_14/4914.rst b/doc/build/changelog/unreleased_14/4914.rst
new file mode 100644
index 0000000000..49ad919681
--- /dev/null
+++ b/doc/build/changelog/unreleased_14/4914.rst
@@ -0,0 +1,12 @@
+.. change::
+    :tags: usecase, postgresql
+    :tickets: 4914
+
+    The maximum buffer size for the :class:`.BufferedRowResultProxy`, which
+    is used by dialects such as PostgreSQL when ``stream_results=True``, can
+    now be set to a number greater than 1000 and the buffer will grow to
+    that size. Previously, the buffer would not go beyond 1000 even if the
+    value were set larger. The growth of the buffer is also now based on a
+    simple multiplying factor, currently set to 5. Pull request courtesy
+    Soumaya Mauthoor.
+
diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
index 1a4db1108d..14d49ee152 100644
--- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py
+++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py
@@ -137,7 +137,8 @@ The following DBAPI-specific options are respected when used with
   interpreted by the :class:`.BufferedRowResultProxy`, and if omitted the
   buffer will grow to ultimately store 1000 rows at a time.
 
-  .. versionadded:: 1.0.6
+  .. versionchanged:: 1.4  The ``max_row_buffer`` size can now be greater
+     than 1000, and the buffer will grow to that size.
 
 .. _psycopg2_executemany_mode:
 
diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py
index 733bd6f6ab..004a84da55 100644
--- a/lib/sqlalchemy/engine/result.py
+++ b/lib/sqlalchemy/engine/result.py
@@ -1486,10 +1486,8 @@ class BufferedRowResultProxy(ResultProxy):
 
     The pre-fetching behavior fetches only one row initially, and then
     grows its buffer size by a fixed amount with each successive need
-    for additional rows up to a size of 1000.
-
-    The size argument is configurable using the ``max_row_buffer``
-    execution option::
+    for additional rows up to the ``max_row_buffer`` size, which defaults
+    to 1000::
 
         with psycopg2_engine.connect() as conn:
 
@@ -1497,7 +1495,7 @@ class BufferedRowResultProxy(ResultProxy):
                 stream_results=True, max_row_buffer=50
             ).execute("select * from table")
 
-    .. versionadded:: 1.0.6 Added the ``max_row_buffer`` option.
+    .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows.
 
     .. seealso::
 
@@ -1506,34 +1504,21 @@ class BufferedRowResultProxy(ResultProxy):
 
     def _init_metadata(self):
         self._max_row_buffer = self.context.execution_options.get(
-            "max_row_buffer", None
+            "max_row_buffer", 1000
         )
+        self._growth_factor = 5
         self.__buffer_rows()
         super(BufferedRowResultProxy, self)._init_metadata()
 
-    # this is a "growth chart" for the buffering of rows.
-    # each successive __buffer_rows call will use the next
-    # value in the list for the buffer size until the max
-    # is reached
-    size_growth = {
-        1: 5,
-        5: 10,
-        10: 20,
-        20: 50,
-        50: 100,
-        100: 250,
-        250: 500,
-        500: 1000,
-    }
-
     def __buffer_rows(self):
         if self.cursor is None:
             return
         size = getattr(self, "_bufsize", 1)
         self.__rowbuffer = collections.deque(self.cursor.fetchmany(size))
-        self._bufsize = self.size_growth.get(size, size)
-        if self._max_row_buffer is not None:
-            self._bufsize = min(self._max_row_buffer, self._bufsize)
+        if size < self._max_row_buffer:
+            self._bufsize = min(
+                self._max_row_buffer, size * self._growth_factor
+            )
 
     def _soft_close(self, **kw):
         self.__rowbuffer.clear()
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py
index 2563c7d0c0..794508a329 100644
--- a/test/sql/test_resultset.py
+++ b/test/sql/test_resultset.py
@@ -1942,31 +1942,47 @@ class AlternateResultProxyTest(fixtures.TablesTest):
             r = conn.execute(stmt)
             eq_(r.scalar(), "HI THERE")
 
-    def test_buffered_row_growth(self):
+    @testing.fixture
+    def row_growth_fixture(self):
         with self._proxy_fixture(_result.BufferedRowResultProxy):
             with self.engine.connect() as conn:
                 conn.execute(
                     self.table.insert(),
-                    [{"x": i, "y": "t_%d" % i} for i in range(15, 1200)],
+                    [{"x": i, "y": "t_%d" % i} for i in range(15, 3000)],
                 )
-                result = conn.execute(self.table.select())
-                checks = {0: 5, 1: 10, 9: 20, 135: 250, 274: 500, 1351: 1000}
-                for idx, row in enumerate(result, 0):
-                    if idx in checks:
-                        eq_(result._bufsize, checks[idx])
-                    le_(len(result._BufferedRowResultProxy__rowbuffer), 1000)
-
-    def test_max_row_buffer_option(self):
-        with self._proxy_fixture(_result.BufferedRowResultProxy):
-            with self.engine.connect() as conn:
-                conn.execute(
-                    self.table.insert(),
-                    [{"x": i, "y": "t_%d" % i} for i in range(15, 1200)],
-                )
-                result = conn.execution_options(max_row_buffer=27).execute(
-                    self.table.select()
-                )
-                for idx, row in enumerate(result, 0):
-                    if idx in (16, 70, 150, 250):
-                        eq_(result._bufsize, 27)
-                    le_(len(result._BufferedRowResultProxy__rowbuffer), 27)
+                yield conn
+
+    @testing.combinations(
+        ("no option", None, {0: 5, 1: 25, 9: 125, 135: 625, 274: 1000}),
+        ("lt 1000", 27, {0: 5, 16: 27, 70: 27, 150: 27, 250: 27}),
+        (
+            "gt 1000",
+            1500,
+            {0: 5, 1: 25, 9: 125, 135: 625, 274: 1500, 1351: 1500},
+        ),
+        (
+            "gt 1500",
+            2000,
+            {0: 5, 1: 25, 9: 125, 135: 625, 274: 2000, 1351: 2000},
+        ),
+        id_="iaa",
+        argnames="max_row_buffer,checks",
+    )
+    def test_buffered_row_growth(
+        self, row_growth_fixture, max_row_buffer, checks
+    ):
+        if max_row_buffer:
+            result = row_growth_fixture.execution_options(
+                max_row_buffer=max_row_buffer
+            ).execute(self.table.select())
+        else:
+            result = row_growth_fixture.execute(self.table.select())
+
+        assertion = {}
+        max_size = max(checks.values())
+        for idx, row in enumerate(result, 0):
+            if idx in checks:
+                assertion[idx] = result._bufsize
+            le_(len(result._BufferedRowResultProxy__rowbuffer), max_size)
+
+        eq_(checks, assertion)
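
For reference, the new growth policy reduces to a simple multiply-and-cap
loop. The standalone sketch below is illustrative only and not part of the
patch (the ``growth_schedule`` name is made up); it reproduces the sequence
of ``fetchmany()`` buffer sizes that the patched ``__buffer_rows()`` would
request, matching the checkpoints asserted in the new test::

    # Illustrative sketch, not part of this commit. Mirrors the patched
    # BufferedRowResultProxy schedule: the first fetch is a single row,
    # then each refill multiplies the size by the growth factor (5),
    # capped at max_row_buffer.

    def growth_schedule(max_row_buffer=1000, growth_factor=5):
        """Yield successive fetchmany() sizes after the initial 1-row fetch."""
        size = 1
        while size < max_row_buffer:
            size = min(max_row_buffer, size * growth_factor)
            yield size

    print(list(growth_schedule()))      # [5, 25, 125, 625, 1000]
    print(list(growth_schedule(27)))    # [5, 25, 27]
    print(list(growth_schedule(1500)))  # [5, 25, 125, 625, 1500]

The old ``size_growth`` table topped out at 1000 regardless of
``max_row_buffer``; with the multiplying factor, the buffer keeps growing
until it reaches whatever cap is configured.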