--- /dev/null
+.. change::
+ :tags: usecase, postgresql
+ :tickets: 4623
+
+ Added new dialect flag for the psycopg2 dialect, ``executemany_mode`` which
+ supersedes the previous experimental ``use_batch_mode`` flag.
+ ``executemany_mode`` supports both the "execute batch" and "execute values"
+ functions provided by psycopg2, the latter which is used for compiled
+ :func:`.insert` constructs. Pull request courtesy Yuval Dinari.
+
+ .. seealso::
+
+ :ref:`psycopg2_executemany_mode`
+
+
+
:ref:`psycopg2_unicode`
-* ``use_batch_mode``: This flag allows ``psycopg2.extras.execute_batch``
- for ``cursor.executemany()`` calls performed by the :class:`.Engine`.
- It is currently experimental but
- may well become True by default as it is critical for executemany
- performance.
+* ``executemany_mode``, ``executemany_batch_page_size``,
+ ``executemany_values_page_size``: Allows use of psycopg2
+ extensions for optimizing "executemany"-stye queries. See the referenced
+ section below for details.
.. seealso::
- :ref:`psycopg2_batch_mode`
+ :ref:`psycopg2_executemany_mode`
+
+* ``use_batch_mode``: this is the previous setting used to affect "executemany"
+ mode and is now deprecated.
+
Unix Domain Connections
------------------------
.. versionadded:: 1.0.6
-.. _psycopg2_batch_mode:
+.. _psycopg2_executemany_mode:
-Psycopg2 Batch Mode (Fast Execution)
-------------------------------------
+Psycopg2 Fast Execution Helpers
+-------------------------------
Modern versions of psycopg2 include a feature known as
`Fast Execution Helpers \
-<http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_,
-which have been shown in benchmarking to improve psycopg2's executemany()
-performance with INSERTS by multiple orders of magnitude. SQLAlchemy
-allows this extension to be used for all ``executemany()`` style calls
-invoked by an :class:`.Engine` when used with :ref:`multiple parameter sets <execute_multiple>`,
-by adding the ``use_batch_mode`` flag to :func:`.create_engine`::
+<http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which
+have been shown in benchmarking to improve psycopg2's executemany()
+performance, primarily with INSERT statements, by multiple orders of magnitude.
+SQLAlchemy allows this extension to be used for all ``executemany()`` style
+calls invoked by an :class:`.Engine` when used with :ref:`multiple parameter
+sets <execute_multiple>`, which includes the use of this feature both by the
+Core as well as by the ORM for inserts of objects with non-autogenerated
+primary key values, by adding the ``executemany_mode`` flag to
+:func:`.create_engine`::
engine = create_engine(
"postgresql+psycopg2://scott:tiger@host/dbname",
- use_batch_mode=True)
+ executemany_mode='batch')
+
+
+.. versionchanged:: 1.3.7 - the ``use_batch_mode`` flag has been superseded
+ by a new parameter ``executemany_mode`` which provides support both for
+ psycopg2's ``execute_batch`` helper as well as the ``execute_values``
+ helper.
+
+Possible options for ``executemany_mode`` include:
+
+* ``None`` - By default, psycopg2's extensions are not used, and the usual
+ ``cursor.executemany()`` method is used when invoking batches of statements.
+
+* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` so that multiple copies
+ of a SQL query, each one corresponding to a parameter set passed to
+ ``executemany()``, are joined into a single SQL string separated by a
+ semicolon. This is the same behavior as was provided by the
+ ``use_batch_mode=True`` flag.
+
+* ``'values'``- For Core :func:`.insert` constructs only (including those
+ emitted by the ORM automatically), the ``psycopg2.extras.execute_values``
+ extension is used so that multiple parameter sets are grouped into a single
+ INSERT statement and joined together with multiple VALUES expressions. This
+ method requires that the string text of the VALUES clause inside the
+ INSERT statement is manipulated, so is only supported with a compiled
+ :func:`.insert` construct where the format is predictable. For all other
+ constructs, including plain textual INSERT statements not rendered by the
+ SQLAlchemy expression language compiler, the
+ ``psycopg2.extras.execute_batch`` method is used. It is therefore important
+ to note that **"values" mode implies that "batch" mode is also used for
+ all statements for which "values" mode does not apply**.
+
+For both strategies, the ``executemany_batch_page_size`` and
+``executemany_values_page_size`` arguments control how many parameter sets
+should be represented in each execution. Because "values" mode implies a
+fallback down to "batch" mode for non-INSERT statements, there are two
+independent page size arguments. For each, the default value of ``None`` means
+to use psycopg2's defaults, which at the time of this writing are quite low at
+100. For the ``execute_values`` method, a number as high as 10000 may prove
+to be performant, whereas for ``execute_batch``, as the number represents
+full statements repeated, a number closer to the default of 100 is likely
+more appropriate::
-Batch mode is considered to be **experimental** at this time, however may
-be enabled by default in a future release.
+ engine = create_engine(
+ "postgresql+psycopg2://scott:tiger@host/dbname",
+ executemany_mode='values',
+ executemany_values_page_size=10000, executemany_batch_page_size=500)
-.. seealso::
- :ref:`execute_multiple` - demonstrates how to use DBAPI ``executemany()``
- with the :class:`.Connection` object.
+.. seealso::
-.. versionadded:: 1.2.0
+ :ref:`execute_multiple` - General information on using the
+ :class:`.Connection` object to execute statements in such a way as to make
+ use of the DBAPI ``.executemany()`` method.
+.. versionchanged:: 1.3.7 - Added support for
+ ``psycopg2.extras.execute_values``. The ``use_batch_mode`` flag is
+ superseded by the ``executemany_mode`` flag.
.. _psycopg2_unicode:
pass
+EXECUTEMANY_DEFAULT = util.symbol("executemany_default")
+EXECUTEMANY_BATCH = util.symbol("executemany_batch")
+EXECUTEMANY_VALUES = util.symbol("executemany_values")
+
+
class PGDialect_psycopg2(PGDialect):
driver = "psycopg2"
if util.py2k:
},
)
+ @util.deprecated_params(
+ use_batch_mode=(
+ "1.3.7",
+ "The psycopg2 use_batch_mode flag is superseded by "
+ "executemany_mode='batch'",
+ )
+ )
def __init__(
self,
server_side_cursors=False,
client_encoding=None,
use_native_hstore=True,
use_native_uuid=True,
- use_batch_mode=False,
+ executemany_mode=None,
+ executemany_batch_page_size=None,
+ executemany_values_page_size=None,
+ use_batch_mode=None,
**kwargs
):
PGDialect.__init__(self, **kwargs)
self.use_native_uuid = use_native_uuid
self.supports_unicode_binds = use_native_unicode
self.client_encoding = client_encoding
- self.psycopg2_batch_mode = use_batch_mode
+
+ # Parse executemany_mode argument, allowing it to be only one of the
+ # symbol names
+ self.executemany_mode = util.symbol.parse_user_argument(
+ executemany_mode,
+ {
+ EXECUTEMANY_DEFAULT: [None],
+ EXECUTEMANY_BATCH: ["batch"],
+ EXECUTEMANY_VALUES: ["values"],
+ },
+ "executemany_mode",
+ )
+ if use_batch_mode:
+ self.executemany_mode = EXECUTEMANY_BATCH
+
+ self.executemany_batch_page_size = executemany_batch_page_size
+ self.executemany_values_page_size = executemany_values_page_size
+
if self.dbapi and hasattr(self.dbapi, "__version__"):
m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
if m:
self.supports_sane_multi_rowcount = (
self.psycopg2_version
>= self.FEATURE_VERSION_MAP["sane_multi_rowcount"]
- and not self.psycopg2_batch_mode
+ and self.executemany_mode is EXECUTEMANY_DEFAULT
)
@classmethod
else:
return None
+ _insert_values_match = re.compile(r".* VALUES (\(.+\))").match
+
def do_executemany(self, cursor, statement, parameters, context=None):
- if self.psycopg2_batch_mode:
- extras = self._psycopg2_extras()
- extras.execute_batch(cursor, statement, parameters)
- else:
+ if self.executemany_mode is EXECUTEMANY_DEFAULT:
cursor.executemany(statement, parameters)
+ return
+
+ if (
+ self.executemany_mode is EXECUTEMANY_VALUES
+ and context
+ and context.isinsert
+ ):
+ executemany_values = self._insert_values_match(statement)
+ else:
+ executemany_values = None
+
+ if executemany_values:
+ # Currently, SQLAlchemy does not pass "RETURNING" statements
+ # into executemany(), since no DBAPI has ever supported that
+ # until the introduction of psycopg2's executemany_values, so
+ # we are not yet using the fetch=True flag.
+ statement = statement.replace(executemany_values.group(1), "%s")
+ if self.executemany_values_page_size:
+ kwargs = {"page_size": self.executemany_values_page_size}
+ else:
+ kwargs = {}
+ self._psycopg2_extras().execute_values(
+ cursor,
+ statement,
+ parameters,
+ template=executemany_values.group(1),
+ **kwargs
+ )
+
+ else:
+ if self.executemany_batch_page_size:
+ kwargs = {"page_size": self.executemany_batch_page_size}
+ else:
+ kwargs = {}
+ self._psycopg2_extras().execute_batch(
+ cursor, statement, parameters, **kwargs
+ )
@util.memoized_instancemethod
def _hstore_oids(self, conn):
# coding: utf-8
-
+import contextlib
import datetime
import logging
import logging.handlers
from sqlalchemy import TypeDecorator
from sqlalchemy.dialects.postgresql import base as postgresql
from sqlalchemy.dialects.postgresql import psycopg2 as psycopg2_dialect
+from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_BATCH
+from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_DEFAULT
+from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_VALUES
from sqlalchemy.engine import engine_from_config
from sqlalchemy.engine import url
from sqlalchemy.testing import engines
from sqlalchemy.testing import expect_deprecated
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
+from sqlalchemy.testing import mock
from sqlalchemy.testing.assertions import assert_raises
from sqlalchemy.testing.assertions import assert_raises_message
from sqlalchemy.testing.assertions import AssertsCompiledSQL
from sqlalchemy.testing.assertions import eq_
from sqlalchemy.testing.assertions import eq_regex
from sqlalchemy.testing.assertions import ne_
-from sqlalchemy.testing.mock import Mock
from ...engine import test_execute
def test_version_parsing(self):
def mock_conn(res):
- return Mock(
- execute=Mock(return_value=Mock(scalar=Mock(return_value=res)))
+ return mock.Mock(
+ execute=mock.Mock(
+ return_value=mock.Mock(scalar=mock.Mock(return_value=res))
+ )
)
dialect = postgresql.dialect()
eq_(cparams, {"host": "somehost", "any_random_thing": "yes"})
-class BatchInsertsTest(fixtures.TablesTest):
+class ExecuteManyMode(object):
__only_on__ = "postgresql+psycopg2"
__backend__ = True
run_create_tables = "each"
+ options = None
+
@classmethod
def define_tables(cls, metadata):
Table(
Column("z", Integer, server_default="5"),
)
+ @contextlib.contextmanager
+ def expect_deprecated_opts(self):
+ yield
+
def setup(self):
- super(BatchInsertsTest, self).setup()
- self.engine = engines.testing_engine(options={"use_batch_mode": True})
+ super(ExecuteManyMode, self).setup()
+ with self.expect_deprecated_opts():
+ self.engine = engines.testing_engine(options=self.options)
def teardown(self):
self.engine.dispose()
- super(BatchInsertsTest, self).teardown()
+ super(ExecuteManyMode, self).teardown()
def test_insert(self):
with self.engine.connect() as conn:
[(1, "x1", "y1", 5), (2, "x2", "y2", 5), (3, "x3", "y3", 5)],
)
+ def test_insert_no_page_size(self):
+ from psycopg2 import extras
+
+ eng = self.engine
+ if eng.dialect.executemany_mode is EXECUTEMANY_BATCH:
+ meth = extras.execute_batch
+ stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
+ expected_kwargs = {}
+ else:
+ meth = extras.execute_values
+ stmt = "INSERT INTO data (x, y) VALUES %s"
+ expected_kwargs = {"template": "(%(x)s, %(y)s)"}
+
+ with mock.patch.object(
+ extras, meth.__name__, side_effect=meth
+ ) as mock_exec:
+ with eng.connect() as conn:
+ conn.execute(
+ self.tables.data.insert(),
+ [
+ {"x": "x1", "y": "y1"},
+ {"x": "x2", "y": "y2"},
+ {"x": "x3", "y": "y3"},
+ ],
+ )
+
+ eq_(
+ mock_exec.mock_calls,
+ [
+ mock.call(
+ mock.ANY,
+ stmt,
+ (
+ {"x": "x1", "y": "y1"},
+ {"x": "x2", "y": "y2"},
+ {"x": "x3", "y": "y3"},
+ ),
+ **expected_kwargs
+ )
+ ],
+ )
+
+ def test_insert_page_size(self):
+ opts = self.options.copy()
+ opts["executemany_batch_page_size"] = 500
+ opts["executemany_values_page_size"] = 1000
+
+ with self.expect_deprecated_opts():
+ eng = engines.testing_engine(options=opts)
+
+ from psycopg2 import extras
+
+ if eng.dialect.executemany_mode is EXECUTEMANY_BATCH:
+ meth = extras.execute_batch
+ stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
+ expected_kwargs = {"page_size": 500}
+ else:
+ meth = extras.execute_values
+ stmt = "INSERT INTO data (x, y) VALUES %s"
+ expected_kwargs = {"page_size": 1000, "template": "(%(x)s, %(y)s)"}
+
+ with mock.patch.object(
+ extras, meth.__name__, side_effect=meth
+ ) as mock_exec:
+ with eng.connect() as conn:
+ conn.execute(
+ self.tables.data.insert(),
+ [
+ {"x": "x1", "y": "y1"},
+ {"x": "x2", "y": "y2"},
+ {"x": "x3", "y": "y3"},
+ ],
+ )
+
+ eq_(
+ mock_exec.mock_calls,
+ [
+ mock.call(
+ mock.ANY,
+ stmt,
+ (
+ {"x": "x1", "y": "y1"},
+ {"x": "x2", "y": "y2"},
+ {"x": "x3", "y": "y3"},
+ ),
+ **expected_kwargs
+ )
+ ],
+ )
+
+ def test_update_fallback(self):
+ from psycopg2 import extras
+
+ eng = self.engine
+ meth = extras.execute_batch
+ stmt = "UPDATE data SET y=%(yval)s WHERE data.x = %(xval)s"
+ expected_kwargs = {}
+
+ with mock.patch.object(
+ extras, meth.__name__, side_effect=meth
+ ) as mock_exec:
+ with eng.connect() as conn:
+ conn.execute(
+ self.tables.data.update()
+ .where(self.tables.data.c.x == bindparam("xval"))
+ .values(y=bindparam("yval")),
+ [
+ {"xval": "x1", "yval": "y5"},
+ {"xval": "x3", "yval": "y6"},
+ ],
+ )
+
+ eq_(
+ mock_exec.mock_calls,
+ [
+ mock.call(
+ mock.ANY,
+ stmt,
+ (
+ {"xval": "x1", "yval": "y5"},
+ {"xval": "x3", "yval": "y6"},
+ ),
+ **expected_kwargs
+ )
+ ],
+ )
+
def test_not_sane_rowcount(self):
self.engine.connect().close()
assert not self.engine.dialect.supports_sane_multi_rowcount
)
+class UseBatchModeTest(ExecuteManyMode, fixtures.TablesTest):
+ options = {"use_batch_mode": True}
+
+ def expect_deprecated_opts(self):
+ return expect_deprecated(
+ "The psycopg2 use_batch_mode flag is superseded by "
+ "executemany_mode='batch'"
+ )
+
+
+class ExecutemanyBatchModeTest(ExecuteManyMode, fixtures.TablesTest):
+ options = {"executemany_mode": "batch"}
+
+
+class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest):
+ options = {"executemany_mode": "values"}
+
+
+class ExecutemanyFlagOptionsTest(fixtures.TablesTest):
+ __only_on__ = "postgresql+psycopg2"
+ __backend__ = True
+
+ def test_executemany_correct_flag_options(self):
+ for opt, expected in [
+ (None, EXECUTEMANY_DEFAULT),
+ ("batch", EXECUTEMANY_BATCH),
+ ("values", EXECUTEMANY_VALUES),
+ ]:
+ self.engine = engines.testing_engine(
+ options={"executemany_mode": opt}
+ )
+ is_(self.engine.dialect.executemany_mode, expected)
+
+ def test_executemany_wrong_flag_options(self):
+ for opt in [1, True, "batch_insert"]:
+ assert_raises_message(
+ exc.ArgumentError,
+ "Invalid value for 'executemany_mode': %r" % opt,
+ engines.testing_engine,
+ options={"executemany_mode": opt},
+ )
+
+
class MiscBackendTest(
fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL
):
if version:
dialect = postgresql.dialect()
- dialect._get_server_version_info = Mock(return_value=version)
+ dialect._get_server_version_info = mock.Mock(
+ return_value=version
+ )
dialect.initialize(testing.db.connect())
else:
dialect = testing.db.dialect