]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Add fast execution helper support.
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 10 Oct 2017 17:33:59 +0000 (13:33 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 10 Oct 2017 17:43:35 +0000 (13:43 -0400)
Added a new flag ``use_batch_mode`` to the psycopg2 dialect.  This flag
enables the use of psycopg2's ``psycopg2.extras.execute_batch``
extension when the :class:`.Engine` calls upon ``cursor.executemany()``.
This extension provides a critical performance increase by over an order of magnitude
when running INSERT statements in batch.  The flag is False by default
as it is considered to be experimental for now.

Change-Id: Ib88d28bc792958d47109f644ff1d08c897db4ff7
Fixes: #4109
doc/build/changelog/migration_12.rst
doc/build/faq/performance.rst
lib/sqlalchemy/dialects/postgresql/psycopg2.py
test/dialect/postgresql/test_dialect.py

index 4ded828c44dbfefd5a3260bedbb654173d0d9c0e..598435ae81f5145b9b8bb4eae327058c9e10f080 100644 (file)
@@ -1351,6 +1351,33 @@ is already applied.
 Dialect Improvements and Changes - PostgreSQL
 =============================================
 
+.. _change_4109:
+
+Support for Batch Mode / Fast Execution Helpers
+------------------------------------------------
+
+The psycopg2 ``cursor.executemany()`` method has been identified as performing
+poorly, particularly with INSERT statements.   To alleviate this, psycopg2
+has added `Fast Execution Helpers <http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_
+which rework statements such as INSERT statements into fewer SQL calls with
+multiple VALUES clauses.   SQLAlchemy 1.2 now includes support for these
+helpers to be used transparently whenever the :class:`.Engine` makes use
+of ``cursor.executemany()`` to invoke a statement against multiple parameter
+sets.   The feature is off by default and can be enabled using the
+``use_batch_mode`` argument on :func:`.create_engine`::
+
+    engine = create_engine(
+        "postgresql+psycopg2://scott:tiger@host/dbname",
+        use_batch_mode=True)
+
+The feature is considered to be experimental for the moment but may become
+on by default in a future release.
+
+.. seealso::
+
+    :ref:`psycopg2_batch_mode`
+
+
 .. _change_3959:
 
 Support for fields specification in INTERVAL, including full reflection
index d6ee557ea42a557a0a39309c8bb940ce75362f5b..fa298bdadfc3ef086115a4f7fe903c01833d8a5a 100644 (file)
@@ -298,6 +298,12 @@ SQL generation and execution system that the ORM builds on top of
 is part of the :doc:`Core <core/tutorial>`.  Using this system directly, we can produce an INSERT that
 is competitive with using the raw database API directly.
 
+.. note::
+
+    When using the psycopg2 dialect, consider making use of the
+    :ref:`batch execution helpers <psycopg2_batch_mode>` feature of psycopg2,
+    now supported directly by the SQLAlchemy psycopg2 dialect.
+
 Alternatively, the SQLAlchemy ORM offers the :ref:`bulk_operations`
 suite of methods, which provide hooks into subsections of the unit of
 work process in order to emit Core-level INSERT and UPDATE constructs with
index 3e2968d91c04cc466bc9f175fe42803bccc09c7c..8ac39c201a9e76670558c64d91a92448dd124f5f 100644 (file)
@@ -53,6 +53,15 @@ psycopg2-specific keyword arguments which are accepted by
 
     :ref:`psycopg2_unicode`
 
+* ``use_batch_mode``: This flag allows ``psycopg2.extras.execute_batch``
+  for ``cursor.executemany()`` calls performed by the :class:`.Engine`.
+  It is currently experimental but
+  may well become True by default as it is critical for executemany performance.
+
+  .. seealso::
+
+    :ref:`psycopg2_batch_mode`
+
 Unix Domain Connections
 ------------------------
 
@@ -101,6 +110,31 @@ The following DBAPI-specific options are respected when used with
 
   .. versionadded:: 1.0.6
 
+.. _psycopg2_batch_mode:
+
+Psycopg2 Batch Mode (Fast Execution)
+------------------------------------
+
+Modern versions of psycopg2 include a feature known as
+`Fast Execution Helpers <http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_,
+which have been shown in benchmarking to improve psycopg2's executemany()
+performance with INSERTS by multiple orders of magnitude.   SQLAlchemy
+allows this extension to be used for all ``executemany()`` style calls
+invoked by an :class:`.Engine` when used with multiple parameter sets,
+by adding the ``use_batch_mode`` flag to :func:`.create_engine`::
+
+    engine = create_engine(
+        "postgresql+psycopg2://scott:tiger@host/dbname",
+        use_batch_mode=True)
+
+Batch mode is considered to be **experimental** at this time, however may
+be enabled by default in a future release.
+
+
+.. versionadded:: 1.2.0
+
+
+
 .. _psycopg2_unicode:
 
 Unicode with Psycopg2
@@ -510,6 +544,7 @@ class PGDialect_psycopg2(PGDialect):
     def __init__(self, server_side_cursors=False, use_native_unicode=True,
                  client_encoding=None,
                  use_native_hstore=True, use_native_uuid=True,
+                 use_batch_mode=False,
                  **kwargs):
         PGDialect.__init__(self, **kwargs)
         self.server_side_cursors = server_side_cursors
@@ -518,6 +553,7 @@ class PGDialect_psycopg2(PGDialect):
         self.use_native_uuid = use_native_uuid
         self.supports_unicode_binds = use_native_unicode
         self.client_encoding = client_encoding
+        self.psycopg2_batch_mode = use_batch_mode
         if self.dbapi and hasattr(self.dbapi, '__version__'):
             m = re.match(r'(\d+)\.(\d+)(?:\.(\d+))?',
                          self.dbapi.__version__)
@@ -540,7 +576,8 @@ class PGDialect_psycopg2(PGDialect):
         # http://initd.org/psycopg/docs/news.html#what-s-new-in-psycopg-2-0-9
         self.supports_sane_multi_rowcount = \
             self.psycopg2_version >= \
-            self.FEATURE_VERSION_MAP['sane_multi_rowcount']
+            self.FEATURE_VERSION_MAP['sane_multi_rowcount'] and \
+            not self.psycopg2_batch_mode
 
     @classmethod
     def dbapi(cls):
@@ -638,6 +675,13 @@ class PGDialect_psycopg2(PGDialect):
         else:
             return None
 
+    def do_executemany(self, cursor, statement, parameters, context=None):
+        if self.psycopg2_batch_mode:
+            extras = self._psycopg2_extras()
+            extras.execute_batch(cursor, statement, parameters)
+        else:
+            cursor.executemany(statement, parameters)
+
     @util.memoized_instancemethod
     def _hstore_oids(self, conn):
         if self.psycopg2_version >= self.FEATURE_VERSION_MAP['hstore_adapter']:
index e985718c772cdfb593b82afaefa0351a83a254cc..29aa62e3fe49204657f8c9498682ca0e999b2662 100644 (file)
@@ -9,7 +9,7 @@ import datetime
 from sqlalchemy import (
     Table, Column, select, MetaData, text, Integer, String, Sequence, Numeric,
     DateTime, BigInteger, func, extract, SmallInteger, TypeDecorator, literal,
-    cast)
+    cast, bindparam)
 from sqlalchemy import exc, schema
 from sqlalchemy.dialects.postgresql import base as postgresql
 import logging
@@ -84,6 +84,87 @@ class DialectTest(fixtures.TestBase):
         eq_(e.dialect.use_native_unicode, True)
 
 
+class BatchInsertsTest(fixtures.TablesTest):
+    __only_on__ = 'postgresql+psycopg2'
+    __backend__ = True
+
+    run_create_tables = "each"
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            'data', metadata,
+            Column('id', Integer, primary_key=True),
+            Column('x', String),
+            Column('y', String),
+            Column('z', Integer, server_default="5")
+        )
+
+    def setup(self):
+        super(BatchInsertsTest, self).setup()
+        self.engine = engines.testing_engine(options={"use_batch_mode": True})
+
+    def teardown(self):
+        self.engine.dispose()
+        super(BatchInsertsTest, self).teardown()
+
+    def test_insert(self):
+        with self.engine.connect() as conn:
+            conn.execute(
+                self.tables.data.insert(),
+                [
+                    {"x": "x1", "y": "y1"},
+                    {"x": "x2", "y": "y2"},
+                    {"x": "x3", "y": "y3"}
+                ]
+            )
+
+            eq_(
+                conn.execute(select([self.tables.data])).fetchall(),
+                [
+                    (1, "x1", "y1", 5),
+                    (2, "x2", "y2", 5),
+                    (3, "x3", "y3", 5)
+                ]
+            )
+
+    def test_not_sane_rowcount(self):
+        self.engine.connect().close()
+        assert not self.engine.dialect.supports_sane_multi_rowcount
+
+    def test_update(self):
+        with self.engine.connect() as conn:
+            conn.execute(
+                self.tables.data.insert(),
+                [
+                    {"x": "x1", "y": "y1"},
+                    {"x": "x2", "y": "y2"},
+                    {"x": "x3", "y": "y3"}
+                ]
+            )
+
+            conn.execute(
+                self.tables.data.update().
+                where(self.tables.data.c.x == bindparam('xval')).
+                values(y=bindparam('yval')),
+                [
+                    {"xval": "x1", "yval": "y5"},
+                    {"xval": "x3", "yval": "y6"}
+                ]
+            )
+            eq_(
+                conn.execute(
+                    select([self.tables.data]).
+                    order_by(self.tables.data.c.id)).
+                fetchall(),
+                [
+                    (1, "x1", "y5", 5),
+                    (2, "x2", "y2", 5),
+                    (3, "x3", "y6", 5)
+                ]
+            )
+
+
 class MiscBackendTest(
         fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):