]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Add new executemany_mode, support for psycopg2.extras.execute_values()
authorYuval Dinari <>
Mon, 12 Aug 2019 14:44:59 +0000 (10:44 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 12 Aug 2019 21:25:59 +0000 (17:25 -0400)
Added new dialect flag for the psycopg2 dialect, ``executemany_mode`` which
supersedes the previous experimental ``use_batch_mode`` flag.
``executemany_mode`` supports both the "execute batch" and "execute values"
functions provided by psycopg2, the latter which is used for compiled
:func:`.insert` constructs.   Pull request courtesy Yuval Dinari.

.. seealso::

    :ref:`executemany_mode`

Fixes: #4623
Closes: #4764
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/4764
Pull-request-sha: c3d3a36f7eb66c86d14ed9c1c31b4b48bd204855

Change-Id: I77e26ca729f9317af1488a6c054c23fa1a6b622b
(cherry picked from commit 65f8edd45816f91688220b68cc0563797c3dc4ba)

doc/build/changelog/unreleased_13/4623.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/postgresql/psycopg2.py
test/dialect/postgresql/test_dialect.py

diff --git a/doc/build/changelog/unreleased_13/4623.rst b/doc/build/changelog/unreleased_13/4623.rst
new file mode 100644 (file)
index 0000000..34ea80e
--- /dev/null
@@ -0,0 +1,16 @@
+.. change::
+    :tags: usecase, postgresql
+    :tickets: 4623
+
+    Added new dialect flag for the psycopg2 dialect, ``executemany_mode`` which
+    supersedes the previous experimental ``use_batch_mode`` flag.
+    ``executemany_mode`` supports both the "execute batch" and "execute values"
+    functions provided by psycopg2, the latter which is used for compiled
+    :func:`.insert` constructs.   Pull request courtesy Yuval Dinari.
+
+    .. seealso::
+
+        :ref:`psycopg2_executemany_mode`
+
+
+
index cd875e71c89a5cca5f9782c87e6525577ceea9e9..26014dadce3e5b6e62665f123946ec24123637af 100644 (file)
@@ -52,15 +52,18 @@ psycopg2-specific keyword arguments which are accepted by
 
     :ref:`psycopg2_unicode`
 
-* ``use_batch_mode``: This flag allows ``psycopg2.extras.execute_batch``
-  for ``cursor.executemany()`` calls performed by the :class:`.Engine`.
-  It is currently experimental but
-  may well become True by default as it is critical for executemany
-  performance.
+* ``executemany_mode``, ``executemany_batch_page_size``,
+  ``executemany_values_page_size``: Allows use of psycopg2
+  extensions for optimizing "executemany"-stye queries.  See the referenced
+  section below for details.
 
   .. seealso::
 
-    :ref:`psycopg2_batch_mode`
+    :ref:`psycopg2_executemany_mode`
+
+* ``use_batch_mode``: this is the previous setting used to affect "executemany"
+  mode and is now deprecated.
+
 
 Unix Domain Connections
 ------------------------
@@ -136,34 +139,83 @@ The following DBAPI-specific options are respected when used with
 
   .. versionadded:: 1.0.6
 
-.. _psycopg2_batch_mode:
+.. _psycopg2_executemany_mode:
 
-Psycopg2 Batch Mode (Fast Execution)
-------------------------------------
+Psycopg2 Fast Execution Helpers
+-------------------------------
 
 Modern versions of psycopg2 include a feature known as
 `Fast Execution Helpers \
-<http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_,
-which have been shown in benchmarking to improve psycopg2's executemany()
-performance with INSERTS by multiple orders of magnitude.   SQLAlchemy
-allows this extension to be used for all ``executemany()`` style calls
-invoked by an :class:`.Engine` when used with :ref:`multiple parameter sets <execute_multiple>`,
-by adding the ``use_batch_mode`` flag to :func:`.create_engine`::
+<http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which
+have been shown in benchmarking to improve psycopg2's executemany()
+performance, primarily with INSERT statements, by multiple orders of magnitude.
+SQLAlchemy allows this extension to be used for all ``executemany()`` style
+calls invoked by an :class:`.Engine` when used with :ref:`multiple parameter
+sets <execute_multiple>`, which includes the use of this feature both by the
+Core as well as by the ORM for inserts of objects with non-autogenerated
+primary key values, by adding the ``executemany_mode`` flag to
+:func:`.create_engine`::
 
     engine = create_engine(
         "postgresql+psycopg2://scott:tiger@host/dbname",
-        use_batch_mode=True)
+        executemany_mode='batch')
+
+
+.. versionchanged:: 1.3.7  - the ``use_batch_mode`` flag has been superseded
+   by a new parameter ``executemany_mode`` which provides support both for
+   psycopg2's ``execute_batch`` helper as well as the ``execute_values``
+   helper.
+
+Possible options for ``executemany_mode`` include:
+
+* ``None`` - By default, psycopg2's extensions are not used, and the usual
+  ``cursor.executemany()`` method is used when invoking batches of statements.
+
+* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` so that multiple copies
+  of a SQL query, each one corresponding to a parameter set passed to
+  ``executemany()``, are joined into a single SQL string separated by a
+  semicolon.   This is the same behavior as was provided by the
+  ``use_batch_mode=True`` flag.
+
+* ``'values'``- For Core :func:`.insert` constructs only (including those
+  emitted by the ORM automatically), the ``psycopg2.extras.execute_values``
+  extension is used so that multiple parameter sets are grouped into a single
+  INSERT statement and joined together with multiple VALUES expressions.   This
+  method requires that the string text of the VALUES clause inside the
+  INSERT statement is manipulated, so is only supported with a compiled
+  :func:`.insert` construct where the format is predictable.  For all other
+  constructs,  including plain textual INSERT statements not rendered  by the
+  SQLAlchemy expression language compiler, the
+  ``psycopg2.extras.execute_batch``  method is used.   It is therefore important
+  to note that **"values" mode implies that "batch" mode is also used for
+  all statements for which "values" mode does not apply**.
+
+For both strategies, the ``executemany_batch_page_size`` and
+``executemany_values_page_size`` arguments control how many parameter sets
+should be represented in each execution.  Because "values" mode implies a
+fallback down to "batch" mode for non-INSERT statements, there are two
+independent page size arguments.  For each, the default value of ``None`` means
+to use psycopg2's defaults, which at the time of this writing are quite low at
+100.   For the ``execute_values`` method, a number as high as 10000 may prove
+to be performant, whereas for ``execute_batch``, as the number represents
+full statements repeated, a number closer to the default of 100 is likely
+more appropriate::
 
-Batch mode is considered to be **experimental** at this time, however may
-be enabled by default in a future release.
+    engine = create_engine(
+        "postgresql+psycopg2://scott:tiger@host/dbname",
+        executemany_mode='values',
+        executemany_values_page_size=10000, executemany_batch_page_size=500)
 
-.. seealso::
 
-    :ref:`execute_multiple` - demonstrates how to use DBAPI ``executemany()``
-    with the :class:`.Connection` object.
+.. seealso::
 
-.. versionadded:: 1.2.0
+    :ref:`execute_multiple` - General information on using the
+    :class:`.Connection` object to execute statements in such a way as to make
+    use of the DBAPI ``.executemany()`` method.
 
+.. versionchanged:: 1.3.7 - Added support for
+   ``psycopg2.extras.execute_values``.   The ``use_batch_mode`` flag is
+   superseded by the ``executemany_mode`` flag.
 
 
 .. _psycopg2_unicode:
@@ -558,6 +610,11 @@ class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer):
     pass
 
 
+EXECUTEMANY_DEFAULT = util.symbol("executemany_default")
+EXECUTEMANY_BATCH = util.symbol("executemany_batch")
+EXECUTEMANY_VALUES = util.symbol("executemany_values")
+
+
 class PGDialect_psycopg2(PGDialect):
     driver = "psycopg2"
     if util.py2k:
@@ -603,6 +660,13 @@ class PGDialect_psycopg2(PGDialect):
         },
     )
 
+    @util.deprecated_params(
+        use_batch_mode=(
+            "1.3.7",
+            "The psycopg2 use_batch_mode flag is superseded by "
+            "executemany_mode='batch'",
+        )
+    )
     def __init__(
         self,
         server_side_cursors=False,
@@ -610,7 +674,10 @@ class PGDialect_psycopg2(PGDialect):
         client_encoding=None,
         use_native_hstore=True,
         use_native_uuid=True,
-        use_batch_mode=False,
+        executemany_mode=None,
+        executemany_batch_page_size=None,
+        executemany_values_page_size=None,
+        use_batch_mode=None,
         **kwargs
     ):
         PGDialect.__init__(self, **kwargs)
@@ -620,7 +687,24 @@ class PGDialect_psycopg2(PGDialect):
         self.use_native_uuid = use_native_uuid
         self.supports_unicode_binds = use_native_unicode
         self.client_encoding = client_encoding
-        self.psycopg2_batch_mode = use_batch_mode
+
+        # Parse executemany_mode argument, allowing it to be only one of the
+        # symbol names
+        self.executemany_mode = util.symbol.parse_user_argument(
+            executemany_mode,
+            {
+                EXECUTEMANY_DEFAULT: [None],
+                EXECUTEMANY_BATCH: ["batch"],
+                EXECUTEMANY_VALUES: ["values"],
+            },
+            "executemany_mode",
+        )
+        if use_batch_mode:
+            self.executemany_mode = EXECUTEMANY_BATCH
+
+        self.executemany_batch_page_size = executemany_batch_page_size
+        self.executemany_values_page_size = executemany_values_page_size
+
         if self.dbapi and hasattr(self.dbapi, "__version__"):
             m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
             if m:
@@ -645,7 +729,7 @@ class PGDialect_psycopg2(PGDialect):
         self.supports_sane_multi_rowcount = (
             self.psycopg2_version
             >= self.FEATURE_VERSION_MAP["sane_multi_rowcount"]
-            and not self.psycopg2_batch_mode
+            and self.executemany_mode is EXECUTEMANY_DEFAULT
         )
 
     @classmethod
@@ -765,12 +849,48 @@ class PGDialect_psycopg2(PGDialect):
         else:
             return None
 
+    _insert_values_match = re.compile(r".* VALUES (\(.+\))").match
+
     def do_executemany(self, cursor, statement, parameters, context=None):
-        if self.psycopg2_batch_mode:
-            extras = self._psycopg2_extras()
-            extras.execute_batch(cursor, statement, parameters)
-        else:
+        if self.executemany_mode is EXECUTEMANY_DEFAULT:
             cursor.executemany(statement, parameters)
+            return
+
+        if (
+            self.executemany_mode is EXECUTEMANY_VALUES
+            and context
+            and context.isinsert
+        ):
+            executemany_values = self._insert_values_match(statement)
+        else:
+            executemany_values = None
+
+        if executemany_values:
+            # Currently, SQLAlchemy does not pass "RETURNING" statements
+            # into executemany(), since no DBAPI has ever supported that
+            # until the introduction of psycopg2's executemany_values, so
+            # we are not yet using the fetch=True flag.
+            statement = statement.replace(executemany_values.group(1), "%s")
+            if self.executemany_values_page_size:
+                kwargs = {"page_size": self.executemany_values_page_size}
+            else:
+                kwargs = {}
+            self._psycopg2_extras().execute_values(
+                cursor,
+                statement,
+                parameters,
+                template=executemany_values.group(1),
+                **kwargs
+            )
+
+        else:
+            if self.executemany_batch_page_size:
+                kwargs = {"page_size": self.executemany_batch_page_size}
+            else:
+                kwargs = {}
+            self._psycopg2_extras().execute_batch(
+                cursor, statement, parameters, **kwargs
+            )
 
     @util.memoized_instancemethod
     def _hstore_oids(self, conn):
index 0bbfe50fb57a86e7afc0bab5075507fbb4387f8b..798831cd383a6a4824ce136fd6f0a6093d2e2b57 100644 (file)
@@ -1,5 +1,5 @@
 # coding: utf-8
-
+import contextlib
 import datetime
 import logging
 import logging.handlers
@@ -28,12 +28,16 @@ from sqlalchemy import text
 from sqlalchemy import TypeDecorator
 from sqlalchemy.dialects.postgresql import base as postgresql
 from sqlalchemy.dialects.postgresql import psycopg2 as psycopg2_dialect
+from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_BATCH
+from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_DEFAULT
+from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_VALUES
 from sqlalchemy.engine import engine_from_config
 from sqlalchemy.engine import url
 from sqlalchemy.testing import engines
 from sqlalchemy.testing import expect_deprecated
 from sqlalchemy.testing import fixtures
 from sqlalchemy.testing import is_
+from sqlalchemy.testing import mock
 from sqlalchemy.testing.assertions import assert_raises
 from sqlalchemy.testing.assertions import assert_raises_message
 from sqlalchemy.testing.assertions import AssertsCompiledSQL
@@ -41,7 +45,6 @@ from sqlalchemy.testing.assertions import AssertsExecutionResults
 from sqlalchemy.testing.assertions import eq_
 from sqlalchemy.testing.assertions import eq_regex
 from sqlalchemy.testing.assertions import ne_
-from sqlalchemy.testing.mock import Mock
 from ...engine import test_execute
 
 
@@ -50,8 +53,10 @@ class DialectTest(fixtures.TestBase):
 
     def test_version_parsing(self):
         def mock_conn(res):
-            return Mock(
-                execute=Mock(return_value=Mock(scalar=Mock(return_value=res)))
+            return mock.Mock(
+                execute=mock.Mock(
+                    return_value=mock.Mock(scalar=mock.Mock(return_value=res))
+                )
             )
 
         dialect = postgresql.dialect()
@@ -153,12 +158,14 @@ class DialectTest(fixtures.TestBase):
         eq_(cparams, {"host": "somehost", "any_random_thing": "yes"})
 
 
-class BatchInsertsTest(fixtures.TablesTest):
+class ExecuteManyMode(object):
     __only_on__ = "postgresql+psycopg2"
     __backend__ = True
 
     run_create_tables = "each"
 
+    options = None
+
     @classmethod
     def define_tables(cls, metadata):
         Table(
@@ -170,13 +177,18 @@ class BatchInsertsTest(fixtures.TablesTest):
             Column("z", Integer, server_default="5"),
         )
 
+    @contextlib.contextmanager
+    def expect_deprecated_opts(self):
+        yield
+
     def setup(self):
-        super(BatchInsertsTest, self).setup()
-        self.engine = engines.testing_engine(options={"use_batch_mode": True})
+        super(ExecuteManyMode, self).setup()
+        with self.expect_deprecated_opts():
+            self.engine = engines.testing_engine(options=self.options)
 
     def teardown(self):
         self.engine.dispose()
-        super(BatchInsertsTest, self).teardown()
+        super(ExecuteManyMode, self).teardown()
 
     def test_insert(self):
         with self.engine.connect() as conn:
@@ -194,6 +206,133 @@ class BatchInsertsTest(fixtures.TablesTest):
                 [(1, "x1", "y1", 5), (2, "x2", "y2", 5), (3, "x3", "y3", 5)],
             )
 
+    def test_insert_no_page_size(self):
+        from psycopg2 import extras
+
+        eng = self.engine
+        if eng.dialect.executemany_mode is EXECUTEMANY_BATCH:
+            meth = extras.execute_batch
+            stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
+            expected_kwargs = {}
+        else:
+            meth = extras.execute_values
+            stmt = "INSERT INTO data (x, y) VALUES %s"
+            expected_kwargs = {"template": "(%(x)s, %(y)s)"}
+
+        with mock.patch.object(
+            extras, meth.__name__, side_effect=meth
+        ) as mock_exec:
+            with eng.connect() as conn:
+                conn.execute(
+                    self.tables.data.insert(),
+                    [
+                        {"x": "x1", "y": "y1"},
+                        {"x": "x2", "y": "y2"},
+                        {"x": "x3", "y": "y3"},
+                    ],
+                )
+
+        eq_(
+            mock_exec.mock_calls,
+            [
+                mock.call(
+                    mock.ANY,
+                    stmt,
+                    (
+                        {"x": "x1", "y": "y1"},
+                        {"x": "x2", "y": "y2"},
+                        {"x": "x3", "y": "y3"},
+                    ),
+                    **expected_kwargs
+                )
+            ],
+        )
+
+    def test_insert_page_size(self):
+        opts = self.options.copy()
+        opts["executemany_batch_page_size"] = 500
+        opts["executemany_values_page_size"] = 1000
+
+        with self.expect_deprecated_opts():
+            eng = engines.testing_engine(options=opts)
+
+        from psycopg2 import extras
+
+        if eng.dialect.executemany_mode is EXECUTEMANY_BATCH:
+            meth = extras.execute_batch
+            stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
+            expected_kwargs = {"page_size": 500}
+        else:
+            meth = extras.execute_values
+            stmt = "INSERT INTO data (x, y) VALUES %s"
+            expected_kwargs = {"page_size": 1000, "template": "(%(x)s, %(y)s)"}
+
+        with mock.patch.object(
+            extras, meth.__name__, side_effect=meth
+        ) as mock_exec:
+            with eng.connect() as conn:
+                conn.execute(
+                    self.tables.data.insert(),
+                    [
+                        {"x": "x1", "y": "y1"},
+                        {"x": "x2", "y": "y2"},
+                        {"x": "x3", "y": "y3"},
+                    ],
+                )
+
+        eq_(
+            mock_exec.mock_calls,
+            [
+                mock.call(
+                    mock.ANY,
+                    stmt,
+                    (
+                        {"x": "x1", "y": "y1"},
+                        {"x": "x2", "y": "y2"},
+                        {"x": "x3", "y": "y3"},
+                    ),
+                    **expected_kwargs
+                )
+            ],
+        )
+
+    def test_update_fallback(self):
+        from psycopg2 import extras
+
+        eng = self.engine
+        meth = extras.execute_batch
+        stmt = "UPDATE data SET y=%(yval)s WHERE data.x = %(xval)s"
+        expected_kwargs = {}
+
+        with mock.patch.object(
+            extras, meth.__name__, side_effect=meth
+        ) as mock_exec:
+            with eng.connect() as conn:
+                conn.execute(
+                    self.tables.data.update()
+                    .where(self.tables.data.c.x == bindparam("xval"))
+                    .values(y=bindparam("yval")),
+                    [
+                        {"xval": "x1", "yval": "y5"},
+                        {"xval": "x3", "yval": "y6"},
+                    ],
+                )
+
+        eq_(
+            mock_exec.mock_calls,
+            [
+                mock.call(
+                    mock.ANY,
+                    stmt,
+                    (
+                        {"xval": "x1", "yval": "y5"},
+                        {"xval": "x3", "yval": "y6"},
+                    ),
+                    **expected_kwargs
+                )
+            ],
+        )
+
     def test_not_sane_rowcount(self):
         self.engine.connect().close()
         assert not self.engine.dialect.supports_sane_multi_rowcount
@@ -223,6 +362,49 @@ class BatchInsertsTest(fixtures.TablesTest):
             )
 
 
+class UseBatchModeTest(ExecuteManyMode, fixtures.TablesTest):
+    options = {"use_batch_mode": True}
+
+    def expect_deprecated_opts(self):
+        return expect_deprecated(
+            "The psycopg2 use_batch_mode flag is superseded by "
+            "executemany_mode='batch'"
+        )
+
+
+class ExecutemanyBatchModeTest(ExecuteManyMode, fixtures.TablesTest):
+    options = {"executemany_mode": "batch"}
+
+
+class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest):
+    options = {"executemany_mode": "values"}
+
+
+class ExecutemanyFlagOptionsTest(fixtures.TablesTest):
+    __only_on__ = "postgresql+psycopg2"
+    __backend__ = True
+
+    def test_executemany_correct_flag_options(self):
+        for opt, expected in [
+            (None, EXECUTEMANY_DEFAULT),
+            ("batch", EXECUTEMANY_BATCH),
+            ("values", EXECUTEMANY_VALUES),
+        ]:
+            self.engine = engines.testing_engine(
+                options={"executemany_mode": opt}
+            )
+            is_(self.engine.dialect.executemany_mode, expected)
+
+    def test_executemany_wrong_flag_options(self):
+        for opt in [1, True, "batch_insert"]:
+            assert_raises_message(
+                exc.ArgumentError,
+                "Invalid value for 'executemany_mode': %r" % opt,
+                engines.testing_engine,
+                options={"executemany_mode": opt},
+            )
+
+
 class MiscBackendTest(
     fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
 ):
@@ -503,7 +685,9 @@ $$ LANGUAGE plpgsql;
 
             if version:
                 dialect = postgresql.dialect()
-                dialect._get_server_version_info = Mock(return_value=version)
+                dialect._get_server_version_info = mock.Mock(
+                    return_value=version
+                )
                 dialect.initialize(testing.db.connect())
             else:
                 dialect = testing.db.dialect