Dialect Changes
===============
+psycopg2 version 2.7 or higher is required for the PostgreSQL psycopg2 dialect
+------------------------------------------------------------------------------
+
+The psycopg2 dialect relies upon many features of psycopg2 released
+in the past few years. To simplify the dialect, version 2.7, released
+in March, 2017, is now the minimum version required.
+
+.. _change_5401:
+
+psycopg2 dialect features "execute_values" with RETURNING for INSERT statements by default
+------------------------------------------------------------------------------------------
+
+The first half of a significant performance enhancement for PostgreSQL when
+using both Core and ORM, the psycopg2 dialect now uses
+``psycopg2.extras.execute_values()`` by default for compiled INSERT statements
+and also implements RETURNING support in this mode.
+
+This extension method allows many rows to be INSERTed within a single
+statement, using an extended VALUES clause for the statement. While
+SQLAlchemy's :func:`_sql.insert` construct already supports this syntax via
+the :meth:`_sql.Insert.values` method, the extension method allows the
+construction of the VALUES clause to occur dynamically when the statement
+is executed as an "executemany" execution, which is what occurs when one
+passes a list of parameter dictionaries to :meth:`_engine.Connection.execute`.
+It also occurs beyond the cache boundary so that the INSERT statement may
+be cached before the VALUES are rendered.
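+
+For example, a minimal sketch of this "executemany" style of invocation,
+assuming a hypothetical ``user_table`` :class:`_schema.Table` and a
+psycopg2 :class:`_engine.Engine` already configured, passes a list of
+parameter dictionaries to :meth:`_engine.Connection.execute`::
+
+    with engine.connect() as conn:
+        conn.execute(
+            user_table.insert(),
+            [
+                {"name": "spongebob"},
+                {"name": "sandy"},
+                {"name": "patrick"},
+            ],
+        )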
+
+A quick test of the ``execute_values()`` approach using the
+``bulk_inserts.py`` script in the :ref:`examples_performance` example
+suite reveals an approximate **fivefold performance increase**::
+
+ $ python -m examples.performance bulk_inserts --test test_core_insert --num 100000 --dburl postgresql://scott:tiger@localhost/test
+
+ # 1.3
+ test_core_insert : A single Core INSERT construct inserting mappings in bulk. (100000 iterations); total time 5.229326 sec
+
+ # 1.4
+ test_core_insert : A single Core INSERT construct inserting mappings in bulk. (100000 iterations); total time 0.944007 sec
+
+Support for the "batch" extension was added in version 1.2 in
+:ref:`change_4109`, and enhanced to include support for the ``execute_values``
+extension in 1.3 in :ticket:`4623`. In 1.4, the ``execute_values`` extension
+is turned on by default for INSERT statements; the "batch" extension
+for UPDATE and DELETE remains off by default.
+
+In addition, the ``execute_values`` extension function supports returning the
+rows that are generated by RETURNING as an aggregated list. The psycopg2
+dialect will now retrieve this list if the given :func:`_sql.insert` construct
+requests returning via the :meth:`.Insert.returning` method or similar methods
+intended to return generated defaults; the rows are then installed in the
+result so that they are retrieved as though they came from the cursor
+directly. This allows tools like the ORM to use batched inserts in all cases,
+which is expected to provide a dramatic performance improvement.
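+
+As a brief sketch of this behavior at the Core level, assuming a hypothetical
+``data_table`` :class:`_schema.Table` and an existing psycopg2 connection, an
+:func:`_sql.insert` construct with RETURNING may be invoked with a list of
+parameter dictionaries, and the aggregated rows are then available on the
+result::
+
+    result = conn.execute(
+        data_table.insert().returning(data_table.c.id),
+        [{"x": "x1"}, {"x": "x2"}, {"x": "x3"}],
+    )
+
+    # one RETURNING row per parameter set, aggregated by execute_values()
+    # across however many pages were emitted
+    ids = [row[0] for row in result]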
+
+
+The ``executemany_mode`` feature of the psycopg2 dialect has been revised
+with the following changes:
+
+* A new mode ``"values_only"`` is added. This mode uses the very performant
+ ``psycopg2.extras.execute_values()`` extension method for compiled INSERT
+ statements run with executemany(), but does not use ``execute_batch()`` for
+ UPDATE and DELETE statements. This new mode is now the default setting for
+ the psycopg2 dialect.
+
+* The existing ``"values"`` mode is now named ``"values_plus_batch"``. This mode
+ will use ``execute_values`` for INSERT statements and ``execute_batch``
+ for UPDATE and DELETE statements. The mode is not enabled by default
+ because it disables the proper functioning of ``cursor.rowcount`` with
+ UPDATE and DELETE statements executed with ``executemany()``.
+
+* RETURNING support is enabled for the ``"values_only"`` and
+ ``"values_plus_batch"`` modes for INSERT statements. The psycopg2 dialect
+ will receive the rows back from psycopg2 using the fetch=True flag and
+ install them into the result set as though they came directly from the
+ cursor (which they ultimately did; however, psycopg2's extension function
+ has aggregated multiple batches into one list).
+
+* The default "page_size" setting for ``execute_values`` has been increased
+ from 100 to 1000. The default remains at 100 for the ``execute_batch``
+ function. These parameters may both be modified as was the case before.
+
+* The ``use_batch_mode`` flag that was part of the 1.2 version of the feature
+ is removed; the behavior remains controllable via the ``executemany_mode``
+ flag added in 1.3.
+
+* The Core engine and dialect have been enhanced to support executemany
+ plus returning mode, currently only available with psycopg2, by providing
+ new :attr:`_engine.CursorResult.inserted_primary_key_rows` and
+ :attr:`_engine.CursorResult.returned_default_rows` accessors; a short
+ sketch follows this list.
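+
+A minimal sketch of this Core-level pattern, assuming a psycopg2 engine
+running in the default ``"values_only"`` mode and a hypothetical ``my_table``
+with a SERIAL primary key, might look like::
+
+    engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
+
+    with engine.connect() as conn:
+        result = conn.execute(
+            my_table.insert().return_defaults(),
+            [{"data": "d1"}, {"data": "d2"}, {"data": "d3"}],
+        )
+
+        # one primary key tuple per parameter set
+        pk_rows = result.inserted_primary_key_rows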
+
+.. seealso::
+
+ :ref:`psycopg2_executemany_mode`
+
+
+:ticket:`5401`
+
+
.. _change_4895:
Removed "join rewriting" logic from SQLite dialect; updated imports
--- /dev/null
+.. change::
+ :tags: performance, postgresql
+ :tickets: 5401
+
+ The psycopg2 dialect now defaults to using the very performant
+ ``execute_values()`` psycopg2 extension for compiled INSERT statements,
+ and also implements RETURNING support when this extension is used. This
+ allows INSERT statements that even include an autoincremented SERIAL
+ or IDENTITY value to run very fast while still being able to return the
+ newly generated primary key values. The ORM will then integrate this
+ new feature in a separate change.
+
+ .. seealso::
+
+ :ref:`change_5401` - full list of changes regarding the
+ ``executemany_mode`` parameter.
+
--- /dev/null
+.. change::
+ :tags: change, postgresql
+
+ When using the psycopg2 dialect for PostgreSQL, the minimum psycopg2 version
+ is now 2.7. The psycopg2 dialect relies upon many features of psycopg2
+ released in the past few years, so to simplify the dialect, version 2.7,
+ released in March, 2017, is now the minimum version required.
+
"_schema": "sqlalchemy.schema",
"_types": "sqlalchemy.types",
"_expression": "sqlalchemy.sql.expression",
+ "_sql": "sqlalchemy.sql.expression",
"_functions": "sqlalchemy.sql.functions",
"_pool": "sqlalchemy.pool",
"_event": "sqlalchemy.event",
.. sourcecode:: pycon+sql
>>> result.inserted_primary_key
- [1]
+ (1,)
The value of ``1`` was automatically generated by SQLite, but only because we
did not specify the ``id`` column in our
:ref:`psycopg2_executemany_mode`
-* ``use_batch_mode``: this is the previous setting used to affect "executemany"
- mode and is now deprecated.
-
Unix Domain Connections
------------------------
<http://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which
have been shown in benchmarking to improve psycopg2's executemany()
performance, primarily with INSERT statements, by multiple orders of magnitude.
-SQLAlchemy allows this extension to be used for all ``executemany()`` style
-calls invoked by an :class:`_engine.Engine`
-when used with :ref:`multiple parameter
-sets <execute_multiple>`, which includes the use of this feature both by the
-Core as well as by the ORM for inserts of objects with non-autogenerated
-primary key values, by adding the ``executemany_mode`` flag to
-:func:`_sa.create_engine`::
+SQLAlchemy internally makes use of these extensions for ``executemany()`` style
+calls, which correspond to lists of parameters being passed to
+:meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter
+sets <execute_multiple>`. The ORM also uses this mode internally whenever
+possible.
+
+The two available extensions on the psycopg2 side are the ``execute_values()``
+and ``execute_batch()`` functions. The psycopg2 dialect defaults to using the
+``execute_values()`` extension for all qualifying INSERT statements.
+
+.. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode
+ ``"values_only"`` for ``executemany_mode``, which allows an order of
+ magnitude performance improvement for INSERT statements, but does not
+ include "batch" mode for UPDATE and DELETE statements which removes the
+ ability of ``cursor.rowcount`` to function correctly.
+
+The use of these extensions is controlled by the ``executemany_mode`` flag
+which may be passed to :func:`_sa.create_engine`::
engine = create_engine(
"postgresql+psycopg2://scott:tiger@host/dbname",
- executemany_mode='batch')
-
+ executemany_mode='values_plus_batch')
-.. versionchanged:: 1.3.7 - the ``use_batch_mode`` flag has been superseded
- by a new parameter ``executemany_mode`` which provides support both for
- psycopg2's ``execute_batch`` helper as well as the ``execute_values``
- helper.
Possible options for ``executemany_mode`` include:
-* ``None`` - By default, psycopg2's extensions are not used, and the usual
- ``cursor.executemany()`` method is used when invoking batches of statements.
+* ``values_only`` - this is the default value. The psycopg2
+ ``execute_values()`` extension is used for qualifying INSERT statements,
+ which rewrites the INSERT to include multiple VALUES clauses so that many
+ parameter sets can be inserted with one statement.
-* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` so that multiple copies
+ .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode``
+ which is also now the default.
+
+* ``None`` - No psycopg2 extensions are used, and the usual
+ ``cursor.executemany()`` method is used when invoking statements with
+ multiple parameter sets.
+
+* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying
+ INSERT, UPDATE and DELETE statements, so that multiple copies
of a SQL query, each one corresponding to a parameter set passed to
``executemany()``, are joined into a single SQL string separated by a
- semicolon. This is the same behavior as was provided by the
- ``use_batch_mode=True`` flag.
-
-* ``'values'``- For Core :func:`_expression.insert`
- constructs only (including those
- emitted by the ORM automatically), the ``psycopg2.extras.execute_values``
- extension is used so that multiple parameter sets are grouped into a single
- INSERT statement and joined together with multiple VALUES expressions. This
- method requires that the string text of the VALUES clause inside the
- INSERT statement is manipulated, so is only supported with a compiled
- :func:`_expression.insert` construct where the format is predictable.
- For all other
- constructs, including plain textual INSERT statements not rendered by the
- SQLAlchemy expression language compiler, the
- ``psycopg2.extras.execute_batch`` method is used. It is therefore important
- to note that **"values" mode implies that "batch" mode is also used for
- all statements for which "values" mode does not apply**.
-
-For both strategies, the ``executemany_batch_page_size`` and
-``executemany_values_page_size`` arguments control how many parameter sets
-should be represented in each execution. Because "values" mode implies a
-fallback down to "batch" mode for non-INSERT statements, there are two
-independent page size arguments. For each, the default value of ``None`` means
-to use psycopg2's defaults, which at the time of this writing are quite low at
-100. For the ``execute_values`` method, a number as high as 10000 may prove
-to be performant, whereas for ``execute_batch``, as the number represents
-full statements repeated, a number closer to the default of 100 is likely
-more appropriate::
+ semicolon. When using this mode, the :attr:`_engine.CursorResult.rowcount`
+ attribute will not contain a value for executemany-style executions.
+
+* ``'values_plus_batch'`` - ``execute_values`` is used for qualifying INSERT
+ statements, ``execute_batch`` is used for UPDATE and DELETE.
+ When using this mode, the :attr:`_engine.CursorResult.rowcount`
+ attribute will not contain a value for executemany-style executions against
+ UPDATE and DELETE statements.
+
+By "qualifying statements", we mean that the statement being executed
+must be a Core :func:`_expression.insert`, :func:`_expression.update`
+or :func:`_expression.delete` construct, and not a plain textual SQL
+string or one constructed using :func:`_expression.text`. When using the
+ORM, all insert/update/delete statements used by the ORM flush process
+are qualifying.
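+
+For example, under the default ``values_only`` mode (a sketch assuming a
+hypothetical ``data_table`` and an existing connection), the first execution
+below qualifies for ``execute_values()``, while the plain textual statement
+falls through to ``cursor.executemany()``::
+
+    from sqlalchemy import text
+
+    # Core insert() construct - qualifies for execute_values()
+    conn.execute(data_table.insert(), [{"x": 1}, {"x": 2}])
+
+    # plain textual SQL - not qualifying; cursor.executemany() is used
+    conn.execute(
+        text("INSERT INTO data (x) VALUES (:x)"),
+        [{"x": 3}, {"x": 4}],
+    )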
+
+The "page size" for the "values" and "batch" strategies can be affected
+by using the ``executemany_batch_page_size`` and
+``executemany_values_page_size`` engine parameters. These
+control how many parameter sets
+should be represented in each execution. The "values" page size defaults
+to 1000, which is different that psycopg2's default. The "batch" page
+size defaults to 100. These can be affected by passing new values to
+:func:`_engine.create_engine`::
engine = create_engine(
"postgresql+psycopg2://scott:tiger@host/dbname",
executemany_mode='values',
executemany_values_page_size=10000, executemany_batch_page_size=500)
+.. versionchanged:: 1.4
+
+ The default for ``executemany_values_page_size`` is now 1000, up from
+ 100.
.. seealso::
object to execute statements in such a way as to make
use of the DBAPI ``.executemany()`` method.
-.. versionchanged:: 1.3.7 - Added support for
- ``psycopg2.extras.execute_values``. The ``use_batch_mode`` flag is
- superseded by the ``executemany_mode`` flag.
-
.. _psycopg2_unicode:
from ... import processors
from ... import types as sqltypes
from ... import util
+from ...engine import cursor as _cursor
from ...util import collections_abc
try:
class _PGJSON(JSON):
def result_processor(self, dialect, coltype):
- if dialect._has_native_json:
- return None
- else:
- return super(_PGJSON, self).result_processor(dialect, coltype)
+ return None
class _PGJSONB(JSONB):
def result_processor(self, dialect, coltype):
- if dialect._has_native_jsonb:
- return None
- else:
- return super(_PGJSONB, self).result_processor(dialect, coltype)
+ return None
class _PGUUID(UUID):
class PGExecutionContext_psycopg2(PGExecutionContext):
+ _psycopg2_fetched_rows = None
+
def create_server_side_cursor(self):
# use server-side cursors:
# http://lists.initd.org/pipermail/psycopg/2007-January/005251.html
return self._dbapi_connection.cursor(ident)
def post_exec(self):
+ if (
+ self._psycopg2_fetched_rows
+ and self.compiled
+ and self.compiled.returning
+ ):
+ # psycopg2 execute_values will provide for a real cursor where
+ # cursor.description works correctly. however, it executes the
+ # INSERT statement multiple times for multiple pages of rows, so
+ # while this cursor also supports calling .fetchall() directly, in
+ # order to get the list of all rows inserted across multiple pages,
+ # we have to retrieve the aggregated list from the execute_values()
+ # function directly.
+ strat_cls = _cursor.FullyBufferedCursorFetchStrategy
+ self.cursor_fetch_strategy = strat_cls(
+ self.cursor, initial_buffer=self._psycopg2_fetched_rows
+ )
self._log_notices(self.cursor)
def _log_notices(self, cursor):
pass
-EXECUTEMANY_DEFAULT = util.symbol("executemany_default")
-EXECUTEMANY_BATCH = util.symbol("executemany_batch")
-EXECUTEMANY_VALUES = util.symbol("executemany_values")
+EXECUTEMANY_DEFAULT = util.symbol("executemany_default", canonical=0)
+EXECUTEMANY_BATCH = util.symbol("executemany_batch", canonical=1)
+EXECUTEMANY_VALUES = util.symbol("executemany_values", canonical=2)
+EXECUTEMANY_VALUES_PLUS_BATCH = util.symbol(
+ "executemany_values_plus_batch",
+ canonical=EXECUTEMANY_BATCH | EXECUTEMANY_VALUES,
+)
class PGDialect_psycopg2(PGDialect):
preparer = PGIdentifierPreparer_psycopg2
psycopg2_version = (0, 0)
- FEATURE_VERSION_MAP = dict(
- native_json=(2, 5),
- native_jsonb=(2, 5, 4),
- sane_multi_rowcount=(2, 0, 9),
- array_oid=(2, 4, 3),
- hstore_adapter=(2, 4),
- )
-
- _has_native_hstore = False
- _has_native_json = False
- _has_native_jsonb = False
+ _has_native_hstore = True
engine_config_types = PGDialect.engine_config_types.union(
{"use_native_unicode": util.asbool}
},
)
- @util.deprecated_params(
- use_batch_mode=(
- "1.3.7",
- "The psycopg2 use_batch_mode flag is superseded by "
- "executemany_mode='batch'",
- )
- )
def __init__(
self,
server_side_cursors=False,
client_encoding=None,
use_native_hstore=True,
use_native_uuid=True,
- executemany_mode=None,
- executemany_batch_page_size=None,
- executemany_values_page_size=None,
- use_batch_mode=None,
+ executemany_mode="values_only",
+ executemany_batch_page_size=100,
+ executemany_values_page_size=1000,
**kwargs
):
PGDialect.__init__(self, **kwargs)
self.server_side_cursors = server_side_cursors
self.use_native_unicode = use_native_unicode
+ if not use_native_hstore:
+ self._has_native_hstore = False
self.use_native_hstore = use_native_hstore
self.use_native_uuid = use_native_uuid
self.supports_unicode_binds = use_native_unicode
{
EXECUTEMANY_DEFAULT: [None],
EXECUTEMANY_BATCH: ["batch"],
- EXECUTEMANY_VALUES: ["values"],
+ EXECUTEMANY_VALUES: ["values_only"],
+ EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"],
},
"executemany_mode",
)
- if use_batch_mode:
- self.executemany_mode = EXECUTEMANY_BATCH
+
+ if self.executemany_mode & EXECUTEMANY_VALUES:
+ self.insert_executemany_returning = True
self.executemany_batch_page_size = executemany_batch_page_size
self.executemany_values_page_size = executemany_values_page_size
int(x) for x in m.group(1, 2, 3) if x is not None
)
+ if self.psycopg2_version < (2, 7):
+ raise ImportError(
+ "psycopg2 version 2.7 or higher is required."
+ )
+
def initialize(self, connection):
super(PGDialect_psycopg2, self).initialize(connection)
self._has_native_hstore = (
self.use_native_hstore
and self._hstore_oids(connection.connection) is not None
)
- self._has_native_json = (
- self.psycopg2_version >= self.FEATURE_VERSION_MAP["native_json"]
- )
- self._has_native_jsonb = (
- self.psycopg2_version >= self.FEATURE_VERSION_MAP["native_jsonb"]
- )
# http://initd.org/psycopg/docs/news.html#what-s-new-in-psycopg-2-0-9
- self.supports_sane_multi_rowcount = (
- self.psycopg2_version
- >= self.FEATURE_VERSION_MAP["sane_multi_rowcount"]
- and self.executemany_mode is EXECUTEMANY_DEFAULT
+ self.supports_sane_multi_rowcount = not (
+ self.executemany_mode & EXECUTEMANY_BATCH
)
@classmethod
kw = {"oid": oid}
if util.py2k:
kw["unicode"] = True
- if (
- self.psycopg2_version
- >= self.FEATURE_VERSION_MAP["array_oid"]
- ):
- kw["array_oid"] = array_oid
+ kw["array_oid"] = array_oid
extras.register_hstore(conn, **kw)
fns.append(on_connect)
if self.dbapi and self._json_deserializer:
def on_connect(conn):
- if self._has_native_json:
- extras.register_default_json(
- conn, loads=self._json_deserializer
- )
- if self._has_native_jsonb:
- extras.register_default_jsonb(
- conn, loads=self._json_deserializer
- )
+ extras.register_default_json(
+ conn, loads=self._json_deserializer
+ )
+ extras.register_default_jsonb(
+ conn, loads=self._json_deserializer
+ )
fns.append(on_connect)
return None
def do_executemany(self, cursor, statement, parameters, context=None):
- if self.executemany_mode is EXECUTEMANY_DEFAULT:
- cursor.executemany(statement, parameters)
- return
-
if (
- self.executemany_mode is EXECUTEMANY_VALUES
+ self.executemany_mode & EXECUTEMANY_VALUES
and context
and context.isinsert
and context.compiled.insert_single_values_expr
kwargs = {"page_size": self.executemany_values_page_size}
else:
kwargs = {}
- self._psycopg2_extras().execute_values(
+ xtras = self._psycopg2_extras()
+ context._psycopg2_fetched_rows = xtras.execute_values(
cursor,
statement,
parameters,
template=executemany_values,
+ fetch=bool(context.compiled.returning),
**kwargs
)
- else:
+ elif self.executemany_mode & EXECUTEMANY_BATCH:
if self.executemany_batch_page_size:
kwargs = {"page_size": self.executemany_batch_page_size}
else:
self._psycopg2_extras().execute_batch(
cursor, statement, parameters, **kwargs
)
+ else:
+ cursor.executemany(statement, parameters)
@util.memoized_instancemethod
def _hstore_oids(self, conn):
- if self.psycopg2_version >= self.FEATURE_VERSION_MAP["hstore_adapter"]:
- extras = self._psycopg2_extras()
- oids = extras.HstoreAdapter.get_oids(conn)
- if oids is not None and oids[0]:
- return oids[0:2]
- return None
+ extras = self._psycopg2_extras()
+ if hasattr(conn, "connection"):
+ conn = conn.connection
+ oids = extras.HstoreAdapter.get_oids(conn)
+ if oids is not None and oids[0]:
+ return oids[0:2]
+ else:
+ return None
def create_connect_args(self, url):
opts = url.translate_connect_args(username="user")
# ensure we don't retain a link to the view object for keys()
# which links to the values, which we don't want to cache
keys = sorted(distilled_params[0])
- inline = len(distilled_params) > 1
+ for_executemany = len(distilled_params) > 1
else:
keys = []
- inline = False
+ for_executemany = False
dialect = self.dialect
dialect=dialect,
compiled_cache=compiled_cache,
column_keys=keys,
- inline=inline,
+ for_executemany=for_executemany,
schema_translate_map=schema_translate_map,
linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
)
__slots__ = ("_rowbuffer", "alternate_cursor_description")
def __init__(
- self, dbapi_cursor, alternate_description, initial_buffer=None
+ self, dbapi_cursor, alternate_description=None, initial_buffer=None
):
self.alternate_cursor_description = alternate_description
if initial_buffer is not None:
self.connection._safe_close_cursor(cursor)
self._soft_closed = True
- @util.memoized_property
+ @property
+ def inserted_primary_key_rows(self):
+ """Return a list of tuples, each containing the primary key for each row
+ just inserted.
+
+ Usually, this method will return at most a list with a single
+ entry which is the same row one would get back from
+ :attr:`_engine.CursorResult.inserted_primary_key`. To support
+ "executemany with INSERT" mode, multiple rows can be part of the
+ list returned.
+
+ .. versionadded:: 1.4
+
+ """
+ if not self.context.compiled:
+ raise exc.InvalidRequestError(
+ "Statement is not a compiled " "expression construct."
+ )
+ elif not self.context.isinsert:
+ raise exc.InvalidRequestError(
+ "Statement is not an insert() " "expression construct."
+ )
+ elif self.context._is_explicit_returning:
+ raise exc.InvalidRequestError(
+ "Can't call inserted_primary_key "
+ "when returning() "
+ "is used."
+ )
+ return self.context.inserted_primary_key_rows
+
+ @property
def inserted_primary_key(self):
"""Return the primary key for the row just inserted.
"""
- if not self.context.compiled:
+ if self.context.executemany:
raise exc.InvalidRequestError(
- "Statement is not a compiled " "expression construct."
- )
- elif not self.context.isinsert:
- raise exc.InvalidRequestError(
- "Statement is not an insert() " "expression construct."
- )
- elif self.context._is_explicit_returning:
- raise exc.InvalidRequestError(
- "Can't call inserted_primary_key "
- "when returning() "
- "is used."
+ "This statement was an executemany call; if primary key "
+ "returning is supported, please "
+ "use .inserted_primary_key_rows."
)
- return self.context.inserted_primary_key
+ ikp = self.inserted_primary_key_rows
+ if ikp:
+ return ikp[0]
+ else:
+ return None
def last_updated_params(self):
"""Return the collection of updated parameters from this
else:
return self.context.compiled_parameters[0]
+ @property
+ def returned_defaults_rows(self):
+ """Return a list of rows each containing the values of default
+ columns that were fetched using
+ the :meth:`.ValuesBase.return_defaults` feature.
+
+ The return value is a list of :class:`.Row` objects.
+
+ .. versionadded:: 1.4
+
+ """
+ return self.context.returned_default_rows
+
@property
def returned_defaults(self):
"""Return the values of default columns that were fetched using
:meth:`.ValuesBase.return_defaults`
"""
- return self.context.returned_defaults
+
+ if self.context.executemany:
+ raise exc.InvalidRequestError(
+ "This statement was an executemany call; if return defaults "
+ "is supported, please use .returned_defaults_rows."
+ )
+
+ rows = self.context.returned_default_rows
+ if rows:
+ return rows[0]
+ else:
+ return None
def lastrow_has_defaults(self):
"""Return ``lastrow_has_defaults()`` from the underlying
postfetch_lastrowid = True
implicit_returning = False
full_returning = False
+ insert_executemany_returning = False
cte_follows_insert = False
compiled = None
statement = None
result_column_struct = None
- returned_defaults = None
+ returned_default_rows = None
execution_options = util.immutabledict()
cursor_fetch_strategy = _cursor._DEFAULT_FETCH
if self.isinsert:
if self._is_implicit_returning:
- row = result.fetchone()
- self.returned_defaults = row
- self._setup_ins_pk_from_implicit_returning(row)
+ rows = result.all()
- # test that it has a cursor metadata that is accurate.
- # the first row will have been fetched and current assumptions
+ self.returned_default_rows = rows
+
+ self._setup_ins_pk_from_implicit_returning(result, rows)
+
+ # test that it has a cursor metadata that is accurate. the
+ # first row will have been fetched and current assumptions
# are that the result has only one row, until executemany()
# support is added here.
assert result._metadata.returns_rows
elif self.isupdate and self._is_implicit_returning:
row = result.fetchone()
- self.returned_defaults = row
+ self.returned_default_rows = [row]
result._soft_close()
# test that it has a cursor metadata that is accurate.
return result
def _setup_ins_pk_from_lastrowid(self):
- key_getter = self.compiled._key_getters_for_crud_column[2]
- table = self.compiled.statement.table
- compiled_params = self.compiled_parameters[0]
+
+ getter = self.compiled._inserted_primary_key_from_lastrowid_getter
lastrowid = self.get_lastrowid()
- if lastrowid is not None:
- autoinc_col = table._autoincrement_column
- if autoinc_col is not None:
- # apply type post processors to the lastrowid
- proc = autoinc_col.type._cached_result_processor(
- self.dialect, None
- )
- if proc is not None:
- lastrowid = proc(lastrowid)
- self.inserted_primary_key = [
- lastrowid
- if c is autoinc_col
- else compiled_params.get(key_getter(c), None)
- for c in table.primary_key
- ]
- else:
- # don't have a usable lastrowid, so
- # do the same as _setup_ins_pk_from_empty
- self.inserted_primary_key = [
- compiled_params.get(key_getter(c), None)
- for c in table.primary_key
- ]
+ self.inserted_primary_key_rows = [
+ getter(lastrowid, self.compiled_parameters[0])
+ ]
def _setup_ins_pk_from_empty(self):
- key_getter = self.compiled._key_getters_for_crud_column[2]
- table = self.compiled.statement.table
- compiled_params = self.compiled_parameters[0]
- self.inserted_primary_key = [
- compiled_params.get(key_getter(c), None) for c in table.primary_key
+
+ getter = self.compiled._inserted_primary_key_from_lastrowid_getter
+
+ self.inserted_primary_key_rows = [
+ getter(None, self.compiled_parameters[0])
]
- def _setup_ins_pk_from_implicit_returning(self, row):
- if row is None:
- self.inserted_primary_key = None
+ def _setup_ins_pk_from_implicit_returning(self, result, rows):
+
+ if not rows:
+ self.inserted_primary_key_rows = []
return
- key_getter = self.compiled._key_getters_for_crud_column[2]
- table = self.compiled.statement.table
- compiled_params = self.compiled_parameters[0]
-
- # TODO: why are we using keyed index here? can't we get the ints?
- # can compiler build up the structure here as far as what was
- # explicit and what comes back in returning?
- row_mapping = row._mapping
- self.inserted_primary_key = [
- row_mapping[col] if value is None else value
- for col, value in [
- (col, compiled_params.get(key_getter(col), None))
- for col in table.primary_key
- ]
+ getter = self.compiled._inserted_primary_key_from_returning_getter
+ compiled_params = self.compiled_parameters
+
+ self.inserted_primary_key_rows = [
+ getter(row, param) for row, param in zip(rows, compiled_params)
]
def lastrow_has_defaults(self):
statement, params
)
- primary_key = result.context.inserted_primary_key
+ primary_key = result.inserted_primary_key
if primary_key is not None:
# set primary key attributes
for pk, col in zip(
load_evt_attrs = []
if returning_cols:
- row = result.context.returned_defaults
+ row = result.returned_defaults
if row is not None:
for row_value, col in zip(row, returning_cols):
# pk cols returned from insert are handled
"""
+ inline = False
+
def __init__(
self,
dialect,
statement,
cache_key=None,
column_keys=None,
- inline=False,
+ for_executemany=False,
linting=NO_LINTING,
**kwargs
):
:param column_keys: a list of column names to be compiled into an
INSERT or UPDATE statement.
- :param inline: whether to generate INSERT statements as "inline", e.g.
- not formatted to return any generated defaults
+ :param for_executemany: whether INSERT / UPDATE statements should
+ expect that they are to be invoked in an "executemany" style,
+ which may impact how the statement will be expected to return the
+ values of defaults and autoincrement / sequences and similar.
+ Depending on the backend and driver in use, support for retrieving
+ these values may be disabled, which means SQL expressions may
+ be rendered inline, RETURNING may not be rendered, etc.
:param kwargs: additional keyword arguments to be consumed by the
superclass.
if cache_key:
self._cache_key_bind_match = {b: b for b in cache_key[1]}
- # compile INSERT/UPDATE defaults/sequences inlined (no pre-
- # execute)
- self.inline = inline or getattr(statement, "_inline", False)
+ # compile INSERT/UPDATE defaults/sequences to expect executemany
+ # style execution, which may mean no pre-execute of defaults,
+ # or no RETURNING
+ self.for_executemany = for_executemany
self.linting = linting
Compiled.__init__(self, dialect, statement, **kwargs)
- if (
- self.isinsert or self.isupdate or self.isdelete
- ) and statement._returning:
- self.returning = statement._returning
+ if self.isinsert or self.isupdate or self.isdelete:
+ if statement._returning:
+ self.returning = statement._returning
+
+ if self.isinsert or self.isupdate:
+ if statement._inline:
+ self.inline = True
+ elif self.for_executemany and (
+ not self.isinsert
+ or (
+ self.dialect.insert_executemany_returning
+ and statement._return_defaults
+ )
+ ):
+ self.inline = True
if self.positional and self._numeric_binds:
self._apply_numbered_params()
self._result_columns
)
+ @util.memoized_property
+ def _inserted_primary_key_from_lastrowid_getter(self):
+ key_getter = self._key_getters_for_crud_column[2]
+ table = self.statement.table
+
+ getters = [
+ (operator.methodcaller("get", key_getter(col), None), col)
+ for col in table.primary_key
+ ]
+
+ autoinc_col = table._autoincrement_column
+ if autoinc_col is not None:
+ # apply type post processors to the lastrowid
+ proc = autoinc_col.type._cached_result_processor(
+ self.dialect, None
+ )
+ else:
+ proc = None
+
+ def get(lastrowid, parameters):
+ if proc is not None:
+ lastrowid = proc(lastrowid)
+
+ if lastrowid is None:
+ return tuple(getter(parameters) for getter, col in getters)
+ else:
+ return tuple(
+ lastrowid if col is autoinc_col else getter(parameters)
+ for getter, col in getters
+ )
+
+ return get
+
+ @util.memoized_property
+ def _inserted_primary_key_from_returning_getter(self):
+ key_getter = self._key_getters_for_crud_column[2]
+ table = self.statement.table
+
+ ret = {col: idx for idx, col in enumerate(self.returning)}
+
+ getters = [
+ (operator.itemgetter(ret[col]), True)
+ if col in ret
+ else (operator.methodcaller("get", key_getter(col), None), False)
+ for col in table.primary_key
+ ]
+
+ def get(row, parameters):
+ return tuple(
+ getter(row) if use_row else getter(parameters)
+ for getter, use_row in getters
+ )
+
+ return get
+
def default_from(self):
"""Called when a SELECT statement has no froms, and no FROM clause is
to be appended.
need_pks = (
compile_state.isinsert
- and not compiler.inline
+ and not stmt._inline
+ and (
+ not compiler.for_executemany
+ or (
+ compiler.dialect.insert_executemany_returning
+ and stmt._return_defaults
+ )
+ )
and not stmt._returning
and not compile_state._has_multi_parameters
)
This method differs from :meth:`.UpdateBase.returning` in these ways:
- 1. :meth:`.ValuesBase.return_defaults` is only intended for use with
- an INSERT or an UPDATE statement that matches exactly one row.
- While the RETURNING construct in the general sense supports
- multiple rows for a multi-row UPDATE or DELETE statement, or for
- special cases of INSERT that return multiple rows (e.g. INSERT from
- SELECT, multi-valued VALUES clause),
+ 1. :meth:`.ValuesBase.return_defaults` is only intended for use with an
+ INSERT or an UPDATE statement that matches exactly one row per
+ parameter set. While the RETURNING construct in the general sense
+ supports multiple rows for a multi-row UPDATE or DELETE statement,
+ or for special cases of INSERT that return multiple rows (e.g.
+ INSERT from SELECT, multi-valued VALUES clause),
:meth:`.ValuesBase.return_defaults` is intended only for an
- "ORM-style" single-row INSERT/UPDATE statement. The row returned
- by the statement is also consumed implicitly when
+ "ORM-style" single-row INSERT/UPDATE statement. The row
+ returned by the statement is also consumed implicitly when
:meth:`.ValuesBase.return_defaults` is used. By contrast,
- :meth:`.UpdateBase.returning` leaves the RETURNING result-set
- intact with a collection of any number of rows.
+ :meth:`.UpdateBase.returning` leaves the RETURNING result-set intact
+ with a collection of any number of rows.
2. It is compatible with the existing logic to fetch auto-generated
primary key values, also known as "implicit returning". Backends
an exception. The return value of
:attr:`_engine.CursorResult.returned_defaults` will be ``None``
+ 4. An INSERT statement invoked with executemany() is supported if the
+ backend database driver supports the
+ ``insert_executemany_returning`` feature; currently this includes
+ PostgreSQL with psycopg2. When executemany is used, the
+ :attr:`_engine.CursorResult.returned_defaults_rows` and
+ :attr:`_engine.CursorResult.inserted_primary_key_rows` accessors
+ will return the inserted defaults and primary keys.
+
+ .. versionadded:: 1.4
+
:meth:`.ValuesBase.return_defaults` is used by the ORM to provide
an efficient implementation for the ``eager_defaults`` feature of
:func:`.mapper`.
:attr:`_engine.CursorResult.returned_defaults`
+ :attr:`_engine.CursorResult.returned_defaults_rows`
+
+ :attr:`_engine.CursorResult.inserted_primary_key`
+
+ :attr:`_engine.CursorResult.inserted_primary_key_rows`
+
"""
self._return_defaults = cols or True
's bound engine,
if any.
- :param inline: Used for INSERT statements, for a dialect which does
- not support inline retrieval of newly generated primary key
- columns, will force the expression used to create the new primary
- key value to be rendered inline within the INSERT statement's
- VALUES clause. This typically refers to Sequence execution but may
- also refer to any server-side default generation function
- associated with a primary key `Column`.
-
:param compile_kwargs: optional dictionary of additional parameters
that will be passed through to the compiler within all "visit"
methods. This allows any custom flag to be passed through to
dialect,
compiled_cache=None,
column_keys=None,
- inline=False,
+ for_executemany=False,
schema_translate_map=None,
**kw
):
cache_key,
tuple(column_keys),
bool(schema_translate_map),
- inline,
+ for_executemany,
)
compiled_sql = compiled_cache.get(key)
dialect,
cache_key=elem_cache_key,
column_keys=column_keys,
- inline=inline,
+ for_executemany=for_executemany,
schema_translate_map=schema_translate_map,
**kw
)
dialect,
cache_key=elem_cache_key,
column_keys=column_keys,
- inline=inline,
+ for_executemany=for_executemany,
schema_translate_map=schema_translate_map,
**kw
)
schema_translate_map=None,
render_schema_translate=False,
default_schema_name=None,
- inline_flag=None,
):
if use_default_dialect:
dialect = default.DefaultDialect()
},
check_post_param,
)
- if inline_flag is not None:
- eq_(c.inline, inline_flag)
class ComparesTables(object):
compiled = execute_observed.clauseelement.compile(
dialect=compare_dialect,
column_keys=context.compiled.column_keys,
- inline=context.compiled.inline,
+ for_executemany=context.compiled.for_executemany,
schema_translate_map=map_,
)
_received_statement = re.sub(r"[\n\t]", "", util.text_type(compiled))
"%(database)s %(does_support)s 'RETURNING of multiple rows'",
)
+ @property
+ def insert_executemany_returning(self):
+ """target platform supports RETURNING when INSERT is used with
+ executemany(), e.g. multiple parameter sets, indicating that
+ as many rows come back as parameter sets were passed.
+
+ """
+
+ return exclusions.only_if(
+ lambda config: config.db.dialect.insert_executemany_returning,
+ "%(database)s %(does_support)s 'RETURNING of "
+ "multiple rows with INSERT executemany'",
+ )
+
@property
def returning(self):
"""target platform supports RETURNING for at least one row.
self.tables.autoinc_pk.insert(), data="some data"
)
pk = connection.scalar(select([self.tables.autoinc_pk.c.id]))
- eq_(r.inserted_primary_key, [pk])
+ eq_(r.inserted_primary_key, (pk,))
@requirements.dbapi_lastrowid
def test_native_lastrowid_autoinc(self, connection):
)
)
- eq_(result.inserted_primary_key, [None])
+ eq_(result.inserted_primary_key, (None,))
result = connection.execute(
select([dest_table.c.data]).order_by(dest_table.c.data)
),
)
)
- eq_(result.inserted_primary_key, [None])
+ eq_(result.inserted_primary_key, (None,))
result = connection.execute(
select([dest_table.c.data]).order_by(dest_table.c.data)
self.tables.autoinc_pk.insert(), data="some data"
)
pk = connection.scalar(select([self.tables.autoinc_pk.c.id]))
- eq_(r.inserted_primary_key, [pk])
+ eq_(r.inserted_primary_key, (pk,))
__all__ = ("LastrowidTest", "InsertBehaviorTest", "ReturningTest")
def test_insert_lastrowid(self, connection):
r = connection.execute(self.tables.seq_pk.insert(), data="some data")
- eq_(r.inserted_primary_key, [testing.db.dialect.default_sequence_base])
+ eq_(
+ r.inserted_primary_key, (testing.db.dialect.default_sequence_base,)
+ )
def test_nextval_direct(self, connection):
r = connection.execute(self.tables.seq_pk.c.id.default)
r = connection.execute(
self.tables.seq_opt_pk.insert(), data="some data"
)
- eq_(r.inserted_primary_key, [1])
+ eq_(r.inserted_primary_key, (1,))
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
mssql_pyodbc = pyodbc
mysql = mysqlclient
oracle = cx_oracle
-postgresql = psycopg2
+postgresql = psycopg2>=2.7
postgresql_pg8000 = pg8000
postgresql_psycopg2binary = psycopg2-binary
postgresql_psycopg2cffi = psycopg2cffi
eq_([(9, "Python")], list(cats))
result = conn.execute(cattable.insert().values(description="PHP"))
- eq_([10], result.inserted_primary_key)
+ eq_(result.inserted_primary_key, (10,))
lastcat = conn.execute(
cattable.select().order_by(desc(cattable.c.id))
)
# if windows drivers / servers have different behavior here.
meta.create_all(connection)
r = connection.execute(t2.insert(), descr="hello")
- self.assert_(r.inserted_primary_key == [200])
+ eq_(r.inserted_primary_key, (200,))
r = connection.execute(t1.insert(), descr="hello")
- self.assert_(r.inserted_primary_key == [100])
+ eq_(r.inserted_primary_key, (100,))
@testing.provide_metadata
def _test_disable_scope_identity(self):
)
stmt = stmt.on_duplicate_key_update(bar=stmt.inserted.bar)
result = conn.execute(stmt)
- eq_(result.inserted_primary_key, [2])
+ eq_(result.inserted_primary_key, (2,))
eq_(
conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
[(1, "ab", "bz", False)],
)
stmt = stmt.on_duplicate_key_update(updated_once=None)
result = conn.execute(stmt)
- eq_(result.inserted_primary_key, [2])
+ eq_(result.inserted_primary_key, (2,))
eq_(
conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
[(1, "b", "bz", None)],
bar=func.concat(stmt.inserted.bar, "_foo")
)
result = conn.execute(stmt)
- eq_(result.inserted_primary_key, [2])
+ eq_(result.inserted_primary_key, (2,))
eq_(
conn.execute(foos.select().where(foos.c.id == 1)).fetchall(),
[(1, "ab_foo", "bz", False)],
bar=stmt.inserted.bar, baz="newbz"
)
)
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
stmt = insert(foos).values({"id": 1, "bar": "b", "baz": "bz"})
result = conn.execute(
bar=stmt.inserted.bar, baz="newbz"
)
)
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
from sqlalchemy.sql import table
from sqlalchemy.sql import util as sql_util
from sqlalchemy.testing import engines
+from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertions import assert_raises
from sqlalchemy.testing.assertions import assert_raises_message
t.create(engine)
with engine.begin() as conn:
r = conn.execute(t.insert())
- assert r.inserted_primary_key == [1]
+ eq_(r.inserted_primary_key, (1,))
class CompileTest(fixtures.TestBase, AssertsCompiledSQL):
# coding: utf-8
-import contextlib
import datetime
import logging
import logging.handlers
from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_BATCH
from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_DEFAULT
from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_VALUES
+from sqlalchemy.engine import cursor as _cursor
from sqlalchemy.engine import engine_from_config
from sqlalchemy.engine import url
from sqlalchemy.testing import engines
-from sqlalchemy.testing import expect_deprecated
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertions import ne_
from ...engine import test_execute
+if True:
+ from sqlalchemy.dialects.postgresql.psycopg2 import (
+ EXECUTEMANY_VALUES_PLUS_BATCH,
+ )
+
class DialectTest(fixtures.TestBase):
"""python-side dialect tests. """
Column("z", Integer, server_default="5"),
)
- @contextlib.contextmanager
- def expect_deprecated_opts(self):
- yield
-
def setup(self):
super(ExecuteManyMode, self).setup()
- with self.expect_deprecated_opts():
- self.engine = engines.testing_engine(options=self.options)
+ self.engine = engines.testing_engine(options=self.options)
def teardown(self):
self.engine.dispose()
def test_insert(self):
from psycopg2 import extras
- if self.engine.dialect.executemany_mode is EXECUTEMANY_BATCH:
+ values_page_size = self.engine.dialect.executemany_values_page_size
+ batch_page_size = self.engine.dialect.executemany_batch_page_size
+ if self.engine.dialect.executemany_mode & EXECUTEMANY_VALUES:
+ meth = extras.execute_values
+ stmt = "INSERT INTO data (x, y) VALUES %s"
+ expected_kwargs = {
+ "template": "(%(x)s, %(y)s)",
+ "page_size": values_page_size,
+ "fetch": False,
+ }
+ elif self.engine.dialect.executemany_mode & EXECUTEMANY_BATCH:
meth = extras.execute_batch
stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
- expected_kwargs = {}
+ expected_kwargs = {"page_size": batch_page_size}
else:
- meth = extras.execute_values
- stmt = "INSERT INTO data (x, y) VALUES %s"
- expected_kwargs = {"template": "(%(x)s, %(y)s)"}
+ assert False
with mock.patch.object(
extras, meth.__name__, side_effect=meth
def test_insert_no_page_size(self):
from psycopg2 import extras
+ values_page_size = self.engine.dialect.executemany_values_page_size
+ batch_page_size = self.engine.dialect.executemany_batch_page_size
+
eng = self.engine
- if eng.dialect.executemany_mode is EXECUTEMANY_BATCH:
+ if eng.dialect.executemany_mode & EXECUTEMANY_VALUES:
+ meth = extras.execute_values
+ stmt = "INSERT INTO data (x, y) VALUES %s"
+ expected_kwargs = {
+ "template": "(%(x)s, %(y)s)",
+ "page_size": values_page_size,
+ "fetch": False,
+ }
+ elif eng.dialect.executemany_mode & EXECUTEMANY_BATCH:
meth = extras.execute_batch
stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
- expected_kwargs = {}
+ expected_kwargs = {"page_size": batch_page_size}
else:
- meth = extras.execute_values
- stmt = "INSERT INTO data (x, y) VALUES %s"
- expected_kwargs = {"template": "(%(x)s, %(y)s)"}
+ assert False
with mock.patch.object(
extras, meth.__name__, side_effect=meth
opts["executemany_batch_page_size"] = 500
opts["executemany_values_page_size"] = 1000
- with self.expect_deprecated_opts():
- eng = engines.testing_engine(options=opts)
+ eng = engines.testing_engine(options=opts)
- if eng.dialect.executemany_mode is EXECUTEMANY_BATCH:
+ if eng.dialect.executemany_mode & EXECUTEMANY_VALUES:
+ meth = extras.execute_values
+ stmt = "INSERT INTO data (x, y) VALUES %s"
+ expected_kwargs = {
+ "fetch": False,
+ "page_size": 1000,
+ "template": "(%(x)s, %(y)s)",
+ }
+ elif eng.dialect.executemany_mode & EXECUTEMANY_BATCH:
meth = extras.execute_batch
stmt = "INSERT INTO data (x, y) VALUES (%(x)s, %(y)s)"
expected_kwargs = {"page_size": 500}
else:
- meth = extras.execute_values
- stmt = "INSERT INTO data (x, y) VALUES %s"
- expected_kwargs = {"page_size": 1000, "template": "(%(x)s, %(y)s)"}
+ assert False
with mock.patch.object(
extras, meth.__name__, side_effect=meth
def test_update_fallback(self):
from psycopg2 import extras
+ batch_page_size = self.engine.dialect.executemany_batch_page_size
eng = self.engine
meth = extras.execute_batch
stmt = "UPDATE data SET y=%(yval)s WHERE data.x = %(xval)s"
- expected_kwargs = {}
+ expected_kwargs = {"page_size": batch_page_size}
with mock.patch.object(
extras, meth.__name__, side_effect=meth
],
)
- eq_(
- mock_exec.mock_calls,
- [
- mock.call(
- mock.ANY,
- stmt,
- (
- {"xval": "x1", "yval": "y5"},
- {"xval": "x3", "yval": "y6"},
- ),
- **expected_kwargs
- )
- ],
- )
+ if eng.dialect.executemany_mode & EXECUTEMANY_BATCH:
+ eq_(
+ mock_exec.mock_calls,
+ [
+ mock.call(
+ mock.ANY,
+ stmt,
+ (
+ {"xval": "x1", "yval": "y5"},
+ {"xval": "x3", "yval": "y6"},
+ ),
+ **expected_kwargs
+ )
+ ],
+ )
+ else:
+ eq_(mock_exec.mock_calls, [])
def test_not_sane_rowcount(self):
self.engine.connect().close()
- assert not self.engine.dialect.supports_sane_multi_rowcount
+ if self.engine.dialect.executemany_mode & EXECUTEMANY_BATCH:
+ assert not self.engine.dialect.supports_sane_multi_rowcount
+ else:
+ assert self.engine.dialect.supports_sane_multi_rowcount
def test_update(self):
with self.engine.connect() as conn:
)
-class UseBatchModeTest(ExecuteManyMode, fixtures.TablesTest):
- options = {"use_batch_mode": True}
-
- def expect_deprecated_opts(self):
- return expect_deprecated(
- "The psycopg2 use_batch_mode flag is superseded by "
- "executemany_mode='batch'"
- )
-
-
class ExecutemanyBatchModeTest(ExecuteManyMode, fixtures.TablesTest):
options = {"executemany_mode": "batch"}
class ExecutemanyValuesInsertsTest(ExecuteManyMode, fixtures.TablesTest):
- options = {"executemany_mode": "values"}
+ options = {"executemany_mode": "values_only"}
+
+ def test_insert_returning_values(self):
+ """the psycopg2 dialect needs to assemble a fully buffered result
+ with the return value of execute_values().
+
+ """
+ t = self.tables.data
+
+ with self.engine.connect() as conn:
+ page_size = conn.dialect.executemany_values_page_size or 100
+ data = [
+ {"x": "x%d" % i, "y": "y%d" % i}
+ for i in range(1, page_size * 5 + 27)
+ ]
+ result = conn.execute(t.insert().returning(t.c.x, t.c.y), data)
+
+ eq_([tup[0] for tup in result.cursor.description], ["x", "y"])
+ eq_(result.keys(), ["x", "y"])
+ assert t.c.x in result.keys()
+ assert t.c.id not in result.keys()
+ assert not result._soft_closed
+ assert isinstance(
+ result.cursor_strategy,
+ _cursor.FullyBufferedCursorFetchStrategy,
+ )
+ assert not result.cursor.closed
+ assert not result.closed
+ eq_(result.mappings().all(), data)
+
+ assert result._soft_closed
+ # assert result.closed
+ assert result.cursor is None
+
+ def test_insert_returning_defaults(self):
+ t = self.tables.data
+
+ with self.engine.connect() as conn:
+
+ result = conn.execute(t.insert(), {"x": "x0", "y": "y0"})
+ first_pk = result.inserted_primary_key[0]
+
+ page_size = conn.dialect.executemany_values_page_size or 100
+ total_rows = page_size * 5 + 27
+ data = [
+ {"x": "x%d" % i, "y": "y%d" % i} for i in range(1, total_rows)
+ ]
+ result = conn.execute(t.insert().returning(t.c.id, t.c.z), data)
+
+ eq_(
+ result.all(),
+ [(pk, 5) for pk in range(1 + first_pk, total_rows + first_pk)],
+ )
def test_insert_w_newlines(self):
from psycopg2 import extras
{"id": 3, "y": "y3", "z": 3},
),
template="(%(id)s, (SELECT 5 \nFROM data), %(y)s, %(z)s)",
+ fetch=False,
+ page_size=conn.dialect.executemany_values_page_size,
)
],
)
)
eq_(mock_values.mock_calls, [])
- eq_(
- mock_batch.mock_calls,
- [
- mock.call(
- mock.ANY,
- "INSERT INTO data (id, y, z) VALUES "
- "(%(id)s, %(y)s, %(z)s)",
- (
- {"id": 1, "y": "y1", "z": 1},
- {"id": 2, "y": "y2", "z": 2},
- {"id": 3, "y": "y3", "z": 3},
- ),
- )
- ],
- )
+
+ if self.engine.dialect.executemany_mode & EXECUTEMANY_BATCH:
+ eq_(
+ mock_batch.mock_calls,
+ [
+ mock.call(
+ mock.ANY,
+ "INSERT INTO data (id, y, z) VALUES "
+ "(%(id)s, %(y)s, %(z)s)",
+ (
+ {"id": 1, "y": "y1", "z": 1},
+ {"id": 2, "y": "y2", "z": 2},
+ {"id": 3, "y": "y3", "z": 3},
+ ),
+ )
+ ],
+ )
+ else:
+ eq_(mock_batch.mock_calls, [])
+
+
+class ExecutemanyValuesPlusBatchInsertsTest(
+ ExecuteManyMode, fixtures.TablesTest
+):
+ options = {"executemany_mode": "values_plus_batch"}
class ExecutemanyFlagOptionsTest(fixtures.TablesTest):
for opt, expected in [
(None, EXECUTEMANY_DEFAULT),
("batch", EXECUTEMANY_BATCH),
- ("values", EXECUTEMANY_VALUES),
+ ("values_only", EXECUTEMANY_VALUES),
+ ("values_plus_batch", EXECUTEMANY_VALUES_PLUS_BATCH),
]:
self.engine = engines.testing_engine(
options={"executemany_mode": opt}
r = connection.execute(
t.insert(), user_name="user", user_password="lala"
)
- assert r.inserted_primary_key == [1]
+ eq_(r.inserted_primary_key, (1,))
result = connection.execute(t.select()).fetchall()
assert result == [(1, "user", "lala")]
connection.execute(text("DROP TABLE speedy_users"))
insert(users).on_conflict_do_nothing(),
dict(id=1, name="name1"),
)
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
eq_(result.returned_defaults, None)
result = conn.execute(
insert(users).on_conflict_do_nothing(),
dict(id=1, name="name2"),
)
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
eq_(result.returned_defaults, None)
eq_(
insert(users).on_conflict_do_nothing(constraint="uq_login_email"),
dict(name="name1", login_email="email1"),
)
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
eq_(result.returned_defaults, (1,))
result = connection.execute(
),
dict(id=1, name="name1"),
)
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
eq_(result.returned_defaults, None)
result = conn.execute(
),
dict(id=1, name="name2"),
)
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
eq_(result.returned_defaults, None)
eq_(
)
result = conn.execute(i, dict(id=1, name="name1"))
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
eq_(result.returned_defaults, None)
eq_(
)
result = conn.execute(i, dict(id=1, name="name2"))
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
eq_(result.returned_defaults, None)
eq_(
set_=dict(name=i.excluded.name),
)
result = conn.execute(i, dict(id=1, name="name3"))
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
eq_(result.returned_defaults, None)
eq_(
).values(id=1, name="name4")
result = conn.execute(i)
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
eq_(result.returned_defaults, None)
eq_(
).values(id=1, name="name4")
result = conn.execute(i)
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
eq_(result.returned_defaults, None)
eq_(
)
result = conn.execute(i)
- eq_(result.inserted_primary_key, [None])
+ eq_(result.inserted_primary_key, (None,))
eq_(result.returned_defaults, None)
eq_(
lets_index_this="not",
),
)
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
eq_(result.returned_defaults, None)
eq_(
lets_index_this="unique",
),
)
- eq_(result.inserted_primary_key, [42])
+ eq_(result.inserted_primary_key, (42,))
eq_(result.returned_defaults, None)
eq_(
lets_index_this="unique",
),
)
- eq_(result.inserted_primary_key, [43])
+ eq_(result.inserted_primary_key, (43,))
eq_(result.returned_defaults, None)
eq_(
result = conn.execute(
i, dict(name="name3", login_email="name1@gmail.com")
)
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
eq_(result.returned_defaults, (1,))
eq_(
# execute with explicit id
r = conn.execute(table.insert(), {"id": 30, "data": "d1"})
- eq_(r.inserted_primary_key, [30])
+ eq_(r.inserted_primary_key, (30,))
# execute with prefetch id
r = conn.execute(table.insert(), {"data": "d2"})
- eq_(r.inserted_primary_key, [1])
+ eq_(r.inserted_primary_key, (1,))
# executemany with explicit ids
with engine.connect() as conn:
conn.execute(table.insert(), {"id": 30, "data": "d1"})
r = conn.execute(table.insert(), {"data": "d2"})
- eq_(r.inserted_primary_key, [5])
+ eq_(r.inserted_primary_key, (5,))
conn.execute(
table.insert(),
{"id": 31, "data": "d3"},
# execute with explicit id
r = conn.execute(table.insert(), {"id": 30, "data": "d1"})
- eq_(r.inserted_primary_key, [30])
+ eq_(r.inserted_primary_key, (30,))
# execute with prefetch id
r = conn.execute(table.insert(), {"data": "d2"})
- eq_(r.inserted_primary_key, [1])
+ eq_(r.inserted_primary_key, (1,))
# executemany with explicit ids
with engine.connect() as conn:
conn.execute(table.insert(), {"id": 30, "data": "d1"})
r = conn.execute(table.insert(), {"data": "d2"})
- eq_(r.inserted_primary_key, [5])
+ eq_(r.inserted_primary_key, (5,))
conn.execute(
table.insert(),
{"id": 31, "data": "d3"},
t2 = Table("t", m2, autoload=True, implicit_returning=False)
eq_(t2.c.id.server_default.arg.text, "nextval('t_id_seq'::regclass)")
r = t2.insert().execute()
- eq_(r.inserted_primary_key, [1])
+ eq_(r.inserted_primary_key, (1,))
testing.db.connect().execution_options(
autocommit=True
).exec_driver_sql("alter table t_id_seq rename to foobar_id_seq")
"nextval('foobar_id_seq'::regclass)",
)
r = t3.insert().execute()
- eq_(r.inserted_primary_key, [2])
+ eq_(r.inserted_primary_key, (2,))
@testing.provide_metadata
def test_altered_type_autoincrement_pk_reflection(self):
from sqlalchemy import column
from sqlalchemy import DateTime
from sqlalchemy import Enum
-from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import Float
from sqlalchemy import func
def test_bind_serialize_default(self):
- dialect = postgresql.dialect()
+ dialect = postgresql.dialect(use_native_hstore=False)
proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
eq_(
proc(util.OrderedDict([("key1", "value1"), ("key2", "value2")])),
)
def test_bind_serialize_with_slashes_and_quotes(self):
- dialect = postgresql.dialect()
+ dialect = postgresql.dialect(use_native_hstore=False)
proc = self.test_table.c.hash.type._cached_bind_processor(dialect)
eq_(proc({'\\"a': '\\"1'}), '"\\\\\\"a"=>"\\\\\\"1"')
def test_parse_error(self):
- dialect = postgresql.dialect()
+ dialect = postgresql.dialect(use_native_hstore=False)
proc = self.test_table.c.hash.type._cached_result_processor(
dialect, None
)
)
def test_result_deserialize_default(self):
- dialect = postgresql.dialect()
+ dialect = postgresql.dialect(use_native_hstore=False)
proc = self.test_table.c.hash.type._cached_result_processor(
dialect, None
)
)
def test_result_deserialize_with_slashes_and_quotes(self):
- dialect = postgresql.dialect()
+ dialect = postgresql.dialect(use_native_hstore=False)
proc = self.test_table.c.hash.type._cached_result_processor(
dialect, None
)
)
self._assert_column_is_JSON_NULL(conn, column="nulldata")
- def _non_native_engine(self, json_serializer=None, json_deserializer=None):
- if json_serializer is not None or json_deserializer is not None:
- options = {
- "json_serializer": json_serializer,
- "json_deserializer": json_deserializer,
- }
- else:
- options = {}
-
- if testing.against(
- "postgresql+psycopg2"
- ) and testing.db.dialect.psycopg2_version >= (2, 5):
- from psycopg2.extras import register_default_json
-
- engine = engines.testing_engine(options=options)
-
- @event.listens_for(engine, "connect")
- def connect(dbapi_connection, connection_record):
- engine.dialect._has_native_json = False
-
- def pass_(value):
- return value
-
- register_default_json(dbapi_connection, loads=pass_)
-
- elif options:
- engine = engines.testing_engine(options=options)
- else:
- engine = testing.db
- engine.connect().close()
- return engine
-
def test_reflect(self):
insp = inspect(testing.db)
cols = insp.get_columns("data_table")
assert isinstance(cols[2]["type"], self.test_type)
- @testing.requires.psycopg2_native_json
- def test_insert_native(self, connection):
+ def test_insert(self, connection):
self._test_insert(connection)
- @testing.requires.psycopg2_native_json
- def test_insert_native_nulls(self, connection):
+ def test_insert_nulls(self, connection):
self._test_insert_nulls(connection)
- @testing.requires.psycopg2_native_json
- def test_insert_native_none_as_null(self, connection):
+ def test_insert_none_as_null(self, connection):
self._test_insert_none_as_null(connection)
- @testing.requires.psycopg2_native_json
- def test_insert_native_nulljson_into_none_as_null(self, connection):
+ def test_insert_nulljson_into_none_as_null(self, connection):
self._test_insert_nulljson_into_none_as_null(connection)
- def test_insert_python(self):
- engine = self._non_native_engine()
- self._test_insert(engine)
-
- def test_insert_python_nulls(self):
- engine = self._non_native_engine()
- self._test_insert_nulls(engine)
-
- def test_insert_python_none_as_null(self):
- engine = self._non_native_engine()
- self._test_insert_none_as_null(engine)
-
- def test_insert_python_nulljson_into_none_as_null(self):
- engine = self._non_native_engine()
- self._test_insert_nulljson_into_none_as_null(engine)
-
- def _test_custom_serialize_deserialize(self, native):
+ def test_custom_serialize_deserialize(self):
import json
def loads(value):
value["x"] = "dumps_y"
return json.dumps(value)
- if native:
- engine = engines.testing_engine(
- options=dict(json_serializer=dumps, json_deserializer=loads)
- )
- else:
- engine = self._non_native_engine(
- json_serializer=dumps, json_deserializer=loads
- )
+ engine = engines.testing_engine(
+ options=dict(json_serializer=dumps, json_deserializer=loads)
+ )
s = select([cast({"key": "value", "x": "q"}, self.test_type)])
with engine.begin() as conn:
eq_(conn.scalar(s), {"key": "value", "x": "dumps_y_loads"})
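For orientation, a minimal sketch of the ``json_serializer`` / ``json_deserializer`` engine options that the test above wires in through the testing harness; the connection URL and the use of the stdlib ``json`` module here are illustrative assumptions::

    import json

    from sqlalchemy import create_engine

    # the serializer/deserializer callables are applied to JSON/JSONB
    # values as they are bound to and fetched from the database
    engine = create_engine(
        "postgresql://scott:tiger@localhost/test",
        json_serializer=json.dumps,
        json_deserializer=json.loads,
    )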
- @testing.requires.psycopg2_native_json
- def test_custom_native(self):
- self._test_custom_serialize_deserialize(True)
-
- @testing.requires.psycopg2_native_json
- def test_custom_python(self):
- self._test_custom_serialize_deserialize(False)
-
- @testing.requires.psycopg2_native_json
- def test_criterion_native(self):
+ def test_criterion(self):
engine = testing.db
self._fixture_data(engine)
self._test_criterion(engine)
- def test_criterion_python(self):
- engine = self._non_native_engine()
- self._fixture_data(engine)
- self._test_criterion(engine)
-
def test_path_query(self, connection):
engine = testing.db
self._fixture_data(engine)
).first()
eq_(result, ({"k1": "r3v1", "k2": "r3v2"},))
- def _test_fixed_round_trip(self, engine):
- with engine.begin() as conn:
- s = select(
- [
- cast(
- {"key": "value", "key2": {"k1": "v1", "k2": "v2"}},
- self.test_type,
- )
- ]
- )
- eq_(
- conn.scalar(s),
- {"key": "value", "key2": {"k1": "v1", "k2": "v2"}},
- )
-
- def test_fixed_round_trip_python(self):
- engine = self._non_native_engine()
- self._test_fixed_round_trip(engine)
-
- @testing.requires.psycopg2_native_json
- def test_fixed_round_trip_native(self):
- engine = testing.db
- self._test_fixed_round_trip(engine)
-
- def _test_unicode_round_trip(self, engine):
- with engine.begin() as conn:
- s = select(
- [
- cast(
- {
- util.u("réveillé"): util.u("réveillé"),
- "data": {"k1": util.u("drôle")},
- },
- self.test_type,
- )
- ]
- )
- eq_(
- conn.scalar(s),
- {
- util.u("réveillé"): util.u("réveillé"),
- "data": {"k1": util.u("drôle")},
- },
- )
-
- def test_unicode_round_trip_python(self):
- engine = self._non_native_engine()
- self._test_unicode_round_trip(engine)
+ def test_fixed_round_trip(self, connection):
+ s = select(
+ [
+ cast(
+ {"key": "value", "key2": {"k1": "v1", "k2": "v2"}},
+ self.test_type,
+ )
+ ]
+ )
+ eq_(
+ connection.scalar(s),
+ {"key": "value", "key2": {"k1": "v1", "k2": "v2"}},
+ )
- @testing.requires.psycopg2_native_json
- def test_unicode_round_trip_native(self):
- engine = testing.db
- self._test_unicode_round_trip(engine)
+ def test_unicode_round_trip(self, connection):
+ s = select(
+ [
+ cast(
+ {
+ util.u("réveillé"): util.u("réveillé"),
+ "data": {"k1": util.u("drôle")},
+ },
+ self.test_type,
+ )
+ ]
+ )
+ eq_(
+ connection.scalar(s),
+ {
+ util.u("réveillé"): util.u("réveillé"),
+ "data": {"k1": util.u("drôle")},
+ },
+ )
def test_eval_none_flag_orm(self):
Base = declarative_base()
test_type = JSONB
@testing.requires.postgresql_utf8_server_encoding
- def test_unicode_round_trip_python(self):
- super(JSONBRoundTripTest, self).test_unicode_round_trip_python()
-
- @testing.requires.postgresql_utf8_server_encoding
- def test_unicode_round_trip_native(self):
- super(JSONBRoundTripTest, self).test_unicode_round_trip_native()
+ def test_unicode_round_trip(self, connection):
+ super(JSONBRoundTripTest, self).test_unicode_round_trip(connection)
class JSONBSuiteTest(suite.JSONTest):
def test_manytomany_passive(self):
self._test_manytomany(True)
- @testing.requires.non_updating_cascade
+ @testing.fails_if(
+ testing.requires.on_update_cascade
+ + testing.requires.sane_multi_rowcount
+ )
def test_manytomany_nonpassive(self):
self._test_manytomany(False)
and config.db.dialect._dbapi_version <= (1, 10, 1)
)
- @property
- def psycopg2_native_json(self):
- return self.psycopg2_compatibility
-
@property
def psycopg2_native_hstore(self):
return self.psycopg2_compatibility
with engine.begin() as conn:
conn.execute(t2.insert(), nextid=1)
r = conn.execute(t1.insert(), data="hi")
- eq_([1], r.inserted_primary_key)
+ eq_((1,), r.inserted_primary_key)
conn.execute(t2.insert(), nextid=2)
r = conn.execute(t1.insert(), data="there")
- eq_([2], r.inserted_primary_key)
+ eq_((2,), r.inserted_primary_key)
r = conn.execute(date_table.insert())
assert isinstance(r.inserted_primary_key[0], datetime.datetime)
not testing.db.dialect.implicit_returning
or not implicit_returning
):
- eq_(r.inserted_primary_key, [None])
+ eq_(r.inserted_primary_key, (None,))
else:
eq_(
- r.inserted_primary_key, [expected_result],
+ r.inserted_primary_key, (expected_result,),
)
eq_(
)
metadata.create_all(connection)
r = connection.execute(t.insert(), dict(data="data"))
- eq_(r.inserted_primary_key, [None])
+ eq_(r.inserted_primary_key, (None,))
eq_(list(connection.execute(t.select())), [("key_one", "data")])
@testing.requires.returning
)
metadata.create_all(connection)
r = connection.execute(t.insert(), dict(data="data"))
- eq_(r.inserted_primary_key, ["key_one"])
+ eq_(r.inserted_primary_key, ("key_one",))
eq_(list(connection.execute(t.select())), [("key_one", "data")])
@testing.provide_metadata
assert t._autoincrement_column is None
metadata.create_all(connection)
r = connection.execute(t.insert(), dict(data="data"))
- eq_(r.inserted_primary_key, [None])
+ eq_(r.inserted_primary_key, (None,))
if testing.against("sqlite"):
eq_(list(connection.execute(t.select())), [(1, "data")])
else:
t2 = Table("x", m2, autoload_with=connection, implicit_returning=False)
r = connection.execute(t2.insert(), dict(data="data"))
- eq_(r.inserted_primary_key, [None])
+ eq_(r.inserted_primary_key, (None,))
if testing.against("sqlite"):
eq_(list(connection.execute(t2.select())), [(1, "data")])
else:
metadata.create_all(connection)
r = connection.execute(t.insert(), dict(data="data"))
- eq_(r.inserted_primary_key, [5])
+ eq_(r.inserted_primary_key, (5,))
eq_(list(connection.execute(t.select())), [(5, "data")])
"INSERT INTO test (col1, col2) VALUES (foo(:foo_1), "
"(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM "
"foo))",
- inline_flag=True,
)
def test_insert_inline_kw_default(self):
stmt = table.insert(values={}, inline=True)
self.assert_compile(
- stmt,
- "INSERT INTO sometable (foo) VALUES (foobar())",
- inline_flag=True,
+ stmt, "INSERT INTO sometable (foo) VALUES (foobar())",
)
with testing.expect_deprecated_20(
stmt = table.insert(inline=True)
self.assert_compile(
- stmt,
- "INSERT INTO sometable (foo) VALUES (foobar())",
- params={},
- inline_flag=True,
+ stmt, "INSERT INTO sometable (foo) VALUES (foobar())", params={},
)
def test_update_inline_kw_defaults(self):
"UPDATE test SET col1=foo(:foo_1), col2=(SELECT "
"coalesce(max(foo.id)) AS coalesce_1 FROM foo), "
"col3=:col3",
- inline_flag=True,
)
def test_update_dialect_kwargs(self):
)
def test_incorrect_none_type(self):
+ from sqlalchemy.sql.expression import FunctionElement
+
class MissingType(FunctionElement):
name = "mt"
type = None
TypeError,
"Object None associated with '.type' attribute is "
"not a TypeEngine class or object",
- MissingType().compile,
+ lambda: column("x", MissingType()) == 5,
)
def test_as_comparison(self):
"INSERT INTO test (col1, col2) VALUES (foo(:foo_1), "
"(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM "
"foo))",
- inline_flag=False,
)
self.assert_compile(
"INSERT INTO test (col1, col2) VALUES (foo(:foo_1), "
"(SELECT coalesce(max(foo.id)) AS coalesce_1 FROM "
"foo))",
- inline_flag=True,
)
def test_generic_insert_bind_params_all_columns(self):
self.assert_compile(
table.insert().values(),
"INSERT INTO sometable (foo) VALUES (foobar())",
- inline_flag=False,
)
self.assert_compile(
table.insert(),
"INSERT INTO sometable (foo) VALUES (foobar())",
params={},
- inline_flag=False,
)
self.assert_compile(
table.insert().values().inline(),
"INSERT INTO sometable (foo) VALUES (foobar())",
- inline_flag=True,
)
self.assert_compile(
table.insert().inline(),
"INSERT INTO sometable (foo) VALUES (foobar())",
params={},
- inline_flag=True,
)
def test_insert_returning_not_in_default(self):
)
t.create(eng)
r = eng.execute(t.insert().values(y=5))
- eq_(r.inserted_primary_key, [0])
+ eq_(r.inserted_primary_key, (0,))
@testing.fails_on(
"sqlite", "sqlite autoincrement doesn't work with composite pks"
eq_(id_, 12)
r = t6.insert().values(manual_id=id_).execute()
- eq_(r.inserted_primary_key, [12, 1])
+ eq_(r.inserted_primary_key, (12, 1))
def test_implicit_id_insert_select_columns(self):
users = self.tables.users
self._test(
t.insert().values(id=1, data="data", x=5),
(1, "data", 5),
- inserted_primary_key=[1],
+ inserted_primary_key=(1,),
)
def test_uppercase_inline(self):
self._test(
t.insert().inline().values(id=1, data="data", x=5),
(1, "data", 5),
- inserted_primary_key=[1],
+ inserted_primary_key=(1,),
)
@testing.crashes(
self._test(
t.insert().inline().values(data="data", x=5),
(1, "data", 5),
- inserted_primary_key=[None],
+ inserted_primary_key=(None,),
)
def test_uppercase_implicit(self):
self._test(
t.insert().values(data="data", x=5),
(testing.db.dialect.default_sequence_base, "data", 5),
- inserted_primary_key=[testing.db.dialect.default_sequence_base],
+ inserted_primary_key=(testing.db.dialect.default_sequence_base,),
)
def test_uppercase_direct_params(self):
self._test(
t.insert().values(id=1, data="data", x=5),
(1, "data", 5),
- inserted_primary_key=[1],
+ inserted_primary_key=(1,),
)
@testing.requires.returning
self._test(
t.insert().values(id=1, data="data", x=5),
(1, "data", 5),
- inserted_primary_key=[],
+ inserted_primary_key=(),
)
@testing.requires.returning
self._test(
t.insert().values(data="data", x=5),
(testing.db.dialect.default_sequence_base, "data", 5),
- inserted_primary_key=[],
+ inserted_primary_key=(),
)
@testing.requires.emulated_lastrowid_even_with_sequences
self._test(
t.insert().inline().values(data="data", x=5),
(testing.db.dialect.default_sequence_base, "data", 5),
- inserted_primary_key=[],
+ inserted_primary_key=(),
)
dict(result.returned_defaults._mapping),
{"id": 1, "data": None, "insdef": 0},
)
+ eq_(result.inserted_primary_key, (1,))
def test_update_non_default_plus_default(self, connection):
t1 = self.tables.t1
dict(result.returned_defaults._mapping),
{"id": 1, "data": None, "insdef": 0},
)
+ eq_(result.inserted_primary_key, (1,))
def test_update_all(self, connection):
t1 = self.tables.t1
)
eq_(dict(result.returned_defaults._mapping), {"upddef": 1})
+ @testing.requires.insert_executemany_returning
+ def test_insert_executemany_no_defaults_passed(self, connection):
+ t1 = self.tables.t1
+ result = connection.execute(
+ t1.insert().return_defaults(),
+ [
+ {"data": "d1"},
+ {"data": "d2"},
+ {"data": "d3"},
+ {"data": "d4"},
+ {"data": "d5"},
+ {"data": "d6"},
+ ],
+ )
+
+ eq_(
+ [row._mapping for row in result.returned_defaults_rows],
+ [
+ {"id": 1, "insdef": 0, "upddef": None},
+ {"id": 2, "insdef": 0, "upddef": None},
+ {"id": 3, "insdef": 0, "upddef": None},
+ {"id": 4, "insdef": 0, "upddef": None},
+ {"id": 5, "insdef": 0, "upddef": None},
+ {"id": 6, "insdef": 0, "upddef": None},
+ ],
+ )
+
+ eq_(
+ result.inserted_primary_key_rows,
+ [(1,), (2,), (3,), (4,), (5,), (6,)],
+ )
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ "This statement was an executemany call; "
+ "if return defaults is supported",
+ lambda: result.returned_defaults,
+ )
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ "This statement was an executemany call; "
+ "if primary key returning is supported",
+ lambda: result.inserted_primary_key,
+ )
+
+ @testing.requires.insert_executemany_returning
+ def test_insert_executemany_insdefault_passed(self, connection):
+ t1 = self.tables.t1
+ result = connection.execute(
+ t1.insert().return_defaults(),
+ [
+ {"data": "d1", "insdef": 11},
+ {"data": "d2", "insdef": 12},
+ {"data": "d3", "insdef": 13},
+ {"data": "d4", "insdef": 14},
+ {"data": "d5", "insdef": 15},
+ {"data": "d6", "insdef": 16},
+ ],
+ )
+
+ eq_(
+ [row._mapping for row in result.returned_defaults_rows],
+ [
+ {"id": 1, "upddef": None},
+ {"id": 2, "upddef": None},
+ {"id": 3, "upddef": None},
+ {"id": 4, "upddef": None},
+ {"id": 5, "upddef": None},
+ {"id": 6, "upddef": None},
+ ],
+ )
+
+ eq_(
+ result.inserted_primary_key_rows,
+ [(1,), (2,), (3,), (4,), (5,), (6,)],
+ )
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ "This statement was an executemany call; "
+ "if return defaults is supported",
+ lambda: result.returned_defaults,
+ )
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ "This statement was an executemany call; "
+ "if primary key returning is supported",
+ lambda: result.inserted_primary_key,
+ )
+
+ @testing.requires.insert_executemany_returning
+ def test_insert_executemany_only_pk_passed(self, connection):
+ t1 = self.tables.t1
+ result = connection.execute(
+ t1.insert().return_defaults(),
+ [
+ {"id": 10, "data": "d1"},
+ {"id": 11, "data": "d2"},
+ {"id": 12, "data": "d3"},
+ {"id": 13, "data": "d4"},
+ {"id": 14, "data": "d5"},
+ {"id": 15, "data": "d6"},
+ ],
+ )
+
+ eq_(
+ [row._mapping for row in result.returned_defaults_rows],
+ [
+ {"insdef": 0, "upddef": None},
+ {"insdef": 0, "upddef": None},
+ {"insdef": 0, "upddef": None},
+ {"insdef": 0, "upddef": None},
+ {"insdef": 0, "upddef": None},
+ {"insdef": 0, "upddef": None},
+ ],
+ )
+ eq_(
+ result.inserted_primary_key_rows,
+ [(10,), (11,), (12,), (13,), (14,), (15,)],
+ )
+
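A condensed sketch of the accessors exercised by the tests above, assuming the same ``t1`` fixture shape (parameter values are illustrative): for an executemany INSERT with ``return_defaults()``, per-row results are available from the ``*_rows`` collections, while the scalar accessors raise ``InvalidRequestError``::

    result = connection.execute(
        t1.insert().return_defaults(),
        [{"data": "d1"}, {"data": "d2"}],
    )

    # one Row of server-generated defaults per parameter set
    defaults = result.returned_defaults_rows

    # one primary-key tuple per parameter set
    pks = result.inserted_primary_key_rows

    # result.returned_defaults and result.inserted_primary_key instead
    # raise InvalidRequestError for an executemany execution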
class ImplicitReturningFlag(fixtures.TestBase):
__backend__ = True
self.metadata.create_all(connection)
result = connection.execute(t.insert())
- eq_(result.inserted_primary_key, [1])
+ eq_(result.inserted_primary_key, (1,))
class FutureSequenceTest(fixtures.FutureEngineMixin, SequenceTest):
"UPDATE test SET col1=foo(:foo_1), col2=(SELECT "
"coalesce(max(foo.id)) AS coalesce_1 FROM foo), "
"col3=:col3",
- inline_flag=False,
)
self.assert_compile(
"UPDATE test SET col1=foo(:foo_1), col2=(SELECT "
"coalesce(max(foo.id)) AS coalesce_1 FROM foo), "
"col3=:col3",
- inline_flag=True,
)
def test_update_1(self):
pytest-xdist
mock; python_version < '3.3'
- # psycopg2 minimum 2.7 needed only for correct profiling results
postgresql: psycopg2>=2.7
# mysqlclient minimum 1.4.0 needed only for correct profiling results