From 2f617f56f2acdce00b88f746c403cf5ed66d4d27 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Tue, 7 Apr 2020 14:15:43 -0400 Subject: [PATCH] Create initial 2.0 engine implementation Implemented the SQLAlchemy 2 :func:`.future.create_engine` function which is used for forwards compatibility with SQLAlchemy 2. This engine features always-transactional behavior with autobegin. Allow execution options per statement execution. This includes that the before_execute() and after_execute() events now accept an additional dictionary with these options, empty if not passed; a legacy event decorator is added for backwards compatibility which now also emits a deprecation warning. Add some basic tests for execution, transactions, and the new result object. Build out on a new testing fixture that swaps in the future engine completely to start with. Change-Id: I70e7338bb3f0ce22d2f702537d94bb249bd9fb0a Fixes: #4644 --- doc/build/changelog/migration_20.rst | 26 +- doc/build/changelog/unreleased_14/4644.rst | 11 + doc/build/core/future.rst | 7 + lib/sqlalchemy/cextension/utils.c | 7 + lib/sqlalchemy/dialects/mssql/base.py | 32 +- lib/sqlalchemy/dialects/mysql/base.py | 17 +- lib/sqlalchemy/dialects/oracle/base.py | 35 +- lib/sqlalchemy/dialects/postgresql/base.py | 6 +- lib/sqlalchemy/engine/base.py | 316 ++++++++---- lib/sqlalchemy/engine/create.py | 3 +- lib/sqlalchemy/engine/default.py | 127 ++++- lib/sqlalchemy/engine/events.py | 68 ++- lib/sqlalchemy/engine/result.py | 4 + lib/sqlalchemy/engine/util.py | 32 ++ lib/sqlalchemy/event/attr.py | 2 + lib/sqlalchemy/event/legacy.py | 20 + lib/sqlalchemy/future/__init__.py | 4 +- lib/sqlalchemy/future/engine.py | 434 ++++++++++++++++ lib/sqlalchemy/future/result.py | 181 ++++++- lib/sqlalchemy/log.py | 19 +- lib/sqlalchemy/orm/session.py | 19 +- lib/sqlalchemy/sql/base.py | 11 + lib/sqlalchemy/sql/compiler.py | 8 +- lib/sqlalchemy/sql/ddl.py | 8 +- lib/sqlalchemy/sql/elements.py | 8 +- lib/sqlalchemy/sql/functions.py | 8 +- lib/sqlalchemy/sql/schema.py | 8 +- lib/sqlalchemy/testing/assertsql.py | 4 +- lib/sqlalchemy/testing/config.py | 8 + lib/sqlalchemy/testing/engines.py | 7 +- lib/sqlalchemy/testing/fixtures.py | 23 + lib/sqlalchemy/testing/suite/test_ddl.py | 6 +- lib/sqlalchemy/testing/warnings.py | 7 + lib/sqlalchemy/util/deprecations.py | 2 + test/base/test_events.py | 46 +- test/base/test_tutorials.py | 4 +- test/dialect/mysql/test_reflection.py | 2 +- test/engine/test_deprecations.py | 92 ++++ test/engine/test_execute.py | 261 ++++++++-- test/engine/test_logging.py | 4 +- test/engine/test_transaction.py | 564 ++++++++++++++++++++- test/ext/test_baked.py | 4 +- test/orm/test_deprecations.py | 68 +++ test/orm/test_events.py | 18 - test/orm/test_transaction.py | 145 +++++- test/requirements.py | 4 + test/sql/test_defaults.py | 17 +- test/sql/test_resultset.py | 96 ++++ test/sql/test_sequences.py | 10 + 49 files changed, 2503 insertions(+), 310 deletions(-) create mode 100644 doc/build/changelog/unreleased_14/4644.rst create mode 100644 lib/sqlalchemy/future/engine.py diff --git a/doc/build/changelog/migration_20.rst b/doc/build/changelog/migration_20.rst index 22e1979a97..54df71f22a 100644 --- a/doc/build/changelog/migration_20.rst +++ b/doc/build/changelog/migration_20.rst @@ -131,7 +131,15 @@ to occur over the course of many major releases. Library-level (but not driver level) "Autocommit" removed from both Core and ORM ================================================================================ -.. admonition:: Certainty: almost definitely +.. admonition:: Certainty: definite + + Review the new future API for engines and connections at: + + :class:`_future.Connection` + + :class:`.future.Engine` + + :func:`_future.create_engine` "autocommit" at the ORM level is already not a widely used pattern except to the degree that the ``.begin()`` call is desirable, and a new flag @@ -459,10 +467,12 @@ Flask-style queries only. execute() method more strict, .execution_options() are available on ORM Session ================================================================================ -.. admonition:: Certainty: tentative +.. admonition:: Certainty: definite + + Review the new future API for connections at: + + :class:`_future.Connection` - Pending further prototyping, this is part of a larger plan that impacts - statement compilation, execution, and result processing. The use of execution options is expected to be more prominent as the Core and ORM are largely unified at the statement handling level. To suit this, @@ -507,10 +517,12 @@ So that an execution may look like:: ResultProxy replaced with Result which has more refined methods and behaviors ============================================================================= -.. admonition:: Certainty: tentative +.. admonition:: Certainty: definite + + Review the new future API for result sets: + + :class:`_future.Result` - This is again part of the rearchitecture of "execute()" internals and is - pending further prototyping. A major goal of SQLAlchemy 2.0 is to unify how "results" are handled between the ORM and Core. Towards this goal, version 1.4 will already standardized diff --git a/doc/build/changelog/unreleased_14/4644.rst b/doc/build/changelog/unreleased_14/4644.rst new file mode 100644 index 0000000000..8550b8cbc4 --- /dev/null +++ b/doc/build/changelog/unreleased_14/4644.rst @@ -0,0 +1,11 @@ +.. change:: + :tags: feature, engine, alchemy2 + :tickets: 4644 + + Implemented the SQLAlchemy 2 :func:`_future.create_engine` function which + is used for forwards compatibility with SQLAlchemy 2. This engine + features always-transactional behavior with autobegin. + + .. seealso:: + + :ref:`migration_20_toplevel` diff --git a/doc/build/core/future.rst b/doc/build/core/future.rst index 56a220c638..cbcb39df06 100644 --- a/doc/build/core/future.rst +++ b/doc/build/core/future.rst @@ -10,6 +10,13 @@ SQLAlchemy 2.0 Future (Core) .. module:: sqlalchemy.future +.. autoclass:: sqlalchemy.future.Connection + :members: + +.. autofunction:: sqlalchemy.future.create_engine + +.. autoclass:: sqlalchemy.future.Engine + :members: .. autofunction:: sqlalchemy.future.select diff --git a/lib/sqlalchemy/cextension/utils.c b/lib/sqlalchemy/cextension/utils.c index a437adc706..fb7fbe4e66 100644 --- a/lib/sqlalchemy/cextension/utils.c +++ b/lib/sqlalchemy/cextension/utils.c @@ -23,6 +23,9 @@ the MIT License: http://www.opensource.org/licenses/mit-license.php static PyObject * distill_params(PyObject *self, PyObject *args) { + // TODO: pass the Connection in so that there can be a standard + // method for warning on parameter format + PyObject *multiparams, *params; PyObject *enclosing_list, *double_enclosing_list; PyObject *zero_element, *zero_element_item; @@ -44,6 +47,8 @@ distill_params(PyObject *self, PyObject *args) if (multiparam_size == 0) { if (params != Py_None && PyDict_Size(params) != 0) { + // TODO: this is keyword parameters, emit parameter format + // deprecation warning enclosing_list = PyList_New(1); if (enclosing_list == NULL) { return NULL; @@ -152,6 +157,8 @@ distill_params(PyObject *self, PyObject *args) } } else { + // TODO: this is multiple positional params, emit parameter format + // deprecation warning zero_element = PyTuple_GetItem(multiparams, 0); if (PyObject_HasAttrString(zero_element, "__iter__") && !PyObject_HasAttrString(zero_element, "strip") diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py index b0021e873a..df6196baef 100644 --- a/lib/sqlalchemy/dialects/mssql/base.py +++ b/lib/sqlalchemy/dialects/mssql/base.py @@ -2544,16 +2544,20 @@ class MSDialect(default.DefaultDialect): @_db_plus_owner def has_table(self, connection, tablename, dbname, owner, schema): - columns = ischema.columns + tables = ischema.tables - whereclause = columns.c.table_name == tablename + s = sql.select([tables.c.table_name]).where( + sql.and_( + tables.c.table_type == "BASE TABLE", + tables.c.table_name == tablename, + ) + ) if owner: - whereclause = sql.and_( - whereclause, columns.c.table_schema == owner - ) - s = sql.select([columns], whereclause) + s = s.where(tables.c.table_schema == owner) + c = connection.execute(s) + return c.first() is not None @reflection.cache @@ -2569,13 +2573,15 @@ class MSDialect(default.DefaultDialect): @_db_plus_owner_listing def get_table_names(self, connection, dbname, owner, schema, **kw): tables = ischema.tables - s = sql.select( - [tables.c.table_name], - sql.and_( - tables.c.table_schema == owner, - tables.c.table_type == "BASE TABLE", - ), - order_by=[tables.c.table_name], + s = ( + sql.select([tables.c.table_name]) + .where( + sql.and_( + tables.c.table_schema == owner, + tables.c.table_type == "BASE TABLE", + ) + ) + .order_by(tables.c.table_name) ) table_names = [r[0] for r in connection.execute(s)] return table_names diff --git a/lib/sqlalchemy/dialects/mysql/base.py b/lib/sqlalchemy/dialects/mysql/base.py index c7c3bd433f..53c9163049 100644 --- a/lib/sqlalchemy/dialects/mysql/base.py +++ b/lib/sqlalchemy/dialects/mysql/base.py @@ -2379,25 +2379,25 @@ class MySQLDialect(default.DefaultDialect): raise def do_begin_twophase(self, connection, xid): - connection.execute(sql.text("XA BEGIN :xid"), xid=xid) + connection.execute(sql.text("XA BEGIN :xid"), dict(xid=xid)) def do_prepare_twophase(self, connection, xid): - connection.execute(sql.text("XA END :xid"), xid=xid) - connection.execute(sql.text("XA PREPARE :xid"), xid=xid) + connection.execute(sql.text("XA END :xid"), dict(xid=xid)) + connection.execute(sql.text("XA PREPARE :xid"), dict(xid=xid)) def do_rollback_twophase( self, connection, xid, is_prepared=True, recover=False ): if not is_prepared: - connection.execute(sql.text("XA END :xid"), xid=xid) - connection.execute(sql.text("XA ROLLBACK :xid"), xid=xid) + connection.execute(sql.text("XA END :xid"), dict(xid=xid)) + connection.execute(sql.text("XA ROLLBACK :xid"), dict(xid=xid)) def do_commit_twophase( self, connection, xid, is_prepared=True, recover=False ): if not is_prepared: self.do_prepare_twophase(connection, xid) - connection.execute(sql.text("XA COMMIT :xid"), xid=xid) + connection.execute(sql.text("XA COMMIT :xid"), dict(xid=xid)) def do_recover_twophase(self, connection): resultset = connection.exec_driver_sql("XA RECOVER") @@ -2501,8 +2501,7 @@ class MySQLDialect(default.DefaultDialect): "WHERE TABLE_NAME=:name AND " "TABLE_SCHEMA=:schema_name" ), - name=sequence_name, - schema_name=schema, + dict(name=sequence_name, schema_name=schema), ) return cursor.first() is not None @@ -2750,7 +2749,7 @@ class MySQLDialect(default.DefaultDialect): :table_data; """ ).bindparams(sql.bindparam("table_data", expanding=True)), - table_data=col_tuples, + dict(table_data=col_tuples), ) # in casing=0, table name and schema name come back in their diff --git a/lib/sqlalchemy/dialects/oracle/base.py b/lib/sqlalchemy/dialects/oracle/base.py index 50fa71d7ec..e0d33cf372 100644 --- a/lib/sqlalchemy/dialects/oracle/base.py +++ b/lib/sqlalchemy/dialects/oracle/base.py @@ -1467,8 +1467,10 @@ class OracleDialect(default.DefaultDialect): "SELECT table_name FROM all_tables " "WHERE table_name = :name AND owner = :schema_name" ), - name=self.denormalize_name(table_name), - schema_name=self.denormalize_name(schema), + dict( + name=self.denormalize_name(table_name), + schema_name=self.denormalize_name(schema), + ), ) return cursor.first() is not None @@ -1481,8 +1483,10 @@ class OracleDialect(default.DefaultDialect): "WHERE sequence_name = :name AND " "sequence_owner = :schema_name" ), - name=self.denormalize_name(sequence_name), - schema_name=self.denormalize_name(schema), + dict( + name=self.denormalize_name(sequence_name), + schema_name=self.denormalize_name(schema), + ), ) return cursor.first() is not None @@ -1525,7 +1529,7 @@ class OracleDialect(default.DefaultDialect): q += " AND ".join(clauses) result = connection.execution_options(future_result=True).execute( - sql.text(q), **params + sql.text(q), params ) if desired_owner: row = result.mappings().first() @@ -1621,7 +1625,7 @@ class OracleDialect(default.DefaultDialect): "OWNER = :owner " "AND IOT_NAME IS NULL " "AND DURATION IS NULL" ) - cursor = connection.execute(sql.text(sql_str), owner=schema) + cursor = connection.execute(sql.text(sql_str), dict(owner=schema)) return [self.normalize_name(row[0]) for row in cursor] @reflection.cache @@ -1641,14 +1645,16 @@ class OracleDialect(default.DefaultDialect): "AND DURATION IS NOT NULL" ) - cursor = connection.execute(sql.text(sql_str), owner=schema) + cursor = connection.execute(sql.text(sql_str), dict(owner=schema)) return [self.normalize_name(row[0]) for row in cursor] @reflection.cache def get_view_names(self, connection, schema=None, **kw): schema = self.denormalize_name(schema or self.default_schema_name) s = sql.text("SELECT view_name FROM all_views WHERE owner = :owner") - cursor = connection.execute(s, owner=self.denormalize_name(schema)) + cursor = connection.execute( + s, dict(owner=self.denormalize_name(schema)) + ) return [self.normalize_name(row[0]) for row in cursor] @reflection.cache @@ -1687,7 +1693,7 @@ class OracleDialect(default.DefaultDialect): text += " AND owner = :owner " text = text % {"dblink": dblink, "columns": ", ".join(columns)} - result = connection.execute(sql.text(text), **params) + result = connection.execute(sql.text(text), params) enabled = dict(DISABLED=False, ENABLED=True) @@ -1752,7 +1758,7 @@ class OracleDialect(default.DefaultDialect): text += " ORDER BY col.column_id" text = text % {"dblink": dblink, "char_length_col": char_length_col} - c = connection.execute(sql.text(text), **params) + c = connection.execute(sql.text(text), params) for row in c: colname = self.normalize_name(row[0]) @@ -1842,7 +1848,8 @@ class OracleDialect(default.DefaultDialect): """ c = connection.execute( - sql.text(COMMENT_SQL), table_name=table_name, schema_name=schema + sql.text(COMMENT_SQL), + dict(table_name=table_name, schema_name=schema), ) return {"text": c.scalar()} @@ -1890,7 +1897,7 @@ class OracleDialect(default.DefaultDialect): text = text % {"dblink": dblink} q = sql.text(text) - rp = connection.execute(q, **params) + rp = connection.execute(q, params) indexes = [] last_index_name = None pk_constraint = self.get_pk_constraint( @@ -1987,7 +1994,7 @@ class OracleDialect(default.DefaultDialect): ) text = text % {"dblink": dblink} - rp = connection.execute(sql.text(text), **params) + rp = connection.execute(sql.text(text), params) constraint_data = rp.fetchall() return constraint_data @@ -2215,7 +2222,7 @@ class OracleDialect(default.DefaultDialect): text += " AND owner = :schema" params["schema"] = schema - rp = connection.execute(sql.text(text), **params).scalar() + rp = connection.execute(sql.text(text), params).scalar() if rp: if util.py2k: rp = rp.decode(self.encoding) diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py index f9b3d9b95b..20540ac020 100644 --- a/lib/sqlalchemy/dialects/postgresql/base.py +++ b/lib/sqlalchemy/dialects/postgresql/base.py @@ -2753,7 +2753,7 @@ class PGDialect(default.DefaultDialect): s = s.columns(oid=sqltypes.Integer) if schema: s = s.bindparams(sql.bindparam("schema", type_=sqltypes.Unicode)) - c = connection.execute(s, table_name=table_name, schema=schema) + c = connection.execute(s, dict(table_name=table_name, schema=schema)) table_oid = c.scalar() if table_oid is None: raise exc.NoSuchTableError(table_name) @@ -3519,7 +3519,9 @@ class PGDialect(default.DefaultDialect): pgd.objoid = :table_oid """ - c = connection.execute(sql.text(COMMENT_SQL), table_oid=table_oid) + c = connection.execute( + sql.text(COMMENT_SQL), dict(table_oid=table_oid) + ) return {"text": c.scalar()} @reflection.cache diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index 8a340d9ce7..09e700b5cd 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -12,6 +12,7 @@ import sys from .interfaces import Connectable from .interfaces import ExceptionContext from .util import _distill_params +from .util import _distill_params_20 from .. import exc from .. import inspection from .. import log @@ -52,6 +53,8 @@ class Connection(Connectable): """ _schema_translate_map = None + _is_future = False + _sqla_logger_namespace = "sqlalchemy.engine.Connection" def __init__( self, @@ -85,7 +88,7 @@ class Connection(Connectable): if connection is not None else engine.raw_connection() ) - self.__transaction = None + self._transaction = None self.__savepoint_seq = 0 self.should_close_with_result = close_with_result @@ -168,13 +171,15 @@ class Connection(Connectable): else: return self - def _clone(self): - """Create a shallow copy of this Connection. + def _generate_for_options(self): + """define connection method chaining behavior for execution_options""" - """ - c = self.__class__.__new__(self.__class__) - c.__dict__ = self.__dict__.copy() - return c + if self._is_future: + return self + else: + c = self.__class__.__new__(self.__class__) + c.__dict__ = self.__dict__.copy() + return c def __enter__(self): return self @@ -340,7 +345,7 @@ class Connection(Connectable): """ # noqa - c = self._clone() + c = self._generate_for_options() c._execution_options = c._execution_options.union(opt) if self._has_events or self.engine._has_events: self.dispatch.set_connection_execution_options(c, opt) @@ -469,7 +474,7 @@ class Connection(Connectable): if self.__branch_from: return self.__branch_from._revalidate_connection() if self.__can_reconnect and self.__invalid: - if self.__transaction is not None: + if self._transaction is not None: raise exc.InvalidRequestError( "Can't reconnect until invalid " "transaction is rolled back" @@ -640,14 +645,21 @@ class Connection(Connectable): :class:`_engine.Engine` """ - if self.__branch_from: + if self._is_future: + assert not self.__branch_from + elif self.__branch_from: return self.__branch_from.begin() - if self.__transaction is None: - self.__transaction = RootTransaction(self) - return self.__transaction + if self._transaction is None: + self._transaction = RootTransaction(self) + return self._transaction else: - return Transaction(self, self.__transaction) + if self._is_future: + raise exc.InvalidRequestError( + "a transaction is already begun for this connection" + ) + else: + return Transaction(self, self._transaction) def begin_nested(self): """Begin a nested transaction and return a transaction handle. @@ -667,14 +679,22 @@ class Connection(Connectable): :meth:`_engine.Connection.begin_twophase` """ - if self.__branch_from: + if self._is_future: + assert not self.__branch_from + elif self.__branch_from: return self.__branch_from.begin_nested() - if self.__transaction is None: - self.__transaction = RootTransaction(self) - else: - self.__transaction = NestedTransaction(self, self.__transaction) - return self.__transaction + if self._transaction is None: + if self._is_future: + self._autobegin() + else: + self._transaction = RootTransaction(self) + return self._transaction + + trans = NestedTransaction(self, self._transaction) + if not self._is_future: + self._transaction = trans + return trans def begin_twophase(self, xid=None): """Begin a two-phase or XA transaction and return a transaction @@ -699,15 +719,15 @@ class Connection(Connectable): if self.__branch_from: return self.__branch_from.begin_twophase(xid=xid) - if self.__transaction is not None: + if self._transaction is not None: raise exc.InvalidRequestError( "Cannot start a two phase transaction when a transaction " "is already in progress." ) if xid is None: xid = self.engine.dialect.create_xid() - self.__transaction = TwoPhaseTransaction(self, xid) - return self.__transaction + self._transaction = TwoPhaseTransaction(self, xid) + return self._transaction def recover_twophase(self): return self.engine.dialect.do_recover_twophase(self) @@ -721,8 +741,8 @@ class Connection(Connectable): def in_transaction(self): """Return True if a transaction is in progress.""" return ( - self._root.__transaction is not None - and self._root.__transaction.is_active + self._root._transaction is not None + and self._root._transaction.is_active ) def _begin_impl(self, transaction): @@ -736,7 +756,7 @@ class Connection(Connectable): try: self.engine.dialect.do_begin(self.connection) - if self.connection._reset_agent is None: + if not self._is_future and self.connection._reset_agent is None: self.connection._reset_agent = transaction except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) @@ -757,7 +777,7 @@ class Connection(Connectable): finally: if ( not self.__invalid - and self.connection._reset_agent is self.__transaction + and self.connection._reset_agent is self._transaction ): self.connection._reset_agent = None @@ -776,10 +796,10 @@ class Connection(Connectable): finally: if ( not self.__invalid - and self.connection._reset_agent is self.__transaction + and self.connection._reset_agent is self._transaction ): self.connection._reset_agent = None - self.__transaction = None + self._transaction = None def _savepoint_impl(self, name=None): assert not self.__branch_from @@ -795,13 +815,13 @@ class Connection(Connectable): return name def _discard_transaction(self, trans): - if trans is self.__transaction: + if trans is self._transaction: if trans._is_root: assert trans._parent is trans - self.__transaction = None + self._transaction = None else: assert trans._parent is not trans - self.__transaction = trans._parent + self._transaction = trans._parent def _rollback_to_savepoint_impl( self, name, context, deactivate_only=False @@ -822,7 +842,7 @@ class Connection(Connectable): if self._still_open_and_connection_is_valid: self.engine.dialect.do_release_savepoint(self, name) - self.__transaction = context + self._transaction = context def _begin_twophase_impl(self, transaction): assert not self.__branch_from @@ -835,7 +855,7 @@ class Connection(Connectable): if self._still_open_and_connection_is_valid: self.engine.dialect.do_begin_twophase(self, transaction.xid) - if self.connection._reset_agent is None: + if not self._is_future and self.connection._reset_agent is None: self.connection._reset_agent = transaction def _prepare_twophase_impl(self, xid): @@ -845,7 +865,7 @@ class Connection(Connectable): self.dispatch.prepare_twophase(self, xid) if self._still_open_and_connection_is_valid: - assert isinstance(self.__transaction, TwoPhaseTransaction) + assert isinstance(self._transaction, TwoPhaseTransaction) self.engine.dialect.do_prepare_twophase(self, xid) def _rollback_twophase_impl(self, xid, is_prepared): @@ -855,17 +875,17 @@ class Connection(Connectable): self.dispatch.rollback_twophase(self, xid, is_prepared) if self._still_open_and_connection_is_valid: - assert isinstance(self.__transaction, TwoPhaseTransaction) + assert isinstance(self._transaction, TwoPhaseTransaction) try: self.engine.dialect.do_rollback_twophase( self, xid, is_prepared ) finally: - if self.connection._reset_agent is self.__transaction: + if self.connection._reset_agent is self._transaction: self.connection._reset_agent = None - self.__transaction = None + self._transaction = None else: - self.__transaction = None + self._transaction = None def _commit_twophase_impl(self, xid, is_prepared): assert not self.__branch_from @@ -874,15 +894,20 @@ class Connection(Connectable): self.dispatch.commit_twophase(self, xid, is_prepared) if self._still_open_and_connection_is_valid: - assert isinstance(self.__transaction, TwoPhaseTransaction) + assert isinstance(self._transaction, TwoPhaseTransaction) try: self.engine.dialect.do_commit_twophase(self, xid, is_prepared) finally: - if self.connection._reset_agent is self.__transaction: + if self.connection._reset_agent is self._transaction: self.connection._reset_agent = None - self.__transaction = None + self._transaction = None else: - self.__transaction = None + self._transaction = None + + def _autobegin(self): + assert self._is_future + + return self.begin() def _autorollback(self): if not self._root.in_transaction(): @@ -907,6 +932,8 @@ class Connection(Connectable): and will allow no further operations. """ + assert not self._is_future + if self.__branch_from: util.warn_deprecated_20( "The .close() method on a so-called 'branched' connection is " @@ -929,7 +956,7 @@ class Connection(Connectable): else: conn.close() - if conn._reset_agent is self.__transaction: + if conn._reset_agent is self._transaction: conn._reset_agent = None # the close() process can end up invalidating us, @@ -938,7 +965,7 @@ class Connection(Connectable): if not self.__invalid: del self.__connection self.__can_reconnect = False - self.__transaction = None + self._transaction = None def scalar(self, object_, *multiparams, **params): """Executes and returns the first column of the first row. @@ -1030,8 +1057,11 @@ class Connection(Connectable): "or the Connection.exec_driver_sql() method to invoke a " "driver-level SQL string." ) - distilled_params = _distill_params(multiparams, params) - return self._exec_driver_sql_distilled(object_, distilled_params) + distilled_parameters = _distill_params(multiparams, params) + + return self._exec_driver_sql( + object_, multiparams, params, distilled_parameters + ) try: meth = object_._execute_on_connection except AttributeError as err: @@ -1039,20 +1069,28 @@ class Connection(Connectable): exc.ObjectNotExecutableError(object_), replace_context=err ) else: - return meth(self, multiparams, params) + return meth(self, multiparams, params, util.immutabledict()) - def _execute_function(self, func, multiparams, params): + def _execute_function( + self, func, multiparams, params, execution_options=util.immutabledict() + ): """Execute a sql.FunctionElement object.""" return self._execute_clauseelement(func.select(), multiparams, params) - def _execute_default(self, default, multiparams, params): + def _execute_default( + self, + default, + multiparams, + params, + execution_options=util.immutabledict(), + ): """Execute a schema.ColumnDefault object.""" if self._has_events or self.engine._has_events: for fn in self.dispatch.before_execute: default, multiparams, params = fn( - self, default, multiparams, params + self, default, multiparams, params, execution_options ) try: @@ -1066,7 +1104,9 @@ class Connection(Connectable): conn = self._revalidate_connection() dialect = self.dialect - ctx = dialect.execution_ctx_cls._init_default(dialect, self, conn) + ctx = dialect.execution_ctx_cls._init_default( + dialect, self, conn, execution_options + ) except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) @@ -1076,17 +1116,21 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.after_execute( - self, default, multiparams, params, ret + self, default, multiparams, params, execution_options, ret ) return ret - def _execute_ddl(self, ddl, multiparams, params): + def _execute_ddl( + self, ddl, multiparams, params, execution_options=util.immutabledict() + ): """Execute a schema.DDL object.""" if self._has_events or self.engine._has_events: for fn in self.dispatch.before_execute: - ddl, multiparams, params = fn(self, ddl, multiparams, params) + ddl, multiparams, params = fn( + self, ddl, multiparams, params, execution_options + ) dialect = self.dialect @@ -1098,18 +1142,25 @@ class Connection(Connectable): dialect.execution_ctx_cls._init_ddl, compiled, None, + execution_options, compiled, ) if self._has_events or self.engine._has_events: - self.dispatch.after_execute(self, ddl, multiparams, params, ret) + self.dispatch.after_execute( + self, ddl, multiparams, params, execution_options, ret + ) return ret - def _execute_clauseelement(self, elem, multiparams, params): + def _execute_clauseelement( + self, elem, multiparams, params, execution_options=util.immutabledict() + ): """Execute a sql.ClauseElement object.""" if self._has_events or self.engine._has_events: for fn in self.dispatch.before_execute: - elem, multiparams, params = fn(self, elem, multiparams, params) + elem, multiparams, params = fn( + self, elem, multiparams, params, execution_options + ) distilled_params = _distill_params(multiparams, params) if distilled_params: @@ -1121,22 +1172,31 @@ class Connection(Connectable): dialect = self.dialect - if "compiled_cache" in self._execution_options: - elem_cache_key, extracted_params = elem._generate_cache_key() + exec_opts = self._execution_options + if execution_options: + exec_opts = exec_opts.union(execution_options) + + if "compiled_cache" in exec_opts: + elem_cache_key = elem._generate_cache_key() + else: + elem_cache_key = None + + if elem_cache_key: + cache_key, extracted_params = elem_cache_key key = ( dialect, - elem_cache_key, + cache_key, tuple(sorted(keys)), bool(self._schema_translate_map), len(distilled_params) > 1, ) - cache = self._execution_options["compiled_cache"] + cache = exec_opts["compiled_cache"] compiled_sql = cache.get(key) if compiled_sql is None: compiled_sql = elem.compile( dialect=dialect, - cache_key=(elem_cache_key, extracted_params), + cache_key=elem_cache_key, column_keys=keys, inline=len(distilled_params) > 1, schema_translate_map=self._schema_translate_map, @@ -1160,22 +1220,31 @@ class Connection(Connectable): dialect.execution_ctx_cls._init_compiled, compiled_sql, distilled_params, + execution_options, compiled_sql, distilled_params, elem, extracted_params, ) if self._has_events or self.engine._has_events: - self.dispatch.after_execute(self, elem, multiparams, params, ret) + self.dispatch.after_execute( + self, elem, multiparams, params, execution_options, ret + ) return ret - def _execute_compiled(self, compiled, multiparams, params): + def _execute_compiled( + self, + compiled, + multiparams, + params, + execution_options=util.immutabledict(), + ): """Execute a sql.Compiled object.""" if self._has_events or self.engine._has_events: for fn in self.dispatch.before_execute: compiled, multiparams, params = fn( - self, compiled, multiparams, params + self, compiled, multiparams, params, execution_options ) dialect = self.dialect @@ -1185,6 +1254,7 @@ class Connection(Connectable): dialect.execution_ctx_cls._init_compiled, compiled, parameters, + execution_options, compiled, parameters, None, @@ -1192,16 +1262,23 @@ class Connection(Connectable): ) if self._has_events or self.engine._has_events: self.dispatch.after_execute( - self, compiled, multiparams, params, ret + self, compiled, multiparams, params, execution_options, ret ) return ret - def _exec_driver_sql_distilled(self, statement, parameters): + def _exec_driver_sql( + self, + statement, + multiparams, + params, + distilled_parameters, + execution_options=util.immutabledict(), + ): if self._has_events or self.engine._has_events: for fn in self.dispatch.before_execute: statement, multiparams, params = fn( - self, statement, parameters, {} + self, statement, multiparams, params, execution_options ) dialect = self.dialect @@ -1209,15 +1286,38 @@ class Connection(Connectable): dialect, dialect.execution_ctx_cls._init_statement, statement, - parameters, + distilled_parameters, + execution_options, statement, - parameters, + distilled_parameters, ) if self._has_events or self.engine._has_events: - self.dispatch.after_execute(self, statement, parameters, {}) + self.dispatch.after_execute( + self, statement, multiparams, params, execution_options, ret + ) return ret - def exec_driver_sql(self, statement, parameters=None): + def _execute_20( + self, + statement, + parameters=None, + execution_options=util.immutabledict(), + ): + multiparams, params, distilled_parameters = _distill_params_20( + parameters + ) + try: + meth = statement._execute_on_connection + except AttributeError as err: + util.raise_( + exc.ObjectNotExecutableError(statement), replace_context=err + ) + else: + return meth(self, multiparams, params, execution_options) + + def exec_driver_sql( + self, statement, parameters=None, execution_options=None + ): r"""Executes a SQL statement construct and returns a :class:`_engine.ResultProxy`. @@ -1258,22 +1358,33 @@ class Connection(Connectable): """ - if isinstance(parameters, list) and parameters: - if not isinstance(parameters[0], (dict, tuple)): - raise exc.ArgumentError( - "List argument must consist only of tuples or dictionaries" - ) - elif isinstance(parameters, (dict, tuple)): - parameters = [parameters] + multiparams, params, distilled_parameters = _distill_params_20( + parameters + ) - return self._exec_driver_sql_distilled(statement, parameters or ()) + return self._exec_driver_sql( + statement, + multiparams, + params, + distilled_parameters, + execution_options, + ) def _execute_context( - self, dialect, constructor, statement, parameters, *args + self, + dialect, + constructor, + statement, + parameters, + execution_options, + *args ): """Create an :class:`.ExecutionContext` and execute, returning a :class:`_engine.ResultProxy`.""" + if execution_options: + dialect.set_exec_execution_options(self, execution_options) + try: try: conn = self.__connection @@ -1284,23 +1395,29 @@ class Connection(Connectable): if conn is None: conn = self._revalidate_connection() - context = constructor(dialect, self, conn, *args) + context = constructor( + dialect, self, conn, execution_options, *args + ) except BaseException as e: self._handle_dbapi_exception( e, util.text_type(statement), parameters, None, None ) - if self._root.__transaction and not self._root.__transaction.is_active: + if self._root._transaction and not self._root._transaction.is_active: raise exc.InvalidRequestError( "This connection is on an inactive %stransaction. " "Please rollback() fully before proceeding." % ( "savepoint " - if isinstance(self.__transaction, NestedTransaction) + if isinstance(self._transaction, NestedTransaction) else "" ), code="8s2a", ) + + if self._is_future and self._root._transaction is None: + self._autobegin() + if context.compiled: context.pre_exec() @@ -1386,12 +1503,17 @@ class Connection(Connectable): result = context._setup_result_proxy() - if context.should_autocommit and self._root.__transaction is None: + if ( + not self._is_future + and context.should_autocommit + and self._root._transaction is None + ): self._root._commit_impl(autocommit=True) # for "connectionless" execution, we have to close this # Connection after the statement is complete. if self.should_close_with_result: + assert not self._is_future assert not context._is_future_result # ResultProxy already exhausted rows / has no rows. @@ -1600,6 +1722,7 @@ class Connection(Connectable): self.engine.pool._invalidate(dbapi_conn_wrapper, e) self.invalidate(e) if self.should_close_with_result: + assert not self._is_future self.close() @classmethod @@ -1991,6 +2114,8 @@ class Engine(Connectable, log.Identified): _execution_options = util.immutabledict() _has_events = False _connection_cls = Connection + _sqla_logger_namespace = "sqlalchemy.engine.Engine" + _is_future = False _schema_translate_map = None @@ -2114,7 +2239,7 @@ class Engine(Connectable, log.Identified): """ - return OptionEngine(self, opt) + return self._option_cls(self, opt) def get_execution_options(self): """ Get the non-SQL options which will take effect during execution. @@ -2200,7 +2325,8 @@ class Engine(Connectable, log.Identified): if type_ is not None: self.transaction.rollback() else: - self.transaction.commit() + if self.transaction.is_active: + self.transaction.commit() if not self.close_with_result: self.conn.close() @@ -2239,7 +2365,10 @@ class Engine(Connectable, log.Identified): for a particular :class:`_engine.Connection`. """ - conn = self.connect(close_with_result=close_with_result) + if self._connection_cls._is_future: + conn = self.connect() + else: + conn = self.connect(close_with_result=close_with_result) try: trans = conn.begin() except: @@ -2477,7 +2606,7 @@ class Engine(Connectable, log.Identified): return self._wrap_pool_connect(self.pool.connect, _connection) -class OptionEngine(Engine): +class OptionEngineMixin(object): _sa_propagate_class_events = False def __init__(self, proxied, execution_options): @@ -2523,3 +2652,10 @@ class OptionEngine(Engine): self.__dict__["_has_events"] = value _has_events = property(_get_has_events, _set_has_events) + + +class OptionEngine(OptionEngineMixin, Engine): + pass + + +Engine._option_cls = OptionEngine diff --git a/lib/sqlalchemy/engine/create.py b/lib/sqlalchemy/engine/create.py index e5b8a87d34..a53e94f1e3 100644 --- a/lib/sqlalchemy/engine/create.py +++ b/lib/sqlalchemy/engine/create.py @@ -532,7 +532,8 @@ def create_engine(url, **kwargs): pool._dialect = dialect # create engine. - engineclass = base.Engine + engineclass = kwargs.pop("_future_engine_class", base.Engine) + engine_args = {} for k in util.get_cls_kwargs(engineclass): if k in kwargs: diff --git a/lib/sqlalchemy/engine/default.py b/lib/sqlalchemy/engine/default.py index 5ec13d1039..865a1160b3 100644 --- a/lib/sqlalchemy/engine/default.py +++ b/lib/sqlalchemy/engine/default.py @@ -500,14 +500,35 @@ class DefaultDialect(interfaces.Dialect): if "schema_translate_map" in opts: connection._schema_translate_map = opts["schema_translate_map"] + def set_exec_execution_options(self, connection, opts): + if "isolation_level" in opts: + raise exc.InvalidRequestError( + "The 'isolation_level' execution " + "option is not supported at the per-statement level" + ) + self._set_connection_isolation(connection, opts["isolation_level"]) + + if "schema_translate_map" in opts: + raise exc.InvalidRequestError( + "The 'schema_translate_map' execution " + "option is not supported at the per-statement level" + ) + def _set_connection_isolation(self, connection, level): if connection.in_transaction(): - util.warn( - "Connection is already established with a Transaction; " - "setting isolation_level may implicitly rollback or commit " - "the existing transaction, or have no effect until " - "next transaction" - ) + if connection._is_future: + raise exc.InvalidRequestError( + "This connection has already begun a transaction; " + "isolation level may not be altered until transaction end" + ) + else: + util.warn( + "Connection is already established with a Transaction; " + "setting isolation_level may implicitly rollback or " + "commit " + "the existing transaction, or have no effect until " + "next transaction" + ) self.set_isolation_level(connection.connection, level) connection.connection._connection_record.finalize_callback.append( self.reset_isolation_level @@ -688,6 +709,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext): statement = None result_column_struct = None returned_defaults = None + execution_options = util.immutabledict() _is_implicit_returning = False _is_explicit_returning = False _is_future_result = False @@ -701,7 +723,14 @@ class DefaultExecutionContext(interfaces.ExecutionContext): _expanded_parameters = util.immutabledict() @classmethod - def _init_ddl(cls, dialect, connection, dbapi_connection, compiled_ddl): + def _init_ddl( + cls, + dialect, + connection, + dbapi_connection, + execution_options, + compiled_ddl, + ): """Initialize execution context for a DDLElement construct.""" self = cls.__new__(cls) @@ -714,8 +743,18 @@ class DefaultExecutionContext(interfaces.ExecutionContext): self.execution_options = compiled.execution_options if connection._execution_options: - self.execution_options = dict(self.execution_options) - self.execution_options.update(connection._execution_options) + self.execution_options = self.execution_options.union( + connection._execution_options + ) + if execution_options: + self.execution_options = self.execution_options.union( + execution_options + ) + + self._is_future_result = ( + connection._is_future + or self.execution_options.get("future_result", False) + ) self.unicode_statement = util.text_type(compiled) if compiled.schema_translate_map: @@ -745,6 +784,7 @@ class DefaultExecutionContext(interfaces.ExecutionContext): dialect, connection, dbapi_connection, + execution_options, compiled, parameters, invoked_statement, @@ -764,11 +804,19 @@ class DefaultExecutionContext(interfaces.ExecutionContext): # we get here assert compiled.can_execute - self._is_future_result = connection._execution_options.get( - "future_result", False - ) - self.execution_options = compiled.execution_options.union( - connection._execution_options + self.execution_options = compiled.execution_options + if connection._execution_options: + self.execution_options = self.execution_options.union( + connection._execution_options + ) + if execution_options: + self.execution_options = self.execution_options.union( + execution_options + ) + + self._is_future_result = ( + connection._is_future + or self.execution_options.get("future_result", False) ) self.result_column_struct = ( @@ -905,7 +953,13 @@ class DefaultExecutionContext(interfaces.ExecutionContext): @classmethod def _init_statement( - cls, dialect, connection, dbapi_connection, statement, parameters + cls, + dialect, + connection, + dbapi_connection, + execution_options, + statement, + parameters, ): """Initialize execution context for a string SQL statement.""" @@ -915,12 +969,19 @@ class DefaultExecutionContext(interfaces.ExecutionContext): self.dialect = connection.dialect self.is_text = True - self._is_future_result = connection._execution_options.get( - "future_result", False - ) + if connection._execution_options: + self.execution_options = self.execution_options.union( + connection._execution_options + ) + if execution_options: + self.execution_options = self.execution_options.union( + execution_options + ) - # plain text statement - self.execution_options = connection._execution_options + self._is_future_result = ( + connection._is_future + or self.execution_options.get("future_result", False) + ) if not parameters: if self.dialect.positional: @@ -956,14 +1017,30 @@ class DefaultExecutionContext(interfaces.ExecutionContext): return self @classmethod - def _init_default(cls, dialect, connection, dbapi_connection): + def _init_default( + cls, dialect, connection, dbapi_connection, execution_options + ): """Initialize execution context for a ColumnDefault construct.""" self = cls.__new__(cls) self.root_connection = connection self._dbapi_connection = dbapi_connection self.dialect = connection.dialect - self.execution_options = connection._execution_options + + if connection._execution_options: + self.execution_options = self.execution_options.union( + connection._execution_options + ) + if execution_options: + self.execution_options = self.execution_options.union( + execution_options + ) + + self._is_future_result = ( + connection._is_future + or self.execution_options.get("future_result", False) + ) + self.cursor = self.create_cursor() return self @@ -1043,7 +1120,11 @@ class DefaultExecutionContext(interfaces.ExecutionContext): @property def connection(self): - return self.root_connection._branch() + conn = self.root_connection + if conn._is_future: + return conn + else: + return conn._branch() def should_autocommit_text(self, statement): return AUTOCOMMIT_REGEXP.match(statement) diff --git a/lib/sqlalchemy/engine/events.py b/lib/sqlalchemy/engine/events.py index 65b73002ce..2ab707b8ab 100644 --- a/lib/sqlalchemy/engine/events.py +++ b/lib/sqlalchemy/engine/events.py @@ -107,9 +107,15 @@ class ConnectionEvents(event.Events): orig_fn = fn def wrap_before_execute( - conn, clauseelement, multiparams, params + conn, clauseelement, multiparams, params, execution_options ): - orig_fn(conn, clauseelement, multiparams, params) + orig_fn( + conn, + clauseelement, + multiparams, + params, + execution_options, + ) return clauseelement, multiparams, params fn = wrap_before_execute @@ -143,7 +149,19 @@ class ConnectionEvents(event.Events): ) event_key.with_wrapper(fn).base_listen() - def before_execute(self, conn, clauseelement, multiparams, params): + @event._legacy_signature( + "1.4", + ["conn", "clauseelement", "multiparams", "params"], + lambda conn, clauseelement, multiparams, params, execution_options: ( + conn, + clauseelement, + multiparams, + params, + ), + ) + def before_execute( + self, conn, clauseelement, multiparams, params, execution_options + ): """Intercept high level execute() events, receiving uncompiled SQL constructs and other objects prior to rendering into SQL. @@ -166,6 +184,17 @@ class ConnectionEvents(event.Events): :meth:`_engine.Connection.execute`. :param multiparams: Multiple parameter sets, a list of dictionaries. :param params: Single parameter set, a single dictionary. + :param execution_options: dictionary of per-execution execution + options passed along with the statement, if any. This only applies to + the the SQLAlchemy 2.0 version of :meth:`_engine.Connection.execute` + . To + view all execution options associated with the connection, access the + :meth:`_engine.Connection.get_execution_options` + method to view the fixed + execution options dictionary, then consider elements within this local + dictionary to be unioned into that dictionary. + + .. versionadded: 1.4 .. seealso:: @@ -173,7 +202,26 @@ class ConnectionEvents(event.Events): """ - def after_execute(self, conn, clauseelement, multiparams, params, result): + @event._legacy_signature( + "1.4", + ["conn", "clauseelement", "multiparams", "params", "result"], + lambda conn, clauseelement, multiparams, params, execution_options, result: ( # noqa + conn, + clauseelement, + multiparams, + params, + result, + ), + ) + def after_execute( + self, + conn, + clauseelement, + multiparams, + params, + execution_options, + result, + ): """Intercept high level execute() events after execute. @@ -183,6 +231,18 @@ class ConnectionEvents(event.Events): :meth:`_engine.Connection.execute`. :param multiparams: Multiple parameter sets, a list of dictionaries. :param params: Single parameter set, a single dictionary. + :param execution_options: dictionary of per-execution execution + options passed along with the statement, if any. This only applies to + the the SQLAlchemy 2.0 version of :meth:`_engine.Connection.execute` + . To + view all execution options associated with the connection, access the + :meth:`_engine.Connection.get_execution_options` + method to view the fixed + execution options dictionary, then consider elements within this local + dictionary to be unioned into that dictionary. + + .. versionadded: 1.4 + :param result: :class:`_engine.ResultProxy` generated by the execution . diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py index e1e5e90160..bc3cdbb9ad 100644 --- a/lib/sqlalchemy/engine/result.py +++ b/lib/sqlalchemy/engine/result.py @@ -78,6 +78,7 @@ class SimpleResultMetaData(ResultMetaData): for index in range(len_keys) } ) + # TODO: negative indexes? test coverage? if extra: for key, ex in zip(keys, extra): rec = self._keymap[key] @@ -639,6 +640,9 @@ class CursorResultMetaData(ResultMetaData): """ indexes = [] for key in keys: + if isinstance(key, int): + indexes.append(key) + continue try: rec = self._keymap[key] except KeyError as ke: diff --git a/lib/sqlalchemy/engine/util.py b/lib/sqlalchemy/engine/util.py index d25927be2c..8fb04646f4 100644 --- a/lib/sqlalchemy/engine/util.py +++ b/lib/sqlalchemy/engine/util.py @@ -5,7 +5,9 @@ # This module is part of SQLAlchemy and is released under # the MIT License: http://www.opensource.org/licenses/mit-license.php +from .. import exc from .. import util +from ..util import collections_abc def connection_memoize(key): @@ -28,6 +30,8 @@ def connection_memoize(key): def py_fallback(): + # TODO: pass the Connection in so that there can be a standard + # method for warning on parameter format def _distill_params(multiparams, params): # noqa r"""Given arguments from the calling form \*multiparams, \**params, return a list of bind parameter structures, usually a list of @@ -40,6 +44,7 @@ def py_fallback(): if not multiparams: if params: + # TODO: parameter format deprecation warning return [params] else: return [] @@ -64,6 +69,7 @@ def py_fallback(): # execute(stmt, "value") return [[zero]] else: + # TODO: parameter format deprecation warning if hasattr(multiparams[0], "__iter__") and not hasattr( multiparams[0], "strip" ): @@ -74,6 +80,32 @@ def py_fallback(): return locals() +_no_tuple = () +_no_kw = util.immutabledict() + + +def _distill_params_20(params): + if params is None: + return _no_tuple, _no_kw, [] + elif isinstance(params, collections_abc.MutableSequence): # list + if params and not isinstance( + params[0], (collections_abc.Mapping, tuple) + ): + raise exc.ArgumentError( + "List argument must consist only of tuples or dictionaries" + ) + + # the tuple is needed atm by the C version of _distill_params... + return tuple(params), _no_kw, params + elif isinstance( + params, + (collections_abc.Sequence, collections_abc.Mapping), # tuple or dict + ): + return _no_tuple, params, [params] + else: + raise exc.ArgumentError("mapping or sequence expected for parameters") + + try: from sqlalchemy.cutils import _distill_params # noqa except ImportError: diff --git a/lib/sqlalchemy/event/attr.py b/lib/sqlalchemy/event/attr.py index cefb640a18..87c6e980f8 100644 --- a/lib/sqlalchemy/event/attr.py +++ b/lib/sqlalchemy/event/attr.py @@ -71,6 +71,7 @@ class _ClsLevelDispatch(RefCollection): """Class-level events on :class:`._Dispatch` classes.""" __slots__ = ( + "clsname", "name", "arg_names", "has_kw", @@ -81,6 +82,7 @@ class _ClsLevelDispatch(RefCollection): def __init__(self, parent_dispatch_cls, fn): self.name = fn.__name__ + self.clsname = parent_dispatch_cls.__name__ argspec = util.inspect_getfullargspec(fn) self.arg_names = argspec.args[1:] self.has_kw = bool(argspec.varkw) diff --git a/lib/sqlalchemy/event/legacy.py b/lib/sqlalchemy/event/legacy.py index 9205cc53a0..f63c7d101c 100644 --- a/lib/sqlalchemy/event/legacy.py +++ b/lib/sqlalchemy/event/legacy.py @@ -35,15 +35,35 @@ def _wrap_fn_for_legacy(dispatch_collection, fn, argspec): argspec.varkw ): + formatted_def = "def %s(%s%s)" % ( + dispatch_collection.name, + ", ".join(dispatch_collection.arg_names), + ", **kw" if has_kw else "", + ) + warning_txt = ( + 'The argument signature for the "%s.%s" event listener ' + "has changed as of version %s, and conversion for " + "the old argument signature will be removed in a " + 'future release. The new signature is "%s"' + % ( + dispatch_collection.clsname, + dispatch_collection.name, + since, + formatted_def, + ) + ) + if conv: assert not has_kw def wrap_leg(*args): + util.warn_deprecated(warning_txt, version=since) return fn(*conv(*args)) else: def wrap_leg(*args, **kw): + util.warn_deprecated(warning_txt, version=since) argdict = dict(zip(dispatch_collection.arg_names, args)) args = [argdict[name] for name in argnames] if has_kw: diff --git a/lib/sqlalchemy/future/__init__.py b/lib/sqlalchemy/future/__init__.py index d38d27d884..02cbd697b9 100644 --- a/lib/sqlalchemy/future/__init__.py +++ b/lib/sqlalchemy/future/__init__.py @@ -8,7 +8,9 @@ """Future 2.0 API features. """ - +from .engine import Connection # noqa +from .engine import create_engine # noqa +from .engine import Engine # noqa from .result import Result # noqa from ..sql.selectable import Select from ..util.langhelpers import public_factory diff --git a/lib/sqlalchemy/future/engine.py b/lib/sqlalchemy/future/engine.py new file mode 100644 index 0000000000..286c83cc40 --- /dev/null +++ b/lib/sqlalchemy/future/engine.py @@ -0,0 +1,434 @@ +from .. import util +from ..engine import Connection as _LegacyConnection +from ..engine import create_engine as _create_engine +from ..engine import Engine as _LegacyEngine +from ..engine.base import OptionEngineMixin + +NO_OPTIONS = util.immutabledict() + + +def create_engine(*arg, **kw): + """Create a new :class:`_future.Engine` instance. + + Arguments passed to :func:`_future.create_engine` are mostly identical + to those passed to the 1.x :func:`_sa.create_engine` function. + The difference is that the object returned is the :class:`._future.Engine` + which has the 2.0 version of the API. + + """ + + kw["_future_engine_class"] = Engine + return _create_engine(*arg, **kw) + + +class Connection(_LegacyConnection): + """Provides high-level functionality for a wrapped DB-API connection. + + **This is the SQLAlchemy 2.0 version** of the :class:`_engine.Connection` + class. The API and behavior of this object is largely the same, with the + following differences in behavior: + + * The result object returned for results is the :class:`_future.Result` + object. This object has a slightly different API and behavior than the + prior :class:`_engine.ResultProxy` object. + + * The object has :meth:`_future.Connection.commit` and + :meth:`_future.Connection.rollback` methods which commit or roll back + the current transaction in progress, if any. + + * The object features "autobegin" behavior, such that any call to + :meth:`_future.Connection.execute` will + unconditionally start a + transaction which can be controlled using the above mentioned + :meth:`_future.Connection.commit` and + :meth:`_future.Connection.rollback` methods. + + * The object does not have any "autocommit" functionality. Any SQL + statement or DDL statement will not be followed by any COMMIT until + the transaction is explicitly committed, either via the + :meth:`_future.Connection.commit` method, or if the connection is + being used in a context manager that commits such as the one + returned by :meth:`_future.Engine.begin`. + + * The SAVEPOINT method :meth:`_future.Connection.begin_nested` returns + a :class:`_engine.NestedTransaction` as was always the case, and the + savepoint can be controlled by invoking + :meth:`_engine.NestedTransaction.commit` or + :meth:`_engine.NestedTransaction.rollback` as was the case before. + However, this savepoint "transaction" is not associated with the + transaction that is controlled by the connection itself; the overall + transaction can be committed or rolled back directly which will not emit + any special instructions for the SAVEPOINT (this will typically have the + effect that one desires). + + * There are no "nested" connections or transactions. + + + + """ + + _is_future = True + + def _branch(self): + raise NotImplementedError( + "sqlalchemy.future.Connection does not support " + "'branching' of new connections." + ) + + def begin(self): + """Begin a transaction prior to autobegin occurring. + + The :meth:`_future.Connection.begin` method in SQLAlchemy 2.0 begins a + transaction that normally will be begun in any case when the connection + is first used to execute a statement. The reason this method might be + used would be to invoke the :meth:`_events.ConnectionEvents.begin` + event at a specific time, or to organize code within the scope of a + connection checkout in terms of context managed blocks, such as:: + + with engine.connect() as conn: + with conn.begin(): + conn.execute(...) + conn.execute(...) + + with conn.begin(): + conn.execute(...) + conn.execute(...) + + The above code is not fundamentally any different in its behavior than + the following code which does not use + :meth:`_future.Connection.begin`:: + + with engine.connect() as conn: + conn.execute(...) + conn.execute(...) + conn.commit() + + conn.execute(...) + conn.execute(...) + conn.commit() + + In both examples, if an exception is raised, the transaction will not + be committed. An explicit rollback of the transaction will occur, + including that the :meth:`_events.ConnectionEvents.rollback` event will + be emitted, as connection's context manager will call + :meth:`_future.Connection.close`, which will call + :meth:`_future.Connection.rollback` for any transaction in place + (excluding that of a SAVEPOINT). + + From a database point of view, the :meth:`_future.Connection.begin` + method does not emit any SQL or change the state of the underlying + DBAPI connection in any way; the Python DBAPI does not have any + concept of explicit transaction begin. + + :return: a :class:`_engine.Transaction` object. This object supports + context-manager operation which will commit a transaction or + emit a rollback in case of error. + + . If this event is not being used, then there is + no real effect from invoking :meth:`_future.Connection.begin` ahead + of time as the Python DBAPI does not implement any explicit BEGIN + + + The returned object is an instance of :class:`_engine.Transaction`. + This object represents the "scope" of the transaction, + which completes when either the :meth:`_engine.Transaction.rollback` + or :meth:`_engine.Transaction.commit` method is called. + + Nested calls to :meth:`_future.Connection.begin` on the same + :class:`_future.Connection` will return new + :class:`_engine.Transaction` objects that represent an emulated + transaction within the scope of the enclosing transaction, that is:: + + trans = conn.begin() # outermost transaction + trans2 = conn.begin() # "nested" + trans2.commit() # does nothing + trans.commit() # actually commits + + Calls to :meth:`_engine.Transaction.commit` only have an effect when + invoked via the outermost :class:`_engine.Transaction` object, though + the :meth:`_engine.Transaction.rollback` method of any of the + :class:`_engine.Transaction` objects will roll back the transaction. + + .. seealso:: + + :meth:`_future.Connection.begin_nested` - use a SAVEPOINT + + :meth:`_future.Connection.begin_twophase` - + use a two phase /XID transaction + + :meth:`_future.Engine.begin` - context manager available from + :class:`_future.Engine` + + """ + return super(Connection, self).begin() + + def begin_nested(self): + """Begin a nested transaction and return a transaction handle. + + The returned object is an instance of + :class:`_engine.NestedTransaction`. + + Nested transactions require SAVEPOINT support in the + underlying database. Any transaction in the hierarchy may + ``commit`` and ``rollback``, however the outermost transaction + still controls the overall ``commit`` or ``rollback`` of the + transaction of a whole. + + In SQLAlchemy 2.0, the :class:`_engine.NestedTransaction` remains + independent of the :class:`_future.Connection` object itself. Calling + the :meth:`_future.Connection.commit` or + :meth:`_future.Connection.rollback` will always affect the actual + containing database transaction itself, and not the SAVEPOINT itself. + When a database transaction is committed, any SAVEPOINTs that have been + established are cleared and the data changes within their scope is also + committed. + + .. seealso:: + + :meth:`_future.Connection.begin` + + + """ + return super(Connection, self).begin_nested() + + def commit(self): + """Commit the transaction that is currently in progress. + + This method commits the current transaction if one has been started. + If no transaction was started, the method has no effect, assuming + the connection is in a non-invalidated state. + + A transaction is begun on a :class:`_future.Connection` automatically + whenever a statement is first executed, or when the + :meth:`_future.Connection.begin` method is called. + + .. note:: The :meth:`_future.Connection.commit` method only acts upon + the primary database transaction that is linked to the + :class:`_future.Connection` object. It does not operate upon a + SAVEPOINT that would have been invoked from the + :meth:`_future.Connection.begin_nested` method; for control of a + SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the + :class:`_engine.NestedTransaction` that is returned by the + :meth:`_future.Connection.begin_nested` method itself. + + + """ + if self._transaction: + self._transaction.commit() + + def rollback(self): + """Roll back the transaction that is currently in progress. + + This method rolls back the current transaction if one has been started. + If no transaction was started, the method has no effect. If a + transaction was started and the connection is in an invalidated state, + the transaction is cleared using this method. + + A transaction is begun on a :class:`_future.Connection` automatically + whenever a statement is first executed, or when the + :meth:`_future.Connection.begin` method is called. + + .. note:: The :meth:`_future.Connection.rollback` method only acts + upon the primary database transaction that is linked to the + :class:`_future.Connection` object. It does not operate upon a + SAVEPOINT that would have been invoked from the + :meth:`_future.Connection.begin_nested` method; for control of a + SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the + :class:`_engine.NestedTransaction` that is returned by the + :meth:`_future.Connection.begin_nested` method itself. + + + """ + if self._transaction: + self._transaction.rollback() + + def close(self): + """Close this :class:`_future.Connection`. + + This has the effect of also calling :meth:`_future.Connection.rollback` + if any transaction is in place. + + """ + + try: + conn = self.__connection + except AttributeError: + pass + else: + # TODO: can we do away with "_reset_agent" stuff now? + if self._transaction: + self._transaction.rollback() + + conn.close() + + # the close() process can end up invalidating us, + # as the pool will call our transaction as the "reset_agent" + # for rollback(), which can then cause an invalidation + if not self.__invalid: + del self.__connection + self.__can_reconnect = False + self._transaction = None + + def execute(self, statement, parameters=None, execution_options=None): + r"""Executes a SQL statement construct and returns a + :class:`_future.Result`. + + :param object: The statement to be executed. This is always + an object that is in both the :class:`_expression.ClauseElement` and + :class:`_expression.Executable` hierarchies, including: + + * :class:`_expression.Select` + * :class:`_expression.Insert`, :class:`_expression.Update`, + :class:`_expression.Delete` + * :class:`_expression.TextClause` and + :class:`_expression.TextualSelect` + * :class:`_schema.DDL` and objects which inherit from + :class:`_schema.DDLElement` + + :param parameters: parameters which will be bound into the statment. + This may be either a dictionary of parameter names to values, + or a mutable sequence (e.g. a list) of dictionaries. When a + list of dictionaries is passed, the underlying statement execution + will make use of the DBAPI ``cursor.executemany()`` method. + When a single dictionary is passed, the DBAPI ``cursor.execute()`` + method will be used. + + :param execution_options: optional dictionary of execution options, + which will be associated with the statement execution. This + dictionary can provide a subset of the options that are accepted + by :meth:`_future.Connection.execution_options`. + + :return: a :class:`_future.Result` object. + + """ + return self._execute_20( + statement, parameters, execution_options or NO_OPTIONS + ) + + def scalar(self, statement, parameters=None, execution_options=None): + r"""Executes a SQL statement construct and returns a scalar object. + + This method is shorthand for invoking the + :meth:`_future.Result.scalar` method after invoking the + :meth:`_future.Connection.execute` method. Parameters are equivalent. + + :return: a scalar Python value representing the first column of the + first row returned. + + """ + return self.execute(statement, parameters, execution_options).scalar() + + +class Engine(_LegacyEngine): + """Connects a :class:`_pool.Pool` and + :class:`_engine.Dialect` together to provide a + source of database connectivity and behavior. + + **This is the SQLAlchemy 2.0 version** of the :class:`~.engine.Engine`. + + An :class:`.future.Engine` object is instantiated publicly using the + :func:`~sqlalchemy.future.create_engine` function. + + .. seealso:: + + :doc:`/core/engines` + + :ref:`connections_toplevel` + + """ + + _connection_cls = Connection + _is_future = True + + def _not_implemented(self, *arg, **kw): + raise NotImplementedError( + "This method is not implemented for SQLAlchemy 2.0." + ) + + transaction = ( + run_callable + ) = ( + execute + ) = ( + scalar + ) = ( + _execute_clauseelement + ) = _execute_compiled = table_names = has_table = _not_implemented + + def _run_ddl_visitor(self, visitorcallable, element, **kwargs): + # TODO: this is for create_all support etc. not clear if we + # want to provide this in 2.0, that is, a way to execute SQL where + # they aren't calling "engine.begin()" explicitly, however, DDL + # may be a special case for which we want to continue doing it this + # way. A big win here is that the full DDL sequence is inside of a + # single transaction rather than COMMIT for each statment. + with self.begin() as conn: + conn._run_ddl_visitor(visitorcallable, element, **kwargs) + + @classmethod + def _future_facade(self, legacy_engine): + return Engine( + legacy_engine.pool, + legacy_engine.dialect, + legacy_engine.url, + logging_name=legacy_engine.logging_name, + echo=legacy_engine.echo, + hide_parameters=legacy_engine.hide_parameters, + execution_options=legacy_engine._execution_options, + ) + + def begin(self): + """Return a :class:`_future.Connection` object with a transaction + begun. + + Use of this method is similar to that of + :meth:`_future.Engine.connect`, typically as a context manager, which + will automatically maintain the state of the transaction when the block + ends, either by calling :meth:`_future.Connection.commit` when the + block succeeds normally, or :meth:`_future.Connection.rollback` when an + exception is raised, before propagating the exception outwards:: + + with engine.begin() as connection: + connection.execute(text("insert into table values ('foo')")) + + + .. seealso:: + + :meth:`_future.Engine.connect` + + :meth:`_future.Connection.begin` + + """ + return super(Engine, self).begin() + + def connect(self): + """Return a new :class:`_future.Connection` object. + + The :class:`_future.Connection` acts as a Python context manager, so + the typical use of this method looks like:: + + with engine.connect() as connection: + connection.execute(text("insert into table values ('foo')")) + connection.commit() + + Where above, after the block is completed, the connection is "closed" + and its underlying DBAPI resources are returned to the connection pool. + This also has the effect of rolling back any transaction that + was explicitly begun or was begun via autobegin, and will + emit the :meth:`_events.ConnectionEvents.rollback` event if one was + started and is still in progress. + + .. seealso:: + + :meth:`_future.Engine.begin` + + + """ + return super(Engine, self).connect() + + +class OptionEngine(OptionEngineMixin, Engine): + pass + + +Engine._option_cls = OptionEngine diff --git a/lib/sqlalchemy/future/result.py b/lib/sqlalchemy/future/result.py index 82d87ddf13..21a42e1f65 100644 --- a/lib/sqlalchemy/future/result.py +++ b/lib/sqlalchemy/future/result.py @@ -1,17 +1,16 @@ import operator from .. import util -from ..engine.result import _baserow_usecext from ..engine.result import BaseResult from ..engine.result import CursorResultMetaData from ..engine.result import DefaultCursorFetchStrategy from ..engine.result import Row from ..sql import util as sql_util from ..sql.base import _generative -from ..sql.base import Generative +from ..sql.base import InPlaceGenerative -class Result(Generative, BaseResult): +class Result(InPlaceGenerative, BaseResult): """Interim "future" result proxy so that dialects can build on upcoming 2.0 patterns. @@ -50,21 +49,76 @@ class Result(Generative, BaseResult): self._soft_close(hard=True) def columns(self, *col_expressions): - indexes = [] - for key in col_expressions: - try: - rec = self._keymap[key] - except KeyError: - rec = self._key_fallback(key, True) - if rec is None: - return None - - index, obj = rec[0:2] - - if index is None: - self._metadata._raise_for_ambiguous_column_name(obj) - indexes.append(index) - return self._column_slices(indexes) + r"""Establish the columns that should be returned in each row. + + This method may be used to limit the columns returned as well + as to reorder them. The given list of expressions are normally + a series of integers or string key names. They may also be + appropriate :class:`.ColumnElement` objects which correspond to + a given statement construct. + + E.g.:: + + statement = select(table.c.x, table.c.y, table.c.z) + result = connection.execute(statement) + + for z, y in result.columns('z', 'y'): + # ... + + + Example of using the column objects from the statement itself:: + + for z, y in result.columns( + statement.selected_columns.c.z, + statement.selected_columns.c.y + ): + # ... + + :param \*col_expressions: indicates columns to be returned. Elements + may be integer row indexes, string column names, or appropriate + :class:`.ColumnElement` objects corresponding to a select construct. + + :return: this :class:`_future.Result` object with the modifications + given. + + """ + return self._column_slices(col_expressions) + + def partitions(self, size=100): + """Iterate through sub-lists of rows of the size given. + + Each list will be of the size given, excluding the last list to + be yielded, which may have a small number of rows. No empty + lists will be yielded. + + The result object is automatically closed when the iterator + is fully consumed. + + Note that the backend driver will usually buffer the entire result + ahead of time unless the + :paramref:`.Connection.execution_options.stream_results` execution + option is used indicating that the driver should not pre-buffer + results, if possible. Not all drivers support this option and + the option is silently ignored for those who do. For a positive + assertion that the driver supports streaming results that will + fail if not supported, use the + :paramref:`.Connection.execution_options.stream_per` + execution option. + + :param size: indicate the maximum number of rows to be present + in each list yielded. + :return: iterator of lists + + """ + getter = self._row_getter() + while True: + partition = [ + getter(r) for r in self._safe_fetchmany_impl(size=size) + ] + if partition: + yield partition + else: + break def scalars(self): result = self._column_slices(0) @@ -73,12 +127,7 @@ class Result(Generative, BaseResult): @_generative def _column_slices(self, indexes): - if _baserow_usecext: - self._column_slice_filter = self._metadata._tuplegetter(*indexes) - else: - self._column_slice_filter = self._metadata._pure_py_tuplegetter( - *indexes - ) + self._column_slice_filter = self._metadata._tuple_getter(indexes) @_generative def mappings(self): @@ -135,7 +184,7 @@ class Result(Generative, BaseResult): def _safe_fetchmany_impl(self, size=None): try: - l = self.process_rows(self.cursor_strategy.fetchmany(size)) + l = self.cursor_strategy.fetchmany(size) if len(l) == 0: self._soft_close() return l @@ -156,11 +205,77 @@ class Result(Generative, BaseResult): else: return getter(row) + @util.deprecated( + "2.0", + "The :meth:`_future.Result.fetchall` " + "method is provided for backwards " + "compatibility and will be removed in a future release.", + ) + def fetchall(self): + """A synonym for the :meth:`_future.Result.all` method.""" + + return self.all() + + @util.deprecated( + "2.0", + "The :meth:`_future.Result.fetchone` " + "method is provided for backwards " + "compatibility and will be removed in a future release.", + ) + def fetchone(self): + """Fetch one row. + + this method is provided for backwards compatibility with + SQLAlchemy 1.x.x. + + To fetch the first row of a result only, use the + :meth:`.future.Result.first` method. To iterate through all + rows, iterate the :class:`_future.Result` object directly. + + """ + return self._onerow() + + @util.deprecated( + "2.0", + "The :meth:`_future.Result.fetchmany` " + "method is provided for backwards " + "compatibility and will be removed in a future release.", + ) + def fetchmany(self, size=None): + """Fetch many rows. + + this method is provided for backwards compatibility with + SQLAlchemy 1.x.x. + + To fetch rows in groups, use the :meth:`.future.Result.partitions` + method, or the :meth:`.future.Result.chunks` method in combination + with the :paramref:`.Connection.execution_options.stream_per` + option which sets up the buffer size before fetching the result. + + """ + getter = self._row_getter() + return [getter(r) for r in self._safe_fetchmany_impl(size=size)] + def all(self): + """Return all rows in a list. + + Closes the result set after invocation. + + :return: a list of :class:`.Row` objects. + + """ getter = self._row_getter() return [getter(r) for r in self._safe_fetchall_impl()] def first(self): + """Fetch the first row or None if no row is present. + + Closes the result set and discards remaining rows. A warning + is emitted if additional rows remain. + + :return: a :class:`.Row` object, or None if no rows remain + + """ getter = self._row_getter() row = self._safe_fetchone_impl() if row is None: @@ -172,3 +287,19 @@ class Result(Generative, BaseResult): self._soft_close() util.warn("Additional rows remain") return row + + def scalar(self): + """Fetch the first column of the first row, and close the result set. + + After calling this method, the object is fully closed, + e.g. the :meth:`_engine.ResultProxy.close` + method will have been called. + + :return: a Python scalar value , or None if no rows remain + + """ + row = self.first() + if row is not None: + return row[0] + else: + return None diff --git a/lib/sqlalchemy/log.py b/lib/sqlalchemy/log.py index 42c8a2c290..44f8c4ff86 100644 --- a/lib/sqlalchemy/log.py +++ b/lib/sqlalchemy/log.py @@ -41,8 +41,15 @@ def _add_default_handler(logger): _logged_classes = set() +def _qual_logger_name_for_cls(cls): + return ( + getattr(cls, "_sqla_logger_namespace", None) + or cls.__module__ + "." + cls.__name__ + ) + + def class_logger(cls): - logger = logging.getLogger(cls.__module__ + "." + cls.__name__) + logger = logging.getLogger(_qual_logger_name_for_cls(cls)) cls._should_log_debug = lambda self: logger.isEnabledFor(logging.DEBUG) cls._should_log_info = lambda self: logger.isEnabledFor(logging.INFO) cls.logger = logger @@ -175,16 +182,12 @@ def instance_logger(instance, echoflag=None): """create a logger for an instance that implements :class:`.Identified`.""" if instance.logging_name: - name = "%s.%s.%s" % ( - instance.__class__.__module__, - instance.__class__.__name__, + name = "%s.%s" % ( + _qual_logger_name_for_cls(instance.__class__), instance.logging_name, ) else: - name = "%s.%s" % ( - instance.__class__.__module__, - instance.__class__.__name__, - ) + name = _qual_logger_name_for_cls(instance.__class__) instance._echo = echoflag diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py index 534d675303..4ca715dd32 100644 --- a/lib/sqlalchemy/orm/session.py +++ b/lib/sqlalchemy/orm/session.py @@ -447,7 +447,10 @@ class SessionTransaction(object): elif self.nested: transaction = conn.begin_nested() else: - transaction = conn.begin() + if conn._is_future and conn.in_transaction(): + transaction = conn._transaction + else: + transaction = conn.begin() except: # connection will not not be associated with this Session; # close it immediately so that it isn't closed under GC @@ -455,10 +458,13 @@ class SessionTransaction(object): conn.close() raise else: + bind_is_connection = isinstance(bind, engine.Connection) + self._connections[conn] = self._connections[conn.engine] = ( conn, transaction, - conn is not bind, + not bind_is_connection or not conn._is_future, + not bind_is_connection, ) self.session.dispatch.after_begin(self.session, self, conn) return conn @@ -509,8 +515,11 @@ class SessionTransaction(object): self._prepare_impl() if self._parent is None or self.nested: - for t in set(self._connections.values()): - t[1].commit() + for conn, trans, should_commit, autoclose in set( + self._connections.values() + ): + if should_commit: + trans.commit() self._state = COMMITTED self.session.dispatch.after_commit(self.session) @@ -579,7 +588,7 @@ class SessionTransaction(object): def close(self, invalidate=False): self.session._transaction = self._parent if self._parent is None: - for connection, transaction, autoclose in set( + for connection, transaction, should_commit, autoclose in set( self._connections.values() ): if invalidate: diff --git a/lib/sqlalchemy/sql/base.py b/lib/sqlalchemy/sql/base.py index d070027c8f..2d023c6a6a 100644 --- a/lib/sqlalchemy/sql/base.py +++ b/lib/sqlalchemy/sql/base.py @@ -461,6 +461,17 @@ class Generative(HasMemoized): return s +class InPlaceGenerative(HasMemoized): + """Provide a method-chaining pattern in conjunction with the + @_generative decorator taht mutates in place.""" + + def _generate(self): + skip = self._memoized_keys + for k in skip: + self.__dict__.pop(k, None) + return self + + class HasCompileState(Generative): """A class that has a :class:`.CompileState` associated with it.""" diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index 7ef9f7e6df..bc16b14296 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -434,9 +434,13 @@ class Compiled(object): self.string, schema_translate_map ) - def _execute_on_connection(self, connection, multiparams, params): + def _execute_on_connection( + self, connection, multiparams, params, execution_options + ): if self.can_execute: - return connection._execute_compiled(self, multiparams, params) + return connection._execute_compiled( + self, multiparams, params, execution_options + ) else: raise exc.ObjectNotExecutableError(self.statement) diff --git a/lib/sqlalchemy/sql/ddl.py b/lib/sqlalchemy/sql/ddl.py index 4c8250e983..51526173f0 100644 --- a/lib/sqlalchemy/sql/ddl.py +++ b/lib/sqlalchemy/sql/ddl.py @@ -68,8 +68,12 @@ class DDLElement(roles.DDLRole, Executable, _DDLCompiles): dialect = None callable_ = None - def _execute_on_connection(self, connection, multiparams, params): - return connection._execute_ddl(self, multiparams, params) + def _execute_on_connection( + self, connection, multiparams, params, execution_options + ): + return connection._execute_ddl( + self, multiparams, params, execution_options + ) def execute(self, bind=None, target=None): """Execute this DDL immediately. diff --git a/lib/sqlalchemy/sql/elements.py b/lib/sqlalchemy/sql/elements.py index 49bb08644b..d8b5a16263 100644 --- a/lib/sqlalchemy/sql/elements.py +++ b/lib/sqlalchemy/sql/elements.py @@ -282,9 +282,13 @@ class ClauseElement( d.pop("_generate_cache_key", None) return d - def _execute_on_connection(self, connection, multiparams, params): + def _execute_on_connection( + self, connection, multiparams, params, execution_options + ): if self.supports_execution: - return connection._execute_clauseelement(self, multiparams, params) + return connection._execute_clauseelement( + self, multiparams, params, execution_options + ) else: raise exc.ObjectNotExecutableError(self) diff --git a/lib/sqlalchemy/sql/functions.py b/lib/sqlalchemy/sql/functions.py index 1b10df9547..cedb76f559 100644 --- a/lib/sqlalchemy/sql/functions.py +++ b/lib/sqlalchemy/sql/functions.py @@ -115,8 +115,12 @@ class FunctionElement(Executable, ColumnElement, FromClause): operator=operators.comma_op, group_contents=True, *args ).self_group() - def _execute_on_connection(self, connection, multiparams, params): - return connection._execute_function(self, multiparams, params) + def _execute_on_connection( + self, connection, multiparams, params, execution_options + ): + return connection._execute_function( + self, multiparams, params, execution_options + ) @property def columns(self): diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py index ec8d5a4585..eddd62d659 100644 --- a/lib/sqlalchemy/sql/schema.py +++ b/lib/sqlalchemy/sql/schema.py @@ -2186,8 +2186,12 @@ class DefaultGenerator(SchemaItem): bind = _bind_or_error(self) return bind.execute(self, **kwargs) - def _execute_on_connection(self, connection, multiparams, params): - return connection._execute_default(self, multiparams, params) + def _execute_on_connection( + self, connection, multiparams, params, execution_options + ): + return connection._execute_default( + self, multiparams, params, execution_options + ) @property def bind(self): diff --git a/lib/sqlalchemy/testing/assertsql.py b/lib/sqlalchemy/testing/assertsql.py index 8876c23041..7988b4ec9c 100644 --- a/lib/sqlalchemy/testing/assertsql.py +++ b/lib/sqlalchemy/testing/assertsql.py @@ -388,7 +388,9 @@ def assert_engine(engine): orig = [] @event.listens_for(engine, "before_execute") - def connection_execute(conn, clauseelement, multiparams, params): + def connection_execute( + conn, clauseelement, multiparams, params, execution_options + ): # grab the original statement + params before any cursor # execution orig[:] = clauseelement, multiparams, params diff --git a/lib/sqlalchemy/testing/config.py b/lib/sqlalchemy/testing/config.py index 140f5f7824..e97821d722 100644 --- a/lib/sqlalchemy/testing/config.py +++ b/lib/sqlalchemy/testing/config.py @@ -149,6 +149,14 @@ class Config(object): cls._stack.append(_current) cls.set_as_current(config, namespace) + @classmethod + def pop(cls, namespace): + if cls._stack: + # a failed test w/ -x option can call reset() ahead of time + _current = cls._stack[-1] + del cls._stack[-1] + cls.set_as_current(_current, namespace) + @classmethod def reset(cls, namespace): if cls._stack: diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py index 910af5876b..280e6901e1 100644 --- a/lib/sqlalchemy/testing/engines.py +++ b/lib/sqlalchemy/testing/engines.py @@ -238,10 +238,13 @@ def reconnecting_engine(url=None, options=None): return engine -def testing_engine(url=None, options=None): +def testing_engine(url=None, options=None, future=False): """Produce an engine configured by --options with optional overrides.""" - from sqlalchemy import create_engine + if future or config.db and config.db._is_future: + from sqlalchemy.future import create_engine + else: + from sqlalchemy import create_engine from sqlalchemy.engine.url import make_url if not options: diff --git a/lib/sqlalchemy/testing/fixtures.py b/lib/sqlalchemy/testing/fixtures.py index e5e6c42fc0..26ae221b89 100644 --- a/lib/sqlalchemy/testing/fixtures.py +++ b/lib/sqlalchemy/testing/fixtures.py @@ -84,6 +84,29 @@ class TestBase(object): # engines.drop_all_tables(metadata, config.db) +class FutureEngineMixin(object): + @classmethod + def setup_class(cls): + super_ = super(FutureEngineMixin, cls) + if hasattr(super_, "setup_class"): + super_.setup_class() + + from ..future.engine import Engine + from sqlalchemy import testing + + config._current.push_engine(Engine._future_facade(config.db), testing) + + @classmethod + def teardown_class(cls): + from sqlalchemy import testing + + config._current.pop(testing) + + super_ = super(FutureEngineMixin, cls) + if hasattr(super_, "teardown_class"): + super_.teardown_class() + + class TablesTest(TestBase): # 'once', None diff --git a/lib/sqlalchemy/testing/suite/test_ddl.py b/lib/sqlalchemy/testing/suite/test_ddl.py index 1f49106fb6..93706338cd 100644 --- a/lib/sqlalchemy/testing/suite/test_ddl.py +++ b/lib/sqlalchemy/testing/suite/test_ddl.py @@ -90,4 +90,8 @@ class TableDDLTest(fixtures.TestBase): ) -__all__ = ("TableDDLTest",) +class FutureTableDDLTest(fixtures.FutureEngineMixin, TableDDLTest): + pass + + +__all__ = ("TableDDLTest", "FutureTableDDLTest") diff --git a/lib/sqlalchemy/testing/warnings.py b/lib/sqlalchemy/testing/warnings.py index 6b42c98cb6..39cffbf158 100644 --- a/lib/sqlalchemy/testing/warnings.py +++ b/lib/sqlalchemy/testing/warnings.py @@ -34,6 +34,13 @@ def setup_filters(): # ignore 2.0 warnings unless we are explicitly testing for them warnings.filterwarnings("ignore", category=sa_exc.RemovedIn20Warning) + # ignore things that are deprecated *as of* 2.0 :) + warnings.filterwarnings( + "ignore", + category=sa_exc.SADeprecationWarning, + message=r".*\(deprecated since: 2.0\)$", + ) + try: import pytest except ImportError: diff --git a/lib/sqlalchemy/util/deprecations.py b/lib/sqlalchemy/util/deprecations.py index ad734a1c32..8ea8e8695c 100644 --- a/lib/sqlalchemy/util/deprecations.py +++ b/lib/sqlalchemy/util/deprecations.py @@ -108,6 +108,8 @@ def deprecated( if warning is None: warning = exc.SADeprecationWarning + message += " (deprecated since: %s)" % version + def decorate(fn): return _decorate_with_warning( fn, warning, message % dict(func=fn.__name__), version, header diff --git a/test/base/test_events.py b/test/base/test_events.py index f13137084a..677cf80b0b 100644 --- a/test/base/test_events.py +++ b/test/base/test_events.py @@ -6,6 +6,7 @@ from sqlalchemy import testing from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import eq_ +from sqlalchemy.testing import expect_deprecated from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import is_not_ @@ -408,7 +409,13 @@ class LegacySignatureTest(fixtures.TestBase): def handler1(x, y): canary(x, y) - self.TargetOne().dispatch.event_three(4, 5, 6, 7) + with expect_deprecated( + 'The argument signature for the "TargetEventsOne.event_three" ' + "event listener has changed as of version 0.9, and conversion " + "for the old argument signature will be removed in a future " + r'release. The new signature is "def event_three\(x, y, z, q\)"' + ): + self.TargetOne().dispatch.event_three(4, 5, 6, 7) eq_(canary.mock_calls, [call(4, 5)]) @@ -451,7 +458,14 @@ class LegacySignatureTest(fixtures.TestBase): eq_(canary.mock_calls, [call(5, 4, 5, foo="bar")]) def _test_legacy_accept_kw(self, target, canary): - target.dispatch.event_four(4, 5, 6, 7, foo="bar") + with expect_deprecated( + 'The argument signature for the "TargetEventsOne.event_four" ' + "event listener has changed as of version 0.9, and conversion " + "for the old argument signature will be removed in a future " + r"release. The new signature is " + r'"def event_four\(x, y, z, q, \*\*kw\)"' + ): + target.dispatch.event_four(4, 5, 6, 7, foo="bar") eq_(canary.mock_calls, [call(4, 5, {"foo": "bar"})]) @@ -462,9 +476,27 @@ class LegacySignatureTest(fixtures.TestBase): def handler1(x, y, z, q): canary(x, y, z, q) - self.TargetOne().dispatch.event_six(4, 5) + with expect_deprecated( + 'The argument signature for the "TargetEventsOne.event_six" ' + "event listener has changed as of version 0.9, and " + "conversion for the old argument signature will be removed in " + "a future release. The new signature is " + r'"def event_six\(x, y\)' + ): + self.TargetOne().dispatch.event_six(4, 5) eq_(canary.mock_calls, [call(4, 5, 9, 20)]) + def test_complex_new_accept(self): + canary = Mock() + + @event.listens_for(self.TargetOne, "event_six") + def handler1(x, y): + canary(x, y) + + # new version does not emit a warning + self.TargetOne().dispatch.event_six(4, 5) + eq_(canary.mock_calls, [call(4, 5)]) + def test_legacy_accept_from_method(self): canary = Mock() @@ -474,7 +506,13 @@ class LegacySignatureTest(fixtures.TestBase): event.listen(self.TargetOne, "event_three", MyClass().handler1) - self.TargetOne().dispatch.event_three(4, 5, 6, 7) + with expect_deprecated( + 'The argument signature for the "TargetEventsOne.event_three" ' + "event listener has changed as of version 0.9, and conversion " + "for the old argument signature will be removed in a future " + r'release. The new signature is "def event_three\(x, y, z, q\)"' + ): + self.TargetOne().dispatch.event_three(4, 5, 6, 7) eq_(canary.mock_calls, [call(4, 5)]) def test_standard_accept_has_legacies(self): diff --git a/test/base/test_tutorials.py b/test/base/test_tutorials.py index 2c1058b9a3..97dca753de 100644 --- a/test/base/test_tutorials.py +++ b/test/base/test_tutorials.py @@ -13,7 +13,7 @@ from sqlalchemy.testing import fixtures class DocTest(fixtures.TestBase): def _setup_logger(self): - rootlogger = logging.getLogger("sqlalchemy.engine.base.Engine") + rootlogger = logging.getLogger("sqlalchemy.engine.Engine") class MyStream(object): def write(self, string): @@ -28,7 +28,7 @@ class DocTest(fixtures.TestBase): rootlogger.addHandler(handler) def _teardown_logger(self): - rootlogger = logging.getLogger("sqlalchemy.engine.base.Engine") + rootlogger = logging.getLogger("sqlalchemy.engine.Engine") rootlogger.removeHandler(self._handler) def _setup_create_table_patcher(self): diff --git a/test/dialect/mysql/test_reflection.py b/test/dialect/mysql/test_reflection.py index 2de530607d..b0d0e41515 100644 --- a/test/dialect/mysql/test_reflection.py +++ b/test/dialect/mysql/test_reflection.py @@ -853,7 +853,7 @@ class ReflectionTest(fixtures.TestBase, AssertsCompiledSQL): dialect._casing = casing dialect.default_schema_name = "Test" connection = mock.Mock( - dialect=dialect, execute=lambda stmt, **params: ischema + dialect=dialect, execute=lambda stmt, params: ischema ) dialect._correct_for_mysql_bugs_88718_96365(fkeys, connection) eq_( diff --git a/test/engine/test_deprecations.py b/test/engine/test_deprecations.py index 53df6c1a8e..20f2b7d74f 100644 --- a/test/engine/test_deprecations.py +++ b/test/engine/test_deprecations.py @@ -1,5 +1,8 @@ +import re + import sqlalchemy as tsa from sqlalchemy import create_engine +from sqlalchemy import event from sqlalchemy import ForeignKey from sqlalchemy import func from sqlalchemy import inspect @@ -13,9 +16,11 @@ from sqlalchemy import testing from sqlalchemy import VARCHAR from sqlalchemy.engine import reflection from sqlalchemy.engine.base import Connection +from sqlalchemy.engine.base import Engine from sqlalchemy.engine.mock import MockConnection from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message +from sqlalchemy.testing import config from sqlalchemy.testing import engines from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures @@ -23,6 +28,7 @@ from sqlalchemy.testing import is_ from sqlalchemy.testing import is_false from sqlalchemy.testing import is_instance_of from sqlalchemy.testing import is_true +from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing.mock import Mock from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table @@ -766,3 +772,89 @@ class RawExecuteTest(fixtures.TablesTest): (3, "horse"), (4, "sally"), ] + + +class EngineEventsTest(fixtures.TestBase): + __requires__ = ("ad_hoc_engines",) + __backend__ = True + + def tearDown(self): + Engine.dispatch._clear() + Engine._has_events = False + + def _assert_stmts(self, expected, received): + list(received) + for stmt, params, posn in expected: + if not received: + assert False, "Nothing available for stmt: %s" % stmt + while received: + teststmt, testparams, testmultiparams = received.pop(0) + teststmt = ( + re.compile(r"[\n\t ]+", re.M).sub(" ", teststmt).strip() + ) + if teststmt.startswith(stmt) and ( + testparams == params or testparams == posn + ): + break + + def test_retval_flag(self): + canary = [] + + def tracker(name): + def go(conn, *args, **kw): + canary.append(name) + + return go + + def execute(conn, clauseelement, multiparams, params): + canary.append("execute") + return clauseelement, multiparams, params + + def cursor_execute( + conn, cursor, statement, parameters, context, executemany + ): + canary.append("cursor_execute") + return statement, parameters + + engine = engines.testing_engine() + + assert_raises( + tsa.exc.ArgumentError, + event.listen, + engine, + "begin", + tracker("begin"), + retval=True, + ) + + event.listen(engine, "before_execute", execute, retval=True) + event.listen( + engine, "before_cursor_execute", cursor_execute, retval=True + ) + with testing.expect_deprecated( + r"The argument signature for the " + r"\"ConnectionEvents.before_execute\" event listener", + ): + engine.execute(select([1])) + eq_(canary, ["execute", "cursor_execute"]) + + def test_argument_format_execute(self): + def before_execute(conn, clauseelement, multiparams, params): + assert isinstance(multiparams, (list, tuple)) + assert isinstance(params, dict) + + def after_execute(conn, clauseelement, multiparams, params, result): + assert isinstance(multiparams, (list, tuple)) + assert isinstance(params, dict) + + e1 = testing_engine(config.db_url) + event.listen(e1, "before_execute", before_execute) + event.listen(e1, "after_execute", after_execute) + + with testing.expect_deprecated( + r"The argument signature for the " + r"\"ConnectionEvents.before_execute\" event listener", + r"The argument signature for the " + r"\"ConnectionEvents.after_execute\" event listener", + ): + e1.execute(select([1])) diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index e2c009e2d7..f694a251c0 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -24,6 +24,7 @@ from sqlalchemy import TypeDecorator from sqlalchemy import util from sqlalchemy import VARCHAR from sqlalchemy.engine import default +from sqlalchemy.engine.base import Connection from sqlalchemy.engine.base import Engine from sqlalchemy.sql import column from sqlalchemy.sql import literal @@ -112,6 +113,11 @@ class ExecuteTest(fixtures.TablesTest): ) def test_raw_named_invalid(self, connection): + # this is awkward b.c. this is just testing if regular Python + # is raising TypeError if they happened to send arguments that + # look like the legacy ones which also happen to conflict with + # the positional signature for the method. some combinations + # can get through and fail differently assert_raises( TypeError, connection.exec_driver_sql, @@ -119,6 +125,7 @@ class ExecuteTest(fixtures.TablesTest): "values (%(id)s, %(name)s)", {"id": 2, "name": "ed"}, {"id": 3, "name": "horse"}, + {"id": 4, "name": "horse"}, ) assert_raises( TypeError, @@ -1224,6 +1231,7 @@ class EngineEventsTest(fixtures.TestBase): def _assert_stmts(self, expected, received): list(received) + for stmt, params, posn in expected: if not received: assert False, "Nothing available for stmt: %s" % stmt @@ -1245,12 +1253,20 @@ class EngineEventsTest(fixtures.TestBase): event.listen(e1, "before_execute", canary) s1 = select([1]) s2 = select([2]) - e1.execute(s1) - e2.execute(s2) + + with e1.connect() as conn: + conn.execute(s1) + + with e2.connect() as conn: + conn.execute(s2) eq_([arg[1][1] for arg in canary.mock_calls], [s1]) event.listen(e2, "before_execute", canary) - e1.execute(s1) - e2.execute(s2) + + with e1.connect() as conn: + conn.execute(s1) + + with e2.connect() as conn: + conn.execute(s2) eq_([arg[1][1] for arg in canary.mock_calls], [s1, s1, s2]) def test_per_engine_plus_global(self): @@ -1265,11 +1281,13 @@ class EngineEventsTest(fixtures.TestBase): e1.connect() e2.connect() - e1.execute(select([1])) + with e1.connect() as conn: + conn.execute(select([1])) eq_(canary.be1.call_count, 1) eq_(canary.be2.call_count, 1) - e2.execute(select([1])) + with e2.connect() as conn: + conn.execute(select([1])) eq_(canary.be1.call_count, 2) eq_(canary.be2.call_count, 1) @@ -1288,9 +1306,10 @@ class EngineEventsTest(fixtures.TestBase): eq_(canary.be1.call_count, 1) eq_(canary.be2.call_count, 1) - conn._branch().execute(select([1])) - eq_(canary.be1.call_count, 2) - eq_(canary.be2.call_count, 2) + if testing.requires.legacy_engine.enabled: + conn._branch().execute(select([1])) + eq_(canary.be1.call_count, 2) + eq_(canary.be2.call_count, 2) def test_add_event_after_connect(self): # new feature as of #2978 @@ -1339,7 +1358,7 @@ class EngineEventsTest(fixtures.TestBase): dialect = conn.dialect ctx = dialect.execution_ctx_cls._init_statement( - dialect, conn, conn.connection, stmt, {} + dialect, conn, conn.connection, {}, stmt, {} ) ctx._execute_scalar(stmt, Integer()) @@ -1377,11 +1396,15 @@ class EngineEventsTest(fixtures.TestBase): ) def test_argument_format_execute(self): - def before_execute(conn, clauseelement, multiparams, params): + def before_execute( + conn, clauseelement, multiparams, params, execution_options + ): assert isinstance(multiparams, (list, tuple)) assert isinstance(params, dict) - def after_execute(conn, clauseelement, multiparams, params, result): + def after_execute( + conn, clauseelement, multiparams, params, result, execution_options + ): assert isinstance(multiparams, (list, tuple)) assert isinstance(params, dict) @@ -1389,18 +1412,23 @@ class EngineEventsTest(fixtures.TestBase): event.listen(e1, "before_execute", before_execute) event.listen(e1, "after_execute", after_execute) - e1.execute(select([1])) - e1.execute(select([1]).compile(dialect=e1.dialect).statement) - e1.execute(select([1]).compile(dialect=e1.dialect)) - e1._execute_compiled(select([1]).compile(dialect=e1.dialect), (), {}) + with e1.connect() as conn: + conn.execute(select([1])) + conn.execute(select([1]).compile(dialect=e1.dialect).statement) + conn.execute(select([1]).compile(dialect=e1.dialect)) + + conn._execute_compiled( + select([1]).compile(dialect=e1.dialect), (), {} + ) - @testing.fails_on("firebird", "Data type unknown") def test_execute_events(self): stmts = [] cursor_stmts = [] - def execute(conn, clauseelement, multiparams, params): + def execute( + conn, clauseelement, multiparams, params, execution_options + ): stmts.append((str(clauseelement), params, multiparams)) def cursor_execute( @@ -1408,6 +1436,8 @@ class EngineEventsTest(fixtures.TestBase): ): cursor_stmts.append((str(statement), parameters, None)) + # TODO: this test is kind of a mess + for engine in [ engines.testing_engine(options=dict(implicit_returning=False)), engines.testing_engine( @@ -1428,28 +1458,57 @@ class EngineEventsTest(fixtures.TestBase): primary_key=True, ), ) - m.create_all() + + if isinstance(engine, Connection) and engine._is_future: + ctx = None + conn = engine + elif engine._is_future: + ctx = conn = engine.connect() + else: + ctx = None + conn = engine + try: - t1.insert().execute(c1=5, c2="some data") - t1.insert().execute(c1=6) - eq_( - engine.execute(text("select * from t1")).fetchall(), - [(5, "some data"), (6, "foo")], - ) + m.create_all(conn, checkfirst=False) + try: + conn.execute(t1.insert(), dict(c1=5, c2="some data")) + conn.execute(t1.insert(), dict(c1=6)) + eq_( + conn.execute(text("select * from t1")).fetchall(), + [(5, "some data"), (6, "foo")], + ) + finally: + m.drop_all(conn) + if engine._is_future: + conn.commit() finally: - m.drop_all() - - compiled = [ - ("CREATE TABLE t1", {}, None), - ( - "INSERT INTO t1 (c1, c2)", - {"c2": "some data", "c1": 5}, - None, - ), - ("INSERT INTO t1 (c1, c2)", {"c1": 6}, None), - ("select * from t1", {}, None), - ("DROP TABLE t1", {}, None), - ] + if ctx: + ctx.close() + + if engine._is_future: + compiled = [ + ("CREATE TABLE t1", {}, None), + ( + "INSERT INTO t1 (c1, c2)", + {"c2": "some data", "c1": 5}, + None, + ), + ("INSERT INTO t1 (c1, c2)", {"c1": 6}, None), + ("select * from t1", {}, None), + ("DROP TABLE t1", {}, None), + ] + else: + compiled = [ + ("CREATE TABLE t1", {}, None), + ( + "INSERT INTO t1 (c1, c2)", + {}, + ({"c2": "some data", "c1": 5},), + ), + ("INSERT INTO t1 (c1, c2)", {}, ({"c1": 6},)), + ("select * from t1", {}, None), + ("DROP TABLE t1", {}, None), + ] cursor = [ ("CREATE TABLE t1", {}, ()), @@ -1512,11 +1571,13 @@ class EngineEventsTest(fixtures.TestBase): event.listen(eng, "before_execute", l2) event.listen(eng1, "before_execute", l3) - eng.execute(select([1])).close() + with eng.connect() as conn: + conn.execute(select([1])) eq_(canary, ["l1", "l2"]) - eng1.execute(select([1])).close() + with eng1.connect() as conn: + conn.execute(select([1])) eq_(canary, ["l1", "l2", "l3", "l1", "l2"]) @@ -1547,11 +1608,13 @@ class EngineEventsTest(fixtures.TestBase): event.listen(eng, "before_execute", l3) event.listen(eng1, "before_execute", l4) - eng.execute(select([1])).close() + with eng.connect() as conn: + conn.execute(select([1])) eq_(canary, ["l1", "l2", "l3"]) - eng1.execute(select([1])).close() + with eng1.connect() as conn: + conn.execute(select([1])) eq_(canary, ["l1", "l2", "l3", "l4", "l1", "l2", "l3"]) @@ -1561,7 +1624,8 @@ class EngineEventsTest(fixtures.TestBase): event.remove(eng1, "before_execute", l4) event.remove(eng, "before_execute", l3) - eng1.execute(select([1])).close() + with eng1.connect() as conn: + conn.execute(select([1])) eq_(canary, ["l2"]) @testing.requires.ad_hoc_engines @@ -1609,7 +1673,9 @@ class EngineEventsTest(fixtures.TestBase): return go - def execute(conn, clauseelement, multiparams, params): + def execute( + conn, clauseelement, multiparams, params, execution_options + ): canary.append("execute") return clauseelement, multiparams, params @@ -1634,9 +1700,11 @@ class EngineEventsTest(fixtures.TestBase): event.listen( engine, "before_cursor_execute", cursor_execute, retval=True ) - engine.execute(select([1])) + with engine.connect() as conn: + conn.execute(select([1])) eq_(canary, ["execute", "cursor_execute"]) + @testing.requires.legacy_engine def test_engine_connect(self): engine = engines.testing_engine() @@ -1781,7 +1849,15 @@ class EngineEventsTest(fixtures.TestBase): ("begin", set(["conn"])), ( "execute", - set(["conn", "clauseelement", "multiparams", "params"]), + set( + [ + "conn", + "clauseelement", + "multiparams", + "params", + "execution_options", + ] + ), ), ( "cursor_execute", @@ -1800,7 +1876,15 @@ class EngineEventsTest(fixtures.TestBase): ("begin", set(["conn"])), ( "execute", - set(["conn", "clauseelement", "multiparams", "params"]), + set( + [ + "conn", + "clauseelement", + "multiparams", + "params", + "execution_options", + ] + ), ), ( "cursor_execute", @@ -1908,6 +1992,10 @@ class EngineEventsTest(fixtures.TestBase): ) +class FutureEngineEventsTest(fixtures.FutureEngineMixin, EngineEventsTest): + pass + + class HandleErrorTest(fixtures.TestBase): __requires__ = ("ad_hoc_engines",) __backend__ = True @@ -2649,7 +2737,7 @@ class DialectEventTest(fixtures.TestBase): stmt = "insert into table foo" params = {"foo": "bar"} ctx = dialect.execution_ctx_cls._init_statement( - dialect, conn, conn.connection, stmt, [params] + dialect, conn, conn.connection, {}, stmt, [params], ) conn._cursor_execute(ctx.cursor, stmt, params, ctx) @@ -2813,3 +2901,80 @@ class AutocommitTextTest(fixtures.TestBase): def test_select(self): self._test_keyword("SELECT foo FROM table", False) + + +class FutureExecuteTest(fixtures.FutureEngineMixin, fixtures.TablesTest): + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column("user_id", INT, primary_key=True, autoincrement=False), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, + ) + Table( + "users_autoinc", + metadata, + Column( + "user_id", INT, primary_key=True, test_needs_autoincrement=True + ), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, + ) + + @testing.combinations( + ({}, {}, {}), + ({"a": "b"}, {}, {"a": "b"}), + ({"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}), + argnames="conn_opts, exec_opts, expected", + ) + def test_execution_opts_per_invoke( + self, connection, conn_opts, exec_opts, expected + ): + opts = [] + + @event.listens_for(connection, "before_cursor_execute") + def before_cursor_execute( + conn, cursor, statement, parameters, context, executemany + ): + opts.append(context.execution_options) + + if conn_opts: + connection = connection.execution_options(**conn_opts) + + if exec_opts: + connection.execute(select([1]), execution_options=exec_opts) + else: + connection.execute(select([1])) + + eq_(opts, [expected]) + + def test_execution_opts_invoke_illegal(self, connection): + assert_raises_message( + tsa.exc.InvalidRequestError, + "The 'isolation_level' execution option is not supported " + "at the per-statement level", + connection.execute, + select([1]), + execution_options={"isolation_level": "AUTOCOMMIT"}, + ) + + assert_raises_message( + tsa.exc.InvalidRequestError, + "The 'schema_translate_map' execution option is not supported " + "at the per-statement level", + connection.execute, + select([1]), + execution_options={"schema_translate_map": {}}, + ) + + def test_no_branching(self, connection): + assert_raises_message( + NotImplementedError, + "sqlalchemy.future.Connection does not support " + "'branching' of new connections.", + connection.connect, + ) diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py index 5d50a010df..b906b87be4 100644 --- a/test/engine/test_logging.py +++ b/test/engine/test_logging.py @@ -491,7 +491,7 @@ class LoggingNameTest(fixtures.TestBase): assert self.buf.buffer for name in [b.name for b in self.buf.buffer]: assert name in ( - "sqlalchemy.engine.base.Engine.%s" % eng_name, + "sqlalchemy.engine.Engine.%s" % eng_name, "sqlalchemy.pool.impl.%s.%s" % (eng.pool.__class__.__name__, pool_name), ) @@ -501,7 +501,7 @@ class LoggingNameTest(fixtures.TestBase): assert self.buf.buffer for name in [b.name for b in self.buf.buffer]: assert name in ( - "sqlalchemy.engine.base.Engine", + "sqlalchemy.engine.Engine", "sqlalchemy.pool.impl.%s" % eng.pool.__class__.__name__, ) diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index 13058fbe1c..1836b2e74a 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -12,16 +12,17 @@ from sqlalchemy import String from sqlalchemy import testing from sqlalchemy import text from sqlalchemy import VARCHAR +from sqlalchemy.future import select as future_select from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import eq_ from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures +from sqlalchemy.testing import mock from sqlalchemy.testing import ne_ from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table - users, metadata = None, None @@ -1102,3 +1103,564 @@ class IsolationLevelTest(fixtures.TestBase): conn.get_isolation_level(), self._non_default_isolation_level() ) eq_(c2.get_isolation_level(), self._non_default_isolation_level()) + + +class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): + """The SQLAlchemy 2.0 Connection ensures its own transaction is rolled + back upon close. Therefore the whole "reset agent" thing can go away. + this suite runs through all the reset agent tests to ensure the state + of the transaction is maintained while the "reset agent" feature is not + needed at all. + + """ + + __backend__ = True + + def test_begin_close(self): + canary = mock.Mock() + with testing.db.connect() as connection: + event.listen(connection, "rollback", canary) + trans = connection.begin() + assert connection.connection._reset_agent is None + assert not trans.is_active + eq_(canary.mock_calls, [mock.call(connection)]) + + def test_begin_rollback(self): + canary = mock.Mock() + with testing.db.connect() as connection: + event.listen(connection, "rollback", canary) + trans = connection.begin() + assert connection.connection._reset_agent is None + trans.rollback() + assert connection.connection._reset_agent is None + assert not trans.is_active + eq_(canary.mock_calls, [mock.call(connection)]) + + def test_begin_commit(self): + canary = mock.Mock() + with testing.db.connect() as connection: + event.listen(connection, "rollback", canary.rollback) + event.listen(connection, "commit", canary.commit) + trans = connection.begin() + assert connection.connection._reset_agent is None + trans.commit() + assert connection.connection._reset_agent is None + assert not trans.is_active + eq_(canary.mock_calls, [mock.call.commit(connection)]) + + @testing.requires.savepoints + def test_begin_nested_close(self): + canary = mock.Mock() + with testing.db.connect() as connection: + event.listen(connection, "rollback", canary.rollback) + event.listen(connection, "commit", canary.commit) + trans = connection.begin_nested() + assert connection.connection._reset_agent is None + assert trans.is_active # it's a savepoint + eq_(canary.mock_calls, [mock.call.rollback(connection)]) + + @testing.requires.savepoints + def test_begin_begin_nested_close(self): + canary = mock.Mock() + with testing.db.connect() as connection: + event.listen(connection, "rollback", canary.rollback) + event.listen(connection, "commit", canary.commit) + trans = connection.begin() + trans2 = connection.begin_nested() + assert connection.connection._reset_agent is None + assert trans2.is_active # was never closed + assert not trans.is_active + eq_(canary.mock_calls, [mock.call.rollback(connection)]) + + @testing.requires.savepoints + def test_begin_begin_nested_rollback_commit(self): + canary = mock.Mock() + with testing.db.connect() as connection: + event.listen( + connection, "rollback_savepoint", canary.rollback_savepoint + ) + event.listen(connection, "rollback", canary.rollback) + event.listen(connection, "commit", canary.commit) + trans = connection.begin() + trans2 = connection.begin_nested() + assert connection.connection._reset_agent is None + trans2.rollback() # this is not a connection level event + assert connection.connection._reset_agent is None + trans.commit() + assert connection.connection._reset_agent is None + eq_( + canary.mock_calls, + [ + mock.call.rollback_savepoint(connection, mock.ANY, trans), + mock.call.commit(connection), + ], + ) + + @testing.requires.savepoints + def test_begin_begin_nested_rollback_rollback(self): + canary = mock.Mock() + with testing.db.connect() as connection: + event.listen(connection, "rollback", canary.rollback) + event.listen(connection, "commit", canary.commit) + trans = connection.begin() + trans2 = connection.begin_nested() + assert connection.connection._reset_agent is None + trans2.rollback() + assert connection.connection._reset_agent is None + trans.rollback() + assert connection.connection._reset_agent is None + eq_(canary.mock_calls, [mock.call.rollback(connection)]) + + @testing.requires.two_phase_transactions + def test_reset_via_agent_begin_twophase(self): + canary = mock.Mock() + with testing.db.connect() as connection: + event.listen(connection, "rollback", canary.rollback) + event.listen( + connection, "rollback_twophase", canary.rollback_twophase + ) + event.listen(connection, "commit", canary.commit) + trans = connection.begin_twophase() + assert connection.connection._reset_agent is None + assert not trans.is_active + eq_( + canary.mock_calls, + [mock.call.rollback_twophase(connection, mock.ANY, False)], + ) + + @testing.requires.two_phase_transactions + def test_reset_via_agent_begin_twophase_commit(self): + canary = mock.Mock() + with testing.db.connect() as connection: + event.listen(connection, "rollback", canary.rollback) + event.listen(connection, "commit", canary.commit) + event.listen(connection, "commit_twophase", canary.commit_twophase) + trans = connection.begin_twophase() + assert connection.connection._reset_agent is None + trans.commit() + assert connection.connection._reset_agent is None + eq_( + canary.mock_calls, + [mock.call.commit_twophase(connection, mock.ANY, False)], + ) + + @testing.requires.two_phase_transactions + def test_reset_via_agent_begin_twophase_rollback(self): + canary = mock.Mock() + with testing.db.connect() as connection: + event.listen(connection, "rollback", canary.rollback) + event.listen( + connection, "rollback_twophase", canary.rollback_twophase + ) + event.listen(connection, "commit", canary.commit) + trans = connection.begin_twophase() + assert connection.connection._reset_agent is None + trans.rollback() + assert connection.connection._reset_agent is None + eq_( + canary.mock_calls, + [mock.call.rollback_twophase(connection, mock.ANY, False)], + ) + + +class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column("user_id", INT, primary_key=True, autoincrement=False), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, + ) + Table( + "users_autoinc", + metadata, + Column( + "user_id", INT, primary_key=True, test_needs_autoincrement=True + ), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, + ) + + def test_autobegin_rollback(self): + users = self.tables.users + with testing.db.connect() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + conn.rollback() + + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), 0 + ) + + @testing.requires.autocommit + def test_autocommit_isolation_level(self): + users = self.tables.users + + with testing.db.connect().execution_options( + isolation_level="AUTOCOMMIT" + ) as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + conn.rollback() + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 1, + ) + + @testing.requires.autocommit + def test_no_autocommit_w_begin(self): + + with testing.db.begin() as conn: + assert_raises_message( + exc.InvalidRequestError, + "This connection has already begun a transaction; " + "isolation level may not be altered until transaction end", + conn.execution_options, + isolation_level="AUTOCOMMIT", + ) + + @testing.requires.autocommit + def test_no_autocommit_w_autobegin(self): + + with testing.db.connect() as conn: + conn.execute(future_select(1)) + + assert_raises_message( + exc.InvalidRequestError, + "This connection has already begun a transaction; " + "isolation level may not be altered until transaction end", + conn.execution_options, + isolation_level="AUTOCOMMIT", + ) + + conn.rollback() + + conn.execution_options(isolation_level="AUTOCOMMIT") + + def test_autobegin_commit(self): + users = self.tables.users + + with testing.db.connect() as conn: + + assert not conn.in_transaction() + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + assert conn.in_transaction() + conn.commit() + + assert not conn.in_transaction() + + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 1, + ) + + conn.execute(users.insert(), {"user_id": 2, "user_name": "name 2"}) + + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 2, + ) + + assert conn.in_transaction() + conn.rollback() + assert not conn.in_transaction() + + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 1, + ) + + def test_rollback_on_close(self): + canary = mock.Mock() + with testing.db.connect() as conn: + event.listen(conn, "rollback", canary) + conn.execute(select([1])) + assert conn.in_transaction() + + eq_(canary.mock_calls, [mock.call(conn)]) + + def test_no_on_close_no_transaction(self): + canary = mock.Mock() + with testing.db.connect() as conn: + event.listen(conn, "rollback", canary) + conn.execute(select([1])) + conn.rollback() + assert not conn.in_transaction() + + eq_(canary.mock_calls, [mock.call(conn)]) + + def test_rollback_on_exception(self): + canary = mock.Mock() + try: + with testing.db.connect() as conn: + event.listen(conn, "rollback", canary) + conn.execute(select([1])) + assert conn.in_transaction() + raise Exception("some error") + assert False + except: + pass + + eq_(canary.mock_calls, [mock.call(conn)]) + + def test_rollback_on_exception_if_no_trans(self): + canary = mock.Mock() + try: + with testing.db.connect() as conn: + event.listen(conn, "rollback", canary) + assert not conn.in_transaction() + raise Exception("some error") + assert False + except: + pass + + eq_(canary.mock_calls, []) + + def test_commit_no_begin(self): + with testing.db.connect() as conn: + assert not conn.in_transaction() + conn.commit() + + @testing.requires.independent_connections + def test_commit_inactive(self): + with testing.db.connect() as conn: + conn.begin() + conn.invalidate() + + assert_raises_message( + exc.InvalidRequestError, "Can't reconnect until", conn.commit + ) + + @testing.requires.independent_connections + def test_rollback_inactive(self): + users = self.tables.users + with testing.db.connect() as conn: + + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + conn.commit() + + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + conn.invalidate() + + assert_raises_message( + exc.StatementError, + "Can't reconnect", + conn.execute, + select([1]), + ) + + conn.rollback() + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 1, + ) + + def test_rollback_no_begin(self): + with testing.db.connect() as conn: + assert not conn.in_transaction() + conn.rollback() + + def test_rollback_end_ctx_manager(self): + with testing.db.begin() as conn: + assert conn.in_transaction() + conn.rollback() + + def test_explicit_begin(self): + users = self.tables.users + + with testing.db.connect() as conn: + assert not conn.in_transaction() + conn.begin() + assert conn.in_transaction() + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + conn.commit() + + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 1, + ) + + def test_no_double_begin(self): + with testing.db.connect() as conn: + conn.begin() + + assert_raises_message( + exc.InvalidRequestError, + "a transaction is already begun for this connection", + conn.begin, + ) + + def test_no_autocommit(self): + users = self.tables.users + + with testing.db.connect() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 0, + ) + + def test_begin_block(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 1, + ) + + @testing.requires.savepoints + def test_savepoint_one(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + savepoint = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 2, + ) + savepoint.rollback() + + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 1, + ) + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 1, + ) + + @testing.requires.savepoints + def test_savepoint_two(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + savepoint = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 2, + ) + savepoint.commit() + + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 2, + ) + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 2, + ) + + @testing.requires.savepoints + def test_savepoint_three(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + conn.rollback() + + assert not conn.in_transaction() + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 0, + ) + + @testing.requires.savepoints + def test_savepoint_four(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + sp2 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + + sp2.rollback() + + assert conn.in_transaction() + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 2, + ) + + @testing.requires.savepoints + def test_savepoint_five(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + sp2 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + + sp2.commit() + + assert conn.in_transaction() + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 3, + ) + + @testing.requires.savepoints + def test_savepoint_six(self): + users = self.tables.users + + with testing.db.begin() as conn: + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + sp1 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + sp2 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + + sp2.commit() + + sp1.rollback() + + assert conn.in_transaction() + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 1, + ) diff --git a/test/ext/test_baked.py b/test/ext/test_baked.py index 9906339c2b..cdf8414b4b 100644 --- a/test/ext/test_baked.py +++ b/test/ext/test_baked.py @@ -385,7 +385,9 @@ class ResultPostCriteriaTest(BakedTest): with testing.db.connect() as conn: @event.listens_for(conn, "before_execute") - def before_execute(conn, clauseelement, multiparams, params): + def before_execute( + conn, clauseelement, multiparams, params, execution_options + ): assert "yes" in conn._execution_options bq = self.bakery(lambda s: s.query(User.id).order_by(User.id)) diff --git a/test/orm/test_deprecations.py b/test/orm/test_deprecations.py index c63379504b..d1c7a08a93 100644 --- a/test/orm/test_deprecations.py +++ b/test/orm/test_deprecations.py @@ -1,6 +1,7 @@ import sqlalchemy as sa from sqlalchemy import and_ from sqlalchemy import desc +from sqlalchemy import event from sqlalchemy import func from sqlalchemy import Integer from sqlalchemy import select @@ -38,10 +39,13 @@ from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures from sqlalchemy.testing import is_ from sqlalchemy.testing import is_true +from sqlalchemy.testing.mock import call +from sqlalchemy.testing.mock import Mock from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table from . import _fixtures from .inheritance import _poly_fixtures +from .test_events import _RemoveListeners from .test_options import PathTest as OptionsPathTest from .test_query import QueryTest @@ -1604,3 +1608,67 @@ class DistinctOrderByImplicitTest(QueryTest, AssertsCompiledSQL): "addresses_email_address FROM users, addresses " "ORDER BY users.id, users.name, addresses.email_address", ) + + +class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest): + run_inserts = None + + def test_on_bulk_update_hook(self): + User, users = self.classes.User, self.tables.users + + sess = Session() + canary = Mock() + + event.listen(sess, "after_bulk_update", canary.after_bulk_update) + + def legacy(ses, qry, ctx, res): + canary.after_bulk_update_legacy(ses, qry, ctx, res) + + event.listen(sess, "after_bulk_update", legacy) + + mapper(User, users) + + with testing.expect_deprecated( + 'The argument signature for the "SessionEvents.after_bulk_update" ' + "event listener" + ): + sess.query(User).update({"name": "foo"}) + + eq_(canary.after_bulk_update.call_count, 1) + + upd = canary.after_bulk_update.mock_calls[0][1][0] + eq_(upd.session, sess) + eq_( + canary.after_bulk_update_legacy.mock_calls, + [call(sess, upd.query, upd.context, upd.result)], + ) + + def test_on_bulk_delete_hook(self): + User, users = self.classes.User, self.tables.users + + sess = Session() + canary = Mock() + + event.listen(sess, "after_bulk_delete", canary.after_bulk_delete) + + def legacy(ses, qry, ctx, res): + canary.after_bulk_delete_legacy(ses, qry, ctx, res) + + event.listen(sess, "after_bulk_delete", legacy) + + mapper(User, users) + + with testing.expect_deprecated( + 'The argument signature for the "SessionEvents.after_bulk_delete" ' + "event listener" + ): + sess.query(User).delete() + + eq_(canary.after_bulk_delete.call_count, 1) + + upd = canary.after_bulk_delete.mock_calls[0][1][0] + eq_(upd.session, sess) + eq_( + canary.after_bulk_delete_legacy.mock_calls, + [call(sess, upd.query, upd.context, upd.result)], + ) diff --git a/test/orm/test_events.py b/test/orm/test_events.py index 225468baab..c1457289aa 100644 --- a/test/orm/test_events.py +++ b/test/orm/test_events.py @@ -1759,11 +1759,6 @@ class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest): event.listen(sess, "after_begin", canary.after_begin) event.listen(sess, "after_bulk_update", canary.after_bulk_update) - def legacy(ses, qry, ctx, res): - canary.after_bulk_update_legacy(ses, qry, ctx, res) - - event.listen(sess, "after_bulk_update", legacy) - mapper(User, users) sess.query(User).update({"name": "foo"}) @@ -1773,10 +1768,6 @@ class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest): upd = canary.after_bulk_update.mock_calls[0][1][0] eq_(upd.session, sess) - eq_( - canary.after_bulk_update_legacy.mock_calls, - [call(sess, upd.query, upd.context, upd.result)], - ) def test_on_bulk_delete_hook(self): User, users = self.classes.User, self.tables.users @@ -1787,11 +1778,6 @@ class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest): event.listen(sess, "after_begin", canary.after_begin) event.listen(sess, "after_bulk_delete", canary.after_bulk_delete) - def legacy(ses, qry, ctx, res): - canary.after_bulk_delete_legacy(ses, qry, ctx, res) - - event.listen(sess, "after_bulk_delete", legacy) - mapper(User, users) sess.query(User).delete() @@ -1801,10 +1787,6 @@ class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest): upd = canary.after_bulk_delete.mock_calls[0][1][0] eq_(upd.session, sess) - eq_( - canary.after_bulk_delete_legacy.mock_calls, - [call(sess, upd.query, upd.context, upd.result)], - ) def test_connection_emits_after_begin(self): sess, canary = self._listener_fixture(bind=testing.db) diff --git a/test/orm/test_transaction.py b/test/orm/test_transaction.py index 15244d9d25..2b32282ba8 100644 --- a/test/orm/test_transaction.py +++ b/test/orm/test_transaction.py @@ -1,5 +1,3 @@ -from __future__ import with_statement - from sqlalchemy import Column from sqlalchemy import event from sqlalchemy import exc as sa_exc @@ -9,6 +7,7 @@ from sqlalchemy import select from sqlalchemy import String from sqlalchemy import Table from sqlalchemy import testing +from sqlalchemy.future import Engine from sqlalchemy.orm import attributes from sqlalchemy.orm import create_session from sqlalchemy.orm import exc as orm_exc @@ -62,7 +61,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): c.close() @engines.close_open_connections - def test_subtransaction_on_external(self): + def test_subtransaction_on_external_subtrans(self): users, User = self.tables.users, self.classes.User mapper(User, users) @@ -78,6 +77,22 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): assert len(sess.query(User).all()) == 0 sess.close() + @engines.close_open_connections + def test_subtransaction_on_external_no_begin(self): + users, User = self.tables.users, self.classes.User + + mapper(User, users) + conn = testing.db.connect() + trans = conn.begin() + sess = create_session(bind=conn, autocommit=False, autoflush=True) + u = User(name="ed") + sess.add(u) + sess.flush() + sess.commit() # commit does nothing + trans.rollback() # rolls back + assert len(sess.query(User).all()) == 0 + sess.close() + @testing.requires.savepoints @engines.close_open_connections def test_external_nested_transaction(self): @@ -104,6 +119,71 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): conn.close() raise + @engines.close_open_connections + def test_subtransaction_on_external_commit_future(self): + users, User = self.tables.users, self.classes.User + + mapper(User, users) + + engine = Engine._future_facade(testing.db) + + conn = engine.connect() + conn.begin() + + sess = create_session(bind=conn, autocommit=False, autoflush=True) + u = User(name="ed") + sess.add(u) + sess.flush() + sess.commit() # commit does nothing + conn.rollback() # rolls back + assert len(sess.query(User).all()) == 0 + sess.close() + + @engines.close_open_connections + def test_subtransaction_on_external_rollback_future(self): + users, User = self.tables.users, self.classes.User + + mapper(User, users) + + engine = Engine._future_facade(testing.db) + + conn = engine.connect() + conn.begin() + + sess = create_session(bind=conn, autocommit=False, autoflush=True) + u = User(name="ed") + sess.add(u) + sess.flush() + sess.rollback() # rolls back + conn.commit() # nothing to commit + assert len(sess.query(User).all()) == 0 + sess.close() + + @testing.requires.savepoints + @engines.close_open_connections + def test_savepoint_on_external_future(self): + users, User = self.tables.users, self.classes.User + + mapper(User, users) + + engine = Engine._future_facade(testing.db) + + with engine.connect() as conn: + conn.begin() + sess = create_session(bind=conn, autocommit=False, autoflush=True) + u1 = User(name="u1") + sess.add(u1) + sess.flush() + + sess.begin_nested() + u2 = User(name="u2") + sess.add(u2) + sess.flush() + sess.rollback() + + conn.commit() + assert len(sess.query(User).all()) == 1 + @testing.requires.savepoints def test_nested_accounting_new_items_removed(self): User, users = self.classes.User, self.tables.users @@ -174,6 +254,40 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): == 2 ) + @testing.requires.savepoints + def test_heavy_nesting_future(self): + users = self.tables.users + + engine = Engine._future_facade(testing.db) + session = create_session(engine) + + session.begin() + session.connection().execute(users.insert().values(name="user1")) + session.begin(subtransactions=True) + session.begin_nested() + session.connection().execute(users.insert().values(name="user2")) + assert ( + session.connection() + .exec_driver_sql("select count(1) from users") + .scalar() + == 2 + ) + session.rollback() + assert ( + session.connection() + .exec_driver_sql("select count(1) from users") + .scalar() + == 1 + ) + session.connection().execute(users.insert().values(name="user3")) + session.commit() + assert ( + session.connection() + .exec_driver_sql("select count(1) from users") + .scalar() + == 2 + ) + @testing.requires.savepoints def test_dirty_state_transferred_deep_nesting(self): User, users = self.classes.User, self.tables.users @@ -767,17 +881,24 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): return sess, u1 def test_execution_options_begin_transaction(self): - bind = mock.Mock() + bind = mock.Mock( + connect=mock.Mock( + return_value=mock.Mock( + _is_future=False, + execution_options=mock.Mock( + return_value=mock.Mock(_is_future=False) + ), + ) + ) + ) sess = Session(bind=bind) c1 = sess.connection(execution_options={"isolation_level": "FOO"}) + eq_(bind.mock_calls, [mock.call.connect()]) eq_( - bind.mock_calls, - [ - mock.call.connect(), - mock.call.connect().execution_options(isolation_level="FOO"), - mock.call.connect().execution_options().begin(), - ], + bind.connect().mock_calls, + [mock.call.execution_options(isolation_level="FOO")], ) + eq_(bind.connect().execution_options().mock_calls, [mock.call.begin()]) eq_(c1, bind.connect().execution_options()) def test_execution_options_ignored_mid_transaction(self): @@ -914,9 +1035,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): with expect_warnings(".*during handling of a previous exception.*"): session.begin_nested() - savepoint = ( - session.connection()._Connection__transaction._savepoint - ) + savepoint = session.connection()._transaction._savepoint # force the savepoint to disappear session.connection().dialect.do_release_savepoint( diff --git a/test/requirements.py b/test/requirements.py index 669e0b7eb1..aac376dba1 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -1373,6 +1373,10 @@ class DefaultRequirements(SuiteRequirements): def mssql_freetds(self): return only_on(["mssql+pymssql"]) + @property + def legacy_engine(self): + return exclusions.skip_if(lambda config: config.db._is_future) + @property def ad_hoc_engines(self): return exclusions.skip_if( diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py index fd5aec5036..7352810ae7 100644 --- a/test/sql/test_defaults.py +++ b/test/sql/test_defaults.py @@ -535,7 +535,7 @@ class DefaultRoundTripTest(fixtures.TablesTest): result = connection.execute(t.select().order_by(t.c.col1)) today = datetime.date.today() eq_( - result.fetchall(), + list(result), [ ( x, @@ -715,9 +715,11 @@ class DefaultRoundTripTest(fixtures.TablesTest): "group 1", connection.execute, t.insert(), - {"col4": 7, "col7": 12, "col8": 19}, - {"col4": 7, "col8": 19}, - {"col4": 7, "col7": 12, "col8": 19}, + [ + {"col4": 7, "col7": 12, "col8": 19}, + {"col4": 7, "col8": 19}, + {"col4": 7, "col7": 12, "col8": 19}, + ], ) def test_insert_values(self, connection): @@ -834,6 +836,13 @@ class DefaultRoundTripTest(fixtures.TablesTest): eq_(55, row._mapping["col3"]) +class FutureDefaultRoundTripTest( + fixtures.FutureEngineMixin, DefaultRoundTripTest +): + + __backend__ = True + + class CTEDefaultTest(fixtures.TablesTest): __requires__ = ("ctes", "returning", "ctes_on_dml") __backend__ = True diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 470417dd3c..1611dc1ba4 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -29,6 +29,7 @@ from sqlalchemy.engine import default from sqlalchemy.engine import result as _result from sqlalchemy.engine import Row from sqlalchemy.ext.compiler import compiles +from sqlalchemy.future import select as future_select from sqlalchemy.sql import ColumnElement from sqlalchemy.sql import expression from sqlalchemy.sql.selectable import TextualSelect @@ -2237,3 +2238,98 @@ class AlternateResultProxyTest(fixtures.TablesTest): le_(len(result.cursor_strategy._rowbuffer), max_size) eq_(checks, assertion) + + +class FutureResultTest(fixtures.FutureEngineMixin, fixtures.TablesTest): + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column("user_id", INT, primary_key=True, autoincrement=False), + Column("user_name", VARCHAR(20)), + Column("x", Integer), + Column("y", Integer), + test_needs_acid=True, + ) + Table( + "users_autoinc", + metadata, + Column( + "user_id", INT, primary_key=True, test_needs_autoincrement=True + ), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, + ) + + def test_fetchall(self, connection): + users = self.tables.users + connection.execute( + users.insert(), + [ + {"user_id": 7, "user_name": "jack", "x": 1, "y": 2}, + {"user_id": 8, "user_name": "ed", "x": 2, "y": 3}, + {"user_id": 9, "user_name": "fred", "x": 15, "y": 20}, + ], + ) + + result = connection.execute( + future_select(users).order_by(users.c.user_id) + ) + eq_( + result.all(), + [(7, "jack", 1, 2), (8, "ed", 2, 3), (9, "fred", 15, 20)], + ) + + @testing.combinations( + ((1, 0), [("jack", 7), ("ed", 8), ("fred", 9)]), + ((3,), [(2,), (3,), (20,)]), + ((-2, -1), [(1, 2), (2, 3), (15, 20)]), + argnames="columns, expected", + ) + def test_columns(self, connection, columns, expected): + users = self.tables.users + connection.execute( + users.insert(), + [ + {"user_id": 7, "user_name": "jack", "x": 1, "y": 2}, + {"user_id": 8, "user_name": "ed", "x": 2, "y": 3}, + {"user_id": 9, "user_name": "fred", "x": 15, "y": 20}, + ], + ) + + result = connection.execute( + future_select(users).order_by(users.c.user_id) + ) + eq_(result.columns(*columns).all(), expected) + + def test_partitions(self, connection): + users = self.tables.users + connection.execute( + users.insert(), + [ + { + "user_id": i, + "user_name": "user %s" % i, + "x": i * 5, + "y": i * 20, + } + for i in range(500) + ], + ) + + result = connection.execute( + future_select(users).order_by(users.c.user_id) + ) + + start = 0 + for partition in result.columns(0, 1).partitions(20): + eq_( + partition, + [(i, "user %s" % i) for i in range(start, start + 20)], + ) + start += 20 + + assert result._soft_closed diff --git a/test/sql/test_sequences.py b/test/sql/test_sequences.py index 8beee514a1..1d78c0904d 100644 --- a/test/sql/test_sequences.py +++ b/test/sql/test_sequences.py @@ -279,6 +279,11 @@ class SequenceExecTest(fixtures.TestBase): self._assert_seq_result(r.inserted_primary_key[0]) +class FutureSequenceExecTest(fixtures.FutureEngineMixin, SequenceExecTest): + __requires__ = ("sequences",) + __backend__ = True + + class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): __requires__ = ("sequences",) __backend__ = True @@ -396,6 +401,11 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): eq_(result.inserted_primary_key, [1]) +class FutureSequenceTest(fixtures.FutureEngineMixin, SequenceTest): + __requires__ = ("sequences",) + __backend__ = True + + class TableBoundSequenceTest(fixtures.TablesTest): __requires__ = ("sequences",) __backend__ = True -- 2.47.2