From: Yuval Dinari <> Date: Mon, 12 Aug 2019 14:44:59 +0000 (-0400) Subject: Add new executemany_mode, support for psycopg2.extras.execute_values() X-Git-Tag: rel_1_3_7~2^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=02d2cce2d0391b2f5acf3691d00bd2d8c4a090c5;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Add new executemany_mode, support for psycopg2.extras.execute_values() Added new dialect flag for the psycopg2 dialect, ``executemany_mode`` which supersedes the previous experimental ``use_batch_mode`` flag. ``executemany_mode`` supports both the "execute batch" and "execute values" functions provided by psycopg2, the latter which is used for compiled :func:`.insert` constructs. Pull request courtesy Yuval Dinari. .. seealso:: :ref:`executemany_mode` Fixes: #4623 Closes: #4764 Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/4764 Pull-request-sha: c3d3a36f7eb66c86d14ed9c1c31b4b48bd204855 Change-Id: I77e26ca729f9317af1488a6c054c23fa1a6b622b (cherry picked from commit 65f8edd45816f91688220b68cc0563797c3dc4ba) --- diff --git a/doc/build/changelog/unreleased_13/4623.rst b/doc/build/changelog/unreleased_13/4623.rst new file mode 100644 index 0000000000..34ea80e372 --- /dev/null +++ b/doc/build/changelog/unreleased_13/4623.rst @@ -0,0 +1,16 @@ +.. change:: + :tags: usecase, postgresql + :tickets: 4623 + + Added new dialect flag for the psycopg2 dialect, ``executemany_mode`` which + supersedes the previous experimental ``use_batch_mode`` flag. + ``executemany_mode`` supports both the "execute batch" and "execute values" + functions provided by psycopg2, the latter which is used for compiled + :func:`.insert` constructs. Pull request courtesy Yuval Dinari. + + .. seealso:: + + :ref:`psycopg2_executemany_mode` + + + diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index cd875e71c8..26014dadce 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -52,15 +52,18 @@ psycopg2-specific keyword arguments which are accepted by :ref:`psycopg2_unicode` -* ``use_batch_mode``: This flag allows ``psycopg2.extras.execute_batch`` - for ``cursor.executemany()`` calls performed by the :class:`.Engine`. - It is currently experimental but - may well become True by default as it is critical for executemany - performance. +* ``executemany_mode``, ``executemany_batch_page_size``, + ``executemany_values_page_size``: Allows use of psycopg2 + extensions for optimizing "executemany"-stye queries. See the referenced + section below for details. .. seealso:: - :ref:`psycopg2_batch_mode` + :ref:`psycopg2_executemany_mode` + +* ``use_batch_mode``: this is the previous setting used to affect "executemany" + mode and is now deprecated. + Unix Domain Connections ------------------------ @@ -136,34 +139,83 @@ The following DBAPI-specific options are respected when used with .. versionadded:: 1.0.6 -.. _psycopg2_batch_mode: +.. _psycopg2_executemany_mode: -Psycopg2 Batch Mode (Fast Execution) ------------------------------------- +Psycopg2 Fast Execution Helpers +------------------------------- Modern versions of psycopg2 include a feature known as `Fast Execution Helpers \ -`_, -which have been shown in benchmarking to improve psycopg2's executemany() -performance with INSERTS by multiple orders of magnitude. SQLAlchemy -allows this extension to be used for all ``executemany()`` style calls -invoked by an :class:`.Engine` when used with :ref:`multiple parameter sets `, -by adding the ``use_batch_mode`` flag to :func:`.create_engine`:: +`_, which +have been shown in benchmarking to improve psycopg2's executemany() +performance, primarily with INSERT statements, by multiple orders of magnitude. +SQLAlchemy allows this extension to be used for all ``executemany()`` style +calls invoked by an :class:`.Engine` when used with :ref:`multiple parameter +sets `, which includes the use of this feature both by the +Core as well as by the ORM for inserts of objects with non-autogenerated +primary key values, by adding the ``executemany_mode`` flag to +:func:`.create_engine`:: engine = create_engine( "postgresql+psycopg2://scott:tiger@host/dbname", - use_batch_mode=True) + executemany_mode='batch') + + +.. versionchanged:: 1.3.7 - the ``use_batch_mode`` flag has been superseded + by a new parameter ``executemany_mode`` which provides support both for + psycopg2's ``execute_batch`` helper as well as the ``execute_values`` + helper. + +Possible options for ``executemany_mode`` include: + +* ``None`` - By default, psycopg2's extensions are not used, and the usual + ``cursor.executemany()`` method is used when invoking batches of statements. + +* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` so that multiple copies + of a SQL query, each one corresponding to a parameter set passed to + ``executemany()``, are joined into a single SQL string separated by a + semicolon. This is the same behavior as was provided by the + ``use_batch_mode=True`` flag. + +* ``'values'``- For Core :func:`.insert` constructs only (including those + emitted by the ORM automatically), the ``psycopg2.extras.execute_values`` + extension is used so that multiple parameter sets are grouped into a single + INSERT statement and joined together with multiple VALUES expressions. This + method requires that the string text of the VALUES clause inside the + INSERT statement is manipulated, so is only supported with a compiled + :func:`.insert` construct where the format is predictable. For all other + constructs, including plain textual INSERT statements not rendered by the + SQLAlchemy expression language compiler, the + ``psycopg2.extras.execute_batch`` method is used. It is therefore important + to note that **"values" mode implies that "batch" mode is also used for + all statements for which "values" mode does not apply**. + +For both strategies, the ``executemany_batch_page_size`` and +``executemany_values_page_size`` arguments control how many parameter sets +should be represented in each execution. Because "values" mode implies a +fallback down to "batch" mode for non-INSERT statements, there are two +independent page size arguments. For each, the default value of ``None`` means +to use psycopg2's defaults, which at the time of this writing are quite low at +100. For the ``execute_values`` method, a number as high as 10000 may prove +to be performant, whereas for ``execute_batch``, as the number represents +full statements repeated, a number closer to the default of 100 is likely +more appropriate:: -Batch mode is considered to be **experimental** at this time, however may -be enabled by default in a future release. + engine = create_engine( + "postgresql+psycopg2://scott:tiger@host/dbname", + executemany_mode='values', + executemany_values_page_size=10000, executemany_batch_page_size=500) -.. seealso:: - :ref:`execute_multiple` - demonstrates how to use DBAPI ``executemany()`` - with the :class:`.Connection` object. +.. seealso:: -.. versionadded:: 1.2.0 + :ref:`execute_multiple` - General information on using the + :class:`.Connection` object to execute statements in such a way as to make + use of the DBAPI ``.executemany()`` method. +.. versionchanged:: 1.3.7 - Added support for + ``psycopg2.extras.execute_values``. The ``use_batch_mode`` flag is + superseded by the ``executemany_mode`` flag. .. _psycopg2_unicode: @@ -558,6 +610,11 @@ class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer): pass +EXECUTEMANY_DEFAULT = util.symbol("executemany_default") +EXECUTEMANY_BATCH = util.symbol("executemany_batch") +EXECUTEMANY_VALUES = util.symbol("executemany_values") + + class PGDialect_psycopg2(PGDialect): driver = "psycopg2" if util.py2k: @@ -603,6 +660,13 @@ class PGDialect_psycopg2(PGDialect): }, ) + @util.deprecated_params( + use_batch_mode=( + "1.3.7", + "The psycopg2 use_batch_mode flag is superseded by " + "executemany_mode='batch'", + ) + ) def __init__( self, server_side_cursors=False, @@ -610,7 +674,10 @@ class PGDialect_psycopg2(PGDialect): client_encoding=None, use_native_hstore=True, use_native_uuid=True, - use_batch_mode=False, + executemany_mode=None, + executemany_batch_page_size=None, + executemany_values_page_size=None, + use_batch_mode=None, **kwargs ): PGDialect.__init__(self, **kwargs) @@ -620,7 +687,24 @@ class PGDialect_psycopg2(PGDialect): self.use_native_uuid = use_native_uuid self.supports_unicode_binds = use_native_unicode self.client_encoding = client_encoding - self.psycopg2_batch_mode = use_batch_mode + + # Parse executemany_mode argument, allowing it to be only one of the + # symbol names + self.executemany_mode = util.symbol.parse_user_argument( + executemany_mode, + { + EXECUTEMANY_DEFAULT: [None], + EXECUTEMANY_BATCH: ["batch"], + EXECUTEMANY_VALUES: ["values"], + }, + "executemany_mode", + ) + if use_batch_mode: + self.executemany_mode = EXECUTEMANY_BATCH + + self.executemany_batch_page_size = executemany_batch_page_size + self.executemany_values_page_size = executemany_values_page_size + if self.dbapi and hasattr(self.dbapi, "__version__"): m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__) if m: @@ -645,7 +729,7 @@ class PGDialect_psycopg2(PGDialect): self.supports_sane_multi_rowcount = ( self.psycopg2_version >= self.FEATURE_VERSION_MAP["sane_multi_rowcount"] - and not self.psycopg2_batch_mode + and self.executemany_mode is EXECUTEMANY_DEFAULT ) @classmethod @@ -765,12 +849,48 @@ class PGDialect_psycopg2(PGDialect): else: return None + _insert_values_match = re.compile(r".* VALUES (\(.+\))").match + def do_executemany(self, cursor, statement, parameters, context=None): - if self.psycopg2_batch_mode: - extras = self._psycopg2_extras() - extras.execute_batch(cursor, statement, parameters) - else: + if self.executemany_mode is EXECUTEMANY_DEFAULT: cursor.executemany(statement, parameters) + return + + if ( + self.executemany_mode is EXECUTEMANY_VALUES + and context + and context.isinsert + ): + executemany_values = self._insert_values_match(statement) + else: + executemany_values = None + + if executemany_values: + # Currently, SQLAlchemy does not pass "RETURNING" statements + # into executemany(), since no DBAPI has ever supported that + # until the introduction of psycopg2's executemany_values, so + # we are not yet using the fetch=True flag. + statement = statement.replace(executemany_values.group(1), "%s") + if self.executemany_values_page_size: + kwargs = {"page_size": self.executemany_values_page_size} + else: + kwargs = {} + self._psycopg2_extras().execute_values( + cursor, + statement, + parameters, + template=executemany_values.group(1), + **kwargs + ) + + else: + if self.executemany_batch_page_size: + kwargs = {"page_size": self.executemany_batch_page_size} + else: + kwargs = {} + self._psycopg2_extras().execute_batch( + cursor, statement, parameters, **kwargs + ) @util.memoized_instancemethod def _hstore_oids(self, conn): diff --git a/test/dialect/postgresql/test_dialect.py b/test/dialect/postgresql/test_dialect.py index 0bbfe50fb5..798831cd38 100644 --- a/test/dialect/postgresql/test_dialect.py +++ b/test/dialect/postgresql/test_dialect.py @@ -1,5 +1,5 @@ # coding: utf-8 - +import contextlib import datetime import logging import logging.handlers @@ -28,12 +28,16 @@ from sqlalchemy import text from sqlalchemy import TypeDecorator from sqlalchemy.dialects.postgresql import base as postgresql from sqlalchemy.dialects.postgresql import psycopg2 as psycopg2_dialect +from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_BATCH +from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_DEFAULT +from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_VALUES from sqlalchemy.engine import engine_from_config from sqlalchemy.engine import url from sqlalchemy.testing import engines from sqlalchemy.testing import expect_deprecated from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ +from sqlalchemy.testing import mock from sqlalchemy.testing.assertions import assert_raises from sqlalchemy.testing.assertions import assert_raises_message from sqlalchemy.testing.assertions import AssertsCompiledSQL @@ -41,7 +45,6 @@ from sqlalchemy.testing.assertions import AssertsExecutionResults from sqlalchemy.testing.assertions import eq_ from sqlalchemy.testing.assertions import eq_regex from sqlalchemy.testing.assertions import ne_ -from sqlalchemy.testing.mock import Mock from ...engine import test_execute @@ -50,8 +53,10 @@ class DialectTest(fixtures.TestBase): def test_version_parsing(self): def mock_conn(res): - return Mock( - execute=Mock(return_value=Mock(scalar=Mock(return_value=res))) + return mock.Mock( + execute=mock.Mock( + return_value=mock.Mock(scalar=mock.Mock(return_value=res)) + ) ) dialect = postgresql.dialect() @@ -153,12 +158,14 @@ class DialectTest(fixtures.TestBase): eq_(cparams, {"host": "somehost", "any_random_thing": "yes"}) -class BatchInsertsTest(fixtures.TablesTest): +class ExecuteManyMode(object): __only_on__ = "postgresql+psycopg2" __backend__ = True run_create_tables = "each" + options = None + @classmethod def define_tables(cls, metadata): Table( @@ -170,13 +177,18 @@ class BatchInsertsTest(fixtures.TablesTest): Column("z", Integer, server_default="5"), ) + @contextlib.contextmanager + def expect_deprecated_opts(self): + yield + def setup(self): - super(BatchInsertsTest, self).setup() - self.engine = engines.testing_engine(options={"use_batch_mode": True}) + super(ExecuteManyMode, self).setup() + with self.expect_deprecated_opts(): + self.engine = engines.testing_engine(options=self.options) def teardown(self): self.engine.dispose() - super(BatchInsertsTest, self).teardown() + super(ExecuteManyMode, self).teardown() def test_insert(self): with self.engine.connect() as conn: @@ -194,6 +206,133 @@ class BatchInsertsTest(fixtures.TablesTest): [(1, "x1", "y1", 5), (2, "x2", "y2", 5), (3, "x3", "y3", 5)], ) + def test_insert_no_page_size(self): + from psycopg2 import extras + + eng = self.engine + if eng.dialect.executemany_mode is EXECUTEMANY_BATCH: + meth = extras.execute_batch + stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)" + expected_kwargs = {} + else: + meth = extras.execute_values + stmt = "INSERT INTO data (x, y) VALUES %s" + expected_kwargs = {"template": "(%(x)s, %(y)s)"} + + with mock.patch.object( + extras, meth.__name__, side_effect=meth + ) as mock_exec: + with eng.connect() as conn: + conn.execute( + self.tables.data.insert(), + [ + {"x": "x1", "y": "y1"}, + {"x": "x2", "y": "y2"}, + {"x": "x3", "y": "y3"}, + ], + ) + + eq_( + mock_exec.mock_calls, + [ + mock.call( + mock.ANY, + stmt, + ( + {"x": "x1", "y": "y1"}, + {"x": "x2", "y": "y2"}, + {"x": "x3", "y": "y3"}, + ), + **expected_kwargs + ) + ], + ) + + def test_insert_page_size(self): + opts = self.options.copy() + opts["executemany_batch_page_size"] = 500 + opts["executemany_values_page_size"] = 1000 + + with self.expect_deprecated_opts(): + eng = engines.testing_engine(options=opts) + + from psycopg2 import extras + + if eng.dialect.executemany_mode is EXECUTEMANY_BATCH: + meth = extras.execute_batch + stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)" + expected_kwargs = {"page_size": 500} + else: + meth = extras.execute_values + stmt = "INSERT INTO data (x, y) VALUES %s" + expected_kwargs = {"page_size": 1000, "template": "(%(x)s, %(y)s)"} + + with mock.patch.object( + extras, meth.__name__, side_effect=meth + ) as mock_exec: + with eng.connect() as conn: + conn.execute( + self.tables.data.insert(), + [ + {"x": "x1", "y": "y1"}, + {"x": "x2", "y": "y2"}, + {"x": "x3", "y": "y3"}, + ], + ) + + eq_( + mock_exec.mock_calls, + [ + mock.call( + mock.ANY, + stmt, + ( + {"x": "x1", "y": "y1"}, + {"x": "x2", "y": "y2"}, + {"x": "x3", "y": "y3"}, + ), + **expected_kwargs + ) + ], + ) + + def test_update_fallback(self): + from psycopg2 import extras + + eng = self.engine + meth = extras.execute_batch + stmt = "UPDATE data SET y=%(yval)s WHERE data.x = %(xval)s" + expected_kwargs = {} + + with mock.patch.object( + extras, meth.__name__, side_effect=meth + ) as mock_exec: + with eng.connect() as conn: + conn.execute( + self.tables.data.update() + .where(self.tables.data.c.x == bindparam("xval")) + .values(y=bindparam("yval")), + [ + {"xval": "x1", "yval": "y5"}, + {"xval": "x3", "yval": "y6"}, + ], + ) + + eq_( + mock_exec.mock_calls, + [ + mock.call( + mock.ANY, + stmt, + ( + {"xval": "x1", "yval": "y5"}, + {"xval": "x3", "yval": "y6"}, + ), + **expected_kwargs + ) + ], + ) + def test_not_sane_rowcount(self): self.engine.connect().close() assert not self.engine.dialect.supports_sane_multi_rowcount @@ -223,6 +362,49 @@ class BatchInsertsTest(fixtures.TablesTest): ) +class UseBatchModeTest(ExecuteManyMode, fixtures.TablesTest): + options = {"use_batch_mode": True} + + def expect_deprecated_opts(self): + return expect_deprecated( + "The psycopg2 use_batch_mode flag is superseded by " + "executemany_mode='batch'" + ) + + +class ExecutemanyBatchModeTest(ExecuteManyMode, fixtures.TablesTest): + options = {"executemany_mode": "batch"} + + +class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest): + options = {"executemany_mode": "values"} + + +class ExecutemanyFlagOptionsTest(fixtures.TablesTest): + __only_on__ = "postgresql+psycopg2" + __backend__ = True + + def test_executemany_correct_flag_options(self): + for opt, expected in [ + (None, EXECUTEMANY_DEFAULT), + ("batch", EXECUTEMANY_BATCH), + ("values", EXECUTEMANY_VALUES), + ]: + self.engine = engines.testing_engine( + options={"executemany_mode": opt} + ) + is_(self.engine.dialect.executemany_mode, expected) + + def test_executemany_wrong_flag_options(self): + for opt in [1, True, "batch_insert"]: + assert_raises_message( + exc.ArgumentError, + "Invalid value for 'executemany_mode': %r" % opt, + engines.testing_engine, + options={"executemany_mode": opt}, + ) + + class MiscBackendTest( fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL ): @@ -503,7 +685,9 @@ $$ LANGUAGE plpgsql; if version: dialect = postgresql.dialect() - dialect._get_server_version_info = Mock(return_value=version) + dialect._get_server_version_info = mock.Mock( + return_value=version + ) dialect.initialize(testing.db.connect()) else: dialect = testing.db.dialect