Library-level (but not driver level) "Autocommit" removed from both Core and ORM
================================================================================
-.. admonition:: Certainty: almost definitely
+.. admonition:: Certainty: definite
+
+ Review the new future API for engines and connections at:
+
+ :class:`_future.Connection`
+
+ :class:`_future.Engine`
+
+ :func:`_future.create_engine`
"autocommit" at the ORM level is already not a widely used pattern except to
the degree that the ``.begin()`` call is desirable, and a new flag
execute() method more strict, .execution_options() are available on ORM Session
================================================================================
-.. admonition:: Certainty: tentative
+.. admonition:: Certainty: definite
+
+ Review the new future API for connections at:
+
+ :class:`_future.Connection`
- Pending further prototyping, this is part of a larger plan that impacts
- statement compilation, execution, and result processing.
The use of execution options is expected to be more prominent as the Core and
ORM are largely unified at the statement handling level. To suit this,
ResultProxy replaced with Result which has more refined methods and behaviors
=============================================================================
-.. admonition:: Certainty: tentative
+.. admonition:: Certainty: definite
+
+ Review the new future API for result sets:
+
+ :class:`_future.Result`
- This is again part of the rearchitecture of "execute()" internals and is
- pending further prototyping.
A major goal of SQLAlchemy 2.0 is to unify how "results" are handled between
the ORM and Core. Towards this goal, version 1.4 will already standardize
--- /dev/null
+.. change::
+ :tags: feature, engine, alchemy2
+ :tickets: 4644
+
+ Implemented the SQLAlchemy 2 :func:`_future.create_engine` function which
+ is used for forwards compatibility with SQLAlchemy 2. This engine
+ features always-transactional behavior with autobegin.
+
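+    A minimal usage sketch of the new engine (the SQLite URL and table
+    name here are illustrative only)::
+
+        from sqlalchemy import text
+        from sqlalchemy.future import create_engine
+
+        engine = create_engine("sqlite://")
+
+        with engine.connect() as conn:
+            # "autobegin": the first statement begins a transaction
+            conn.execute(text("create table t (x integer)"))
+            conn.execute(text("insert into t values (1)"))
+
+            # nothing is committed until commit() is called explicitly
+            conn.commit()
+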
+ .. seealso::
+
+ :ref:`migration_20_toplevel`
.. module:: sqlalchemy.future
+.. autoclass:: sqlalchemy.future.Connection
+ :members:
+
+.. autofunction:: sqlalchemy.future.create_engine
+
+.. autoclass:: sqlalchemy.future.Engine
+ :members:
.. autofunction:: sqlalchemy.future.select
static PyObject *
distill_params(PyObject *self, PyObject *args)
{
+ // TODO: pass the Connection in so that there can be a standard
+ // method for warning on parameter format
+
PyObject *multiparams, *params;
PyObject *enclosing_list, *double_enclosing_list;
PyObject *zero_element, *zero_element_item;
if (multiparam_size == 0) {
if (params != Py_None && PyDict_Size(params) != 0) {
+            // TODO: these are keyword parameters; emit the parameter
+            // format deprecation warning
enclosing_list = PyList_New(1);
if (enclosing_list == NULL) {
return NULL;
}
}
else {
+        // TODO: these are multiple positional params; emit the parameter
+        // format deprecation warning
zero_element = PyTuple_GetItem(multiparams, 0);
if (PyObject_HasAttrString(zero_element, "__iter__") &&
!PyObject_HasAttrString(zero_element, "strip")
@_db_plus_owner
def has_table(self, connection, tablename, dbname, owner, schema):
- columns = ischema.columns
+ tables = ischema.tables
- whereclause = columns.c.table_name == tablename
+ s = sql.select([tables.c.table_name]).where(
+ sql.and_(
+ tables.c.table_type == "BASE TABLE",
+ tables.c.table_name == tablename,
+ )
+ )
if owner:
- whereclause = sql.and_(
- whereclause, columns.c.table_schema == owner
- )
- s = sql.select([columns], whereclause)
+ s = s.where(tables.c.table_schema == owner)
+
c = connection.execute(s)
+
return c.first() is not None
@reflection.cache
@_db_plus_owner_listing
def get_table_names(self, connection, dbname, owner, schema, **kw):
tables = ischema.tables
- s = sql.select(
- [tables.c.table_name],
- sql.and_(
- tables.c.table_schema == owner,
- tables.c.table_type == "BASE TABLE",
- ),
- order_by=[tables.c.table_name],
+ s = (
+ sql.select([tables.c.table_name])
+ .where(
+ sql.and_(
+ tables.c.table_schema == owner,
+ tables.c.table_type == "BASE TABLE",
+ )
+ )
+ .order_by(tables.c.table_name)
)
table_names = [r[0] for r in connection.execute(s)]
return table_names
raise
def do_begin_twophase(self, connection, xid):
- connection.execute(sql.text("XA BEGIN :xid"), xid=xid)
+ connection.execute(sql.text("XA BEGIN :xid"), dict(xid=xid))
def do_prepare_twophase(self, connection, xid):
- connection.execute(sql.text("XA END :xid"), xid=xid)
- connection.execute(sql.text("XA PREPARE :xid"), xid=xid)
+ connection.execute(sql.text("XA END :xid"), dict(xid=xid))
+ connection.execute(sql.text("XA PREPARE :xid"), dict(xid=xid))
def do_rollback_twophase(
self, connection, xid, is_prepared=True, recover=False
):
if not is_prepared:
- connection.execute(sql.text("XA END :xid"), xid=xid)
- connection.execute(sql.text("XA ROLLBACK :xid"), xid=xid)
+ connection.execute(sql.text("XA END :xid"), dict(xid=xid))
+ connection.execute(sql.text("XA ROLLBACK :xid"), dict(xid=xid))
def do_commit_twophase(
self, connection, xid, is_prepared=True, recover=False
):
if not is_prepared:
self.do_prepare_twophase(connection, xid)
- connection.execute(sql.text("XA COMMIT :xid"), xid=xid)
+ connection.execute(sql.text("XA COMMIT :xid"), dict(xid=xid))
def do_recover_twophase(self, connection):
resultset = connection.exec_driver_sql("XA RECOVER")
"WHERE TABLE_NAME=:name AND "
"TABLE_SCHEMA=:schema_name"
),
- name=sequence_name,
- schema_name=schema,
+ dict(name=sequence_name, schema_name=schema),
)
return cursor.first() is not None
:table_data;
"""
).bindparams(sql.bindparam("table_data", expanding=True)),
- table_data=col_tuples,
+ dict(table_data=col_tuples),
)
# in casing=0, table name and schema name come back in their
"SELECT table_name FROM all_tables "
"WHERE table_name = :name AND owner = :schema_name"
),
- name=self.denormalize_name(table_name),
- schema_name=self.denormalize_name(schema),
+ dict(
+ name=self.denormalize_name(table_name),
+ schema_name=self.denormalize_name(schema),
+ ),
)
return cursor.first() is not None
"WHERE sequence_name = :name AND "
"sequence_owner = :schema_name"
),
- name=self.denormalize_name(sequence_name),
- schema_name=self.denormalize_name(schema),
+ dict(
+ name=self.denormalize_name(sequence_name),
+ schema_name=self.denormalize_name(schema),
+ ),
)
return cursor.first() is not None
q += " AND ".join(clauses)
result = connection.execution_options(future_result=True).execute(
- sql.text(q), **params
+ sql.text(q), params
)
if desired_owner:
row = result.mappings().first()
"OWNER = :owner " "AND IOT_NAME IS NULL " "AND DURATION IS NULL"
)
- cursor = connection.execute(sql.text(sql_str), owner=schema)
+ cursor = connection.execute(sql.text(sql_str), dict(owner=schema))
return [self.normalize_name(row[0]) for row in cursor]
@reflection.cache
"AND DURATION IS NOT NULL"
)
- cursor = connection.execute(sql.text(sql_str), owner=schema)
+ cursor = connection.execute(sql.text(sql_str), dict(owner=schema))
return [self.normalize_name(row[0]) for row in cursor]
@reflection.cache
def get_view_names(self, connection, schema=None, **kw):
schema = self.denormalize_name(schema or self.default_schema_name)
s = sql.text("SELECT view_name FROM all_views WHERE owner = :owner")
- cursor = connection.execute(s, owner=self.denormalize_name(schema))
+ cursor = connection.execute(
+ s, dict(owner=self.denormalize_name(schema))
+ )
return [self.normalize_name(row[0]) for row in cursor]
@reflection.cache
text += " AND owner = :owner "
text = text % {"dblink": dblink, "columns": ", ".join(columns)}
- result = connection.execute(sql.text(text), **params)
+ result = connection.execute(sql.text(text), params)
enabled = dict(DISABLED=False, ENABLED=True)
text += " ORDER BY col.column_id"
text = text % {"dblink": dblink, "char_length_col": char_length_col}
- c = connection.execute(sql.text(text), **params)
+ c = connection.execute(sql.text(text), params)
for row in c:
colname = self.normalize_name(row[0])
"""
c = connection.execute(
- sql.text(COMMENT_SQL), table_name=table_name, schema_name=schema
+ sql.text(COMMENT_SQL),
+ dict(table_name=table_name, schema_name=schema),
)
return {"text": c.scalar()}
text = text % {"dblink": dblink}
q = sql.text(text)
- rp = connection.execute(q, **params)
+ rp = connection.execute(q, params)
indexes = []
last_index_name = None
pk_constraint = self.get_pk_constraint(
)
text = text % {"dblink": dblink}
- rp = connection.execute(sql.text(text), **params)
+ rp = connection.execute(sql.text(text), params)
constraint_data = rp.fetchall()
return constraint_data
text += " AND owner = :schema"
params["schema"] = schema
- rp = connection.execute(sql.text(text), **params).scalar()
+ rp = connection.execute(sql.text(text), params).scalar()
if rp:
if util.py2k:
rp = rp.decode(self.encoding)
s = s.columns(oid=sqltypes.Integer)
if schema:
s = s.bindparams(sql.bindparam("schema", type_=sqltypes.Unicode))
- c = connection.execute(s, table_name=table_name, schema=schema)
+ c = connection.execute(s, dict(table_name=table_name, schema=schema))
table_oid = c.scalar()
if table_oid is None:
raise exc.NoSuchTableError(table_name)
pgd.objoid = :table_oid
"""
- c = connection.execute(sql.text(COMMENT_SQL), table_oid=table_oid)
+ c = connection.execute(
+ sql.text(COMMENT_SQL), dict(table_oid=table_oid)
+ )
return {"text": c.scalar()}
@reflection.cache
from .interfaces import Connectable
from .interfaces import ExceptionContext
from .util import _distill_params
+from .util import _distill_params_20
from .. import exc
from .. import inspection
from .. import log
"""
_schema_translate_map = None
+ _is_future = False
+ _sqla_logger_namespace = "sqlalchemy.engine.Connection"
def __init__(
self,
if connection is not None
else engine.raw_connection()
)
- self.__transaction = None
+ self._transaction = None
self.__savepoint_seq = 0
self.should_close_with_result = close_with_result
else:
return self
- def _clone(self):
- """Create a shallow copy of this Connection.
+ def _generate_for_options(self):
+        """Define connection method chaining behavior for execution_options."""
- """
- c = self.__class__.__new__(self.__class__)
- c.__dict__ = self.__dict__.copy()
- return c
+ if self._is_future:
+ return self
+ else:
+ c = self.__class__.__new__(self.__class__)
+ c.__dict__ = self.__dict__.copy()
+ return c
def __enter__(self):
return self
""" # noqa
- c = self._clone()
+ c = self._generate_for_options()
c._execution_options = c._execution_options.union(opt)
if self._has_events or self.engine._has_events:
self.dispatch.set_connection_execution_options(c, opt)
if self.__branch_from:
return self.__branch_from._revalidate_connection()
if self.__can_reconnect and self.__invalid:
- if self.__transaction is not None:
+ if self._transaction is not None:
raise exc.InvalidRequestError(
"Can't reconnect until invalid "
"transaction is rolled back"
:class:`_engine.Engine`
"""
- if self.__branch_from:
+ if self._is_future:
+ assert not self.__branch_from
+ elif self.__branch_from:
return self.__branch_from.begin()
- if self.__transaction is None:
- self.__transaction = RootTransaction(self)
- return self.__transaction
+ if self._transaction is None:
+ self._transaction = RootTransaction(self)
+ return self._transaction
else:
- return Transaction(self, self.__transaction)
+ if self._is_future:
+ raise exc.InvalidRequestError(
+ "a transaction is already begun for this connection"
+ )
+ else:
+ return Transaction(self, self._transaction)
def begin_nested(self):
"""Begin a nested transaction and return a transaction handle.
:meth:`_engine.Connection.begin_twophase`
"""
- if self.__branch_from:
+ if self._is_future:
+ assert not self.__branch_from
+ elif self.__branch_from:
return self.__branch_from.begin_nested()
- if self.__transaction is None:
- self.__transaction = RootTransaction(self)
- else:
- self.__transaction = NestedTransaction(self, self.__transaction)
- return self.__transaction
+ if self._transaction is None:
+ if self._is_future:
+ self._autobegin()
+ else:
+ self._transaction = RootTransaction(self)
+ return self._transaction
+
+ trans = NestedTransaction(self, self._transaction)
+ if not self._is_future:
+ self._transaction = trans
+ return trans
def begin_twophase(self, xid=None):
"""Begin a two-phase or XA transaction and return a transaction
if self.__branch_from:
return self.__branch_from.begin_twophase(xid=xid)
- if self.__transaction is not None:
+ if self._transaction is not None:
raise exc.InvalidRequestError(
"Cannot start a two phase transaction when a transaction "
"is already in progress."
)
if xid is None:
xid = self.engine.dialect.create_xid()
- self.__transaction = TwoPhaseTransaction(self, xid)
- return self.__transaction
+ self._transaction = TwoPhaseTransaction(self, xid)
+ return self._transaction
def recover_twophase(self):
return self.engine.dialect.do_recover_twophase(self)
def in_transaction(self):
"""Return True if a transaction is in progress."""
return (
- self._root.__transaction is not None
- and self._root.__transaction.is_active
+ self._root._transaction is not None
+ and self._root._transaction.is_active
)
def _begin_impl(self, transaction):
try:
self.engine.dialect.do_begin(self.connection)
- if self.connection._reset_agent is None:
+ if not self._is_future and self.connection._reset_agent is None:
self.connection._reset_agent = transaction
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
finally:
if (
not self.__invalid
- and self.connection._reset_agent is self.__transaction
+ and self.connection._reset_agent is self._transaction
):
self.connection._reset_agent = None
finally:
if (
not self.__invalid
- and self.connection._reset_agent is self.__transaction
+ and self.connection._reset_agent is self._transaction
):
self.connection._reset_agent = None
- self.__transaction = None
+ self._transaction = None
def _savepoint_impl(self, name=None):
assert not self.__branch_from
return name
def _discard_transaction(self, trans):
- if trans is self.__transaction:
+ if trans is self._transaction:
if trans._is_root:
assert trans._parent is trans
- self.__transaction = None
+ self._transaction = None
else:
assert trans._parent is not trans
- self.__transaction = trans._parent
+ self._transaction = trans._parent
def _rollback_to_savepoint_impl(
self, name, context, deactivate_only=False
if self._still_open_and_connection_is_valid:
self.engine.dialect.do_release_savepoint(self, name)
- self.__transaction = context
+ self._transaction = context
def _begin_twophase_impl(self, transaction):
assert not self.__branch_from
if self._still_open_and_connection_is_valid:
self.engine.dialect.do_begin_twophase(self, transaction.xid)
- if self.connection._reset_agent is None:
+ if not self._is_future and self.connection._reset_agent is None:
self.connection._reset_agent = transaction
def _prepare_twophase_impl(self, xid):
self.dispatch.prepare_twophase(self, xid)
if self._still_open_and_connection_is_valid:
- assert isinstance(self.__transaction, TwoPhaseTransaction)
+ assert isinstance(self._transaction, TwoPhaseTransaction)
self.engine.dialect.do_prepare_twophase(self, xid)
def _rollback_twophase_impl(self, xid, is_prepared):
self.dispatch.rollback_twophase(self, xid, is_prepared)
if self._still_open_and_connection_is_valid:
- assert isinstance(self.__transaction, TwoPhaseTransaction)
+ assert isinstance(self._transaction, TwoPhaseTransaction)
try:
self.engine.dialect.do_rollback_twophase(
self, xid, is_prepared
)
finally:
- if self.connection._reset_agent is self.__transaction:
+ if self.connection._reset_agent is self._transaction:
self.connection._reset_agent = None
- self.__transaction = None
+ self._transaction = None
else:
- self.__transaction = None
+ self._transaction = None
def _commit_twophase_impl(self, xid, is_prepared):
assert not self.__branch_from
self.dispatch.commit_twophase(self, xid, is_prepared)
if self._still_open_and_connection_is_valid:
- assert isinstance(self.__transaction, TwoPhaseTransaction)
+ assert isinstance(self._transaction, TwoPhaseTransaction)
try:
self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
finally:
- if self.connection._reset_agent is self.__transaction:
+ if self.connection._reset_agent is self._transaction:
self.connection._reset_agent = None
- self.__transaction = None
+ self._transaction = None
else:
- self.__transaction = None
+ self._transaction = None
+
+ def _autobegin(self):
+ assert self._is_future
+
+ return self.begin()
def _autorollback(self):
if not self._root.in_transaction():
and will allow no further operations.
"""
+ assert not self._is_future
+
if self.__branch_from:
util.warn_deprecated_20(
"The .close() method on a so-called 'branched' connection is "
else:
conn.close()
- if conn._reset_agent is self.__transaction:
+ if conn._reset_agent is self._transaction:
conn._reset_agent = None
# the close() process can end up invalidating us,
if not self.__invalid:
del self.__connection
self.__can_reconnect = False
- self.__transaction = None
+ self._transaction = None
def scalar(self, object_, *multiparams, **params):
"""Executes and returns the first column of the first row.
"or the Connection.exec_driver_sql() method to invoke a "
"driver-level SQL string."
)
- distilled_params = _distill_params(multiparams, params)
- return self._exec_driver_sql_distilled(object_, distilled_params)
+ distilled_parameters = _distill_params(multiparams, params)
+
+ return self._exec_driver_sql(
+ object_, multiparams, params, distilled_parameters
+ )
try:
meth = object_._execute_on_connection
except AttributeError as err:
exc.ObjectNotExecutableError(object_), replace_context=err
)
else:
- return meth(self, multiparams, params)
+ return meth(self, multiparams, params, util.immutabledict())
- def _execute_function(self, func, multiparams, params):
+ def _execute_function(
+ self, func, multiparams, params, execution_options=util.immutabledict()
+ ):
"""Execute a sql.FunctionElement object."""
-        return self._execute_clauseelement(func.select(), multiparams, params)
+        return self._execute_clauseelement(
+            func.select(), multiparams, params, execution_options
+        )
- def _execute_default(self, default, multiparams, params):
+ def _execute_default(
+ self,
+ default,
+ multiparams,
+ params,
+ execution_options=util.immutabledict(),
+ ):
"""Execute a schema.ColumnDefault object."""
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_execute:
default, multiparams, params = fn(
- self, default, multiparams, params
+ self, default, multiparams, params, execution_options
)
try:
conn = self._revalidate_connection()
dialect = self.dialect
- ctx = dialect.execution_ctx_cls._init_default(dialect, self, conn)
+ ctx = dialect.execution_ctx_cls._init_default(
+ dialect, self, conn, execution_options
+ )
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
if self._has_events or self.engine._has_events:
self.dispatch.after_execute(
- self, default, multiparams, params, ret
+ self, default, multiparams, params, execution_options, ret
)
return ret
- def _execute_ddl(self, ddl, multiparams, params):
+ def _execute_ddl(
+ self, ddl, multiparams, params, execution_options=util.immutabledict()
+ ):
"""Execute a schema.DDL object."""
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_execute:
- ddl, multiparams, params = fn(self, ddl, multiparams, params)
+ ddl, multiparams, params = fn(
+ self, ddl, multiparams, params, execution_options
+ )
dialect = self.dialect
dialect.execution_ctx_cls._init_ddl,
compiled,
None,
+ execution_options,
compiled,
)
if self._has_events or self.engine._has_events:
- self.dispatch.after_execute(self, ddl, multiparams, params, ret)
+ self.dispatch.after_execute(
+ self, ddl, multiparams, params, execution_options, ret
+ )
return ret
- def _execute_clauseelement(self, elem, multiparams, params):
+ def _execute_clauseelement(
+ self, elem, multiparams, params, execution_options=util.immutabledict()
+ ):
"""Execute a sql.ClauseElement object."""
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_execute:
- elem, multiparams, params = fn(self, elem, multiparams, params)
+ elem, multiparams, params = fn(
+ self, elem, multiparams, params, execution_options
+ )
distilled_params = _distill_params(multiparams, params)
if distilled_params:
dialect = self.dialect
- if "compiled_cache" in self._execution_options:
- elem_cache_key, extracted_params = elem._generate_cache_key()
+ exec_opts = self._execution_options
+ if execution_options:
+ exec_opts = exec_opts.union(execution_options)
+
+ if "compiled_cache" in exec_opts:
+ elem_cache_key = elem._generate_cache_key()
+ else:
+ elem_cache_key = None
+
+ if elem_cache_key:
+ cache_key, extracted_params = elem_cache_key
key = (
dialect,
- elem_cache_key,
+ cache_key,
tuple(sorted(keys)),
bool(self._schema_translate_map),
len(distilled_params) > 1,
)
- cache = self._execution_options["compiled_cache"]
+ cache = exec_opts["compiled_cache"]
compiled_sql = cache.get(key)
if compiled_sql is None:
compiled_sql = elem.compile(
dialect=dialect,
- cache_key=(elem_cache_key, extracted_params),
+ cache_key=elem_cache_key,
column_keys=keys,
inline=len(distilled_params) > 1,
schema_translate_map=self._schema_translate_map,
dialect.execution_ctx_cls._init_compiled,
compiled_sql,
distilled_params,
+ execution_options,
compiled_sql,
distilled_params,
elem,
extracted_params,
)
if self._has_events or self.engine._has_events:
- self.dispatch.after_execute(self, elem, multiparams, params, ret)
+ self.dispatch.after_execute(
+ self, elem, multiparams, params, execution_options, ret
+ )
return ret
- def _execute_compiled(self, compiled, multiparams, params):
+ def _execute_compiled(
+ self,
+ compiled,
+ multiparams,
+ params,
+ execution_options=util.immutabledict(),
+ ):
"""Execute a sql.Compiled object."""
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_execute:
compiled, multiparams, params = fn(
- self, compiled, multiparams, params
+ self, compiled, multiparams, params, execution_options
)
dialect = self.dialect
dialect.execution_ctx_cls._init_compiled,
compiled,
parameters,
+ execution_options,
compiled,
parameters,
None,
)
if self._has_events or self.engine._has_events:
self.dispatch.after_execute(
- self, compiled, multiparams, params, ret
+ self, compiled, multiparams, params, execution_options, ret
)
return ret
- def _exec_driver_sql_distilled(self, statement, parameters):
+ def _exec_driver_sql(
+ self,
+ statement,
+ multiparams,
+ params,
+ distilled_parameters,
+ execution_options=util.immutabledict(),
+ ):
if self._has_events or self.engine._has_events:
for fn in self.dispatch.before_execute:
statement, multiparams, params = fn(
- self, statement, parameters, {}
+ self, statement, multiparams, params, execution_options
)
dialect = self.dialect
dialect,
dialect.execution_ctx_cls._init_statement,
statement,
- parameters,
+ distilled_parameters,
+ execution_options,
statement,
- parameters,
+ distilled_parameters,
)
if self._has_events or self.engine._has_events:
- self.dispatch.after_execute(self, statement, parameters, {})
+ self.dispatch.after_execute(
+ self, statement, multiparams, params, execution_options, ret
+ )
return ret
- def exec_driver_sql(self, statement, parameters=None):
+ def _execute_20(
+ self,
+ statement,
+ parameters=None,
+ execution_options=util.immutabledict(),
+ ):
+ multiparams, params, distilled_parameters = _distill_params_20(
+ parameters
+ )
+ try:
+ meth = statement._execute_on_connection
+ except AttributeError as err:
+ util.raise_(
+ exc.ObjectNotExecutableError(statement), replace_context=err
+ )
+ else:
+ return meth(self, multiparams, params, execution_options)
+
+ def exec_driver_sql(
+ self, statement, parameters=None, execution_options=None
+ ):
r"""Executes a SQL statement construct and returns a
:class:`_engine.ResultProxy`.
"""
- if isinstance(parameters, list) and parameters:
- if not isinstance(parameters[0], (dict, tuple)):
- raise exc.ArgumentError(
- "List argument must consist only of tuples or dictionaries"
- )
- elif isinstance(parameters, (dict, tuple)):
- parameters = [parameters]
+ multiparams, params, distilled_parameters = _distill_params_20(
+ parameters
+ )
- return self._exec_driver_sql_distilled(statement, parameters or ())
+ return self._exec_driver_sql(
+ statement,
+ multiparams,
+ params,
+ distilled_parameters,
+ execution_options,
+ )
def _execute_context(
- self, dialect, constructor, statement, parameters, *args
+ self,
+ dialect,
+ constructor,
+ statement,
+ parameters,
+ execution_options,
+ *args
):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.ResultProxy`."""
+ if execution_options:
+ dialect.set_exec_execution_options(self, execution_options)
+
try:
try:
conn = self.__connection
if conn is None:
conn = self._revalidate_connection()
- context = constructor(dialect, self, conn, *args)
+ context = constructor(
+ dialect, self, conn, execution_options, *args
+ )
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
- if self._root.__transaction and not self._root.__transaction.is_active:
+ if self._root._transaction and not self._root._transaction.is_active:
raise exc.InvalidRequestError(
"This connection is on an inactive %stransaction. "
"Please rollback() fully before proceeding."
% (
"savepoint "
- if isinstance(self.__transaction, NestedTransaction)
+ if isinstance(self._transaction, NestedTransaction)
else ""
),
code="8s2a",
)
+
+ if self._is_future and self._root._transaction is None:
+ self._autobegin()
+
if context.compiled:
context.pre_exec()
result = context._setup_result_proxy()
- if context.should_autocommit and self._root.__transaction is None:
+ if (
+ not self._is_future
+ and context.should_autocommit
+ and self._root._transaction is None
+ ):
self._root._commit_impl(autocommit=True)
# for "connectionless" execution, we have to close this
# Connection after the statement is complete.
if self.should_close_with_result:
+ assert not self._is_future
assert not context._is_future_result
# ResultProxy already exhausted rows / has no rows.
self.engine.pool._invalidate(dbapi_conn_wrapper, e)
self.invalidate(e)
if self.should_close_with_result:
+ assert not self._is_future
self.close()
@classmethod
_execution_options = util.immutabledict()
_has_events = False
_connection_cls = Connection
+ _sqla_logger_namespace = "sqlalchemy.engine.Engine"
+ _is_future = False
_schema_translate_map = None
"""
- return OptionEngine(self, opt)
+ return self._option_cls(self, opt)
def get_execution_options(self):
""" Get the non-SQL options which will take effect during execution.
if type_ is not None:
self.transaction.rollback()
else:
- self.transaction.commit()
+ if self.transaction.is_active:
+ self.transaction.commit()
if not self.close_with_result:
self.conn.close()
for a particular :class:`_engine.Connection`.
"""
- conn = self.connect(close_with_result=close_with_result)
+ if self._connection_cls._is_future:
+ conn = self.connect()
+ else:
+ conn = self.connect(close_with_result=close_with_result)
try:
trans = conn.begin()
except:
return self._wrap_pool_connect(self.pool.connect, _connection)
-class OptionEngine(Engine):
+class OptionEngineMixin(object):
_sa_propagate_class_events = False
def __init__(self, proxied, execution_options):
self.__dict__["_has_events"] = value
_has_events = property(_get_has_events, _set_has_events)
+
+
+class OptionEngine(OptionEngineMixin, Engine):
+ pass
+
+
+Engine._option_cls = OptionEngine
pool._dialect = dialect
# create engine.
- engineclass = base.Engine
+ engineclass = kwargs.pop("_future_engine_class", base.Engine)
+
engine_args = {}
for k in util.get_cls_kwargs(engineclass):
if k in kwargs:
if "schema_translate_map" in opts:
connection._schema_translate_map = opts["schema_translate_map"]
+ def set_exec_execution_options(self, connection, opts):
+ if "isolation_level" in opts:
+ raise exc.InvalidRequestError(
+ "The 'isolation_level' execution "
+ "option is not supported at the per-statement level"
+ )
+
+ if "schema_translate_map" in opts:
+ raise exc.InvalidRequestError(
+ "The 'schema_translate_map' execution "
+ "option is not supported at the per-statement level"
+ )
+
def _set_connection_isolation(self, connection, level):
if connection.in_transaction():
- util.warn(
- "Connection is already established with a Transaction; "
- "setting isolation_level may implicitly rollback or commit "
- "the existing transaction, or have no effect until "
- "next transaction"
- )
+ if connection._is_future:
+ raise exc.InvalidRequestError(
+ "This connection has already begun a transaction; "
+ "isolation level may not be altered until transaction end"
+ )
+ else:
+ util.warn(
+ "Connection is already established with a Transaction; "
+                    "setting isolation_level may implicitly rollback or "
+                    "commit the existing transaction, or have no effect "
+                    "until next transaction"
+ )
self.set_isolation_level(connection.connection, level)
connection.connection._connection_record.finalize_callback.append(
self.reset_isolation_level
statement = None
result_column_struct = None
returned_defaults = None
+ execution_options = util.immutabledict()
_is_implicit_returning = False
_is_explicit_returning = False
_is_future_result = False
_expanded_parameters = util.immutabledict()
@classmethod
- def _init_ddl(cls, dialect, connection, dbapi_connection, compiled_ddl):
+ def _init_ddl(
+ cls,
+ dialect,
+ connection,
+ dbapi_connection,
+ execution_options,
+ compiled_ddl,
+ ):
"""Initialize execution context for a DDLElement construct."""
self = cls.__new__(cls)
self.execution_options = compiled.execution_options
if connection._execution_options:
- self.execution_options = dict(self.execution_options)
- self.execution_options.update(connection._execution_options)
+ self.execution_options = self.execution_options.union(
+ connection._execution_options
+ )
+ if execution_options:
+ self.execution_options = self.execution_options.union(
+ execution_options
+ )
+
+ self._is_future_result = (
+ connection._is_future
+ or self.execution_options.get("future_result", False)
+ )
self.unicode_statement = util.text_type(compiled)
if compiled.schema_translate_map:
dialect,
connection,
dbapi_connection,
+ execution_options,
compiled,
parameters,
invoked_statement,
# we get here
assert compiled.can_execute
- self._is_future_result = connection._execution_options.get(
- "future_result", False
- )
- self.execution_options = compiled.execution_options.union(
- connection._execution_options
+ self.execution_options = compiled.execution_options
+ if connection._execution_options:
+ self.execution_options = self.execution_options.union(
+ connection._execution_options
+ )
+ if execution_options:
+ self.execution_options = self.execution_options.union(
+ execution_options
+ )
+
+ self._is_future_result = (
+ connection._is_future
+ or self.execution_options.get("future_result", False)
)
self.result_column_struct = (
@classmethod
def _init_statement(
- cls, dialect, connection, dbapi_connection, statement, parameters
+ cls,
+ dialect,
+ connection,
+ dbapi_connection,
+ execution_options,
+ statement,
+ parameters,
):
"""Initialize execution context for a string SQL statement."""
self.dialect = connection.dialect
self.is_text = True
- self._is_future_result = connection._execution_options.get(
- "future_result", False
- )
+ if connection._execution_options:
+ self.execution_options = self.execution_options.union(
+ connection._execution_options
+ )
+ if execution_options:
+ self.execution_options = self.execution_options.union(
+ execution_options
+ )
- # plain text statement
- self.execution_options = connection._execution_options
+ self._is_future_result = (
+ connection._is_future
+ or self.execution_options.get("future_result", False)
+ )
if not parameters:
if self.dialect.positional:
return self
@classmethod
- def _init_default(cls, dialect, connection, dbapi_connection):
+ def _init_default(
+ cls, dialect, connection, dbapi_connection, execution_options
+ ):
"""Initialize execution context for a ColumnDefault construct."""
self = cls.__new__(cls)
self.root_connection = connection
self._dbapi_connection = dbapi_connection
self.dialect = connection.dialect
- self.execution_options = connection._execution_options
+
+ if connection._execution_options:
+ self.execution_options = self.execution_options.union(
+ connection._execution_options
+ )
+ if execution_options:
+ self.execution_options = self.execution_options.union(
+ execution_options
+ )
+
+ self._is_future_result = (
+ connection._is_future
+ or self.execution_options.get("future_result", False)
+ )
+
self.cursor = self.create_cursor()
return self
@property
def connection(self):
- return self.root_connection._branch()
+ conn = self.root_connection
+ if conn._is_future:
+ return conn
+ else:
+ return conn._branch()
def should_autocommit_text(self, statement):
return AUTOCOMMIT_REGEXP.match(statement)
orig_fn = fn
def wrap_before_execute(
- conn, clauseelement, multiparams, params
+ conn, clauseelement, multiparams, params, execution_options
):
- orig_fn(conn, clauseelement, multiparams, params)
+ orig_fn(
+ conn,
+ clauseelement,
+ multiparams,
+ params,
+ execution_options,
+ )
return clauseelement, multiparams, params
fn = wrap_before_execute
)
event_key.with_wrapper(fn).base_listen()
- def before_execute(self, conn, clauseelement, multiparams, params):
+ @event._legacy_signature(
+ "1.4",
+ ["conn", "clauseelement", "multiparams", "params"],
+ lambda conn, clauseelement, multiparams, params, execution_options: (
+ conn,
+ clauseelement,
+ multiparams,
+ params,
+ ),
+ )
+ def before_execute(
+ self, conn, clauseelement, multiparams, params, execution_options
+ ):
"""Intercept high level execute() events, receiving uncompiled
SQL constructs and other objects prior to rendering into SQL.
:meth:`_engine.Connection.execute`.
:param multiparams: Multiple parameter sets, a list of dictionaries.
:param params: Single parameter set, a single dictionary.
+        :param execution_options: dictionary of per-execution execution
+         options passed along with the statement, if any. This only applies
+         to the SQLAlchemy 2.0 version of
+         :meth:`_engine.Connection.execute`. To view all execution options
+         associated with the connection, use the
+         :meth:`_engine.Connection.get_execution_options` method to
+         retrieve the fixed execution options dictionary; elements within
+         this local dictionary are considered to be unioned into that
+         dictionary.
+
+        .. versionadded:: 1.4
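+
+        For example, a listener written against the new signature might
+        look like the following; ``engine`` here is assumed to be an
+        existing :class:`_engine.Engine`::
+
+            from sqlalchemy import event
+
+            @event.listens_for(engine, "before_execute")
+            def _log_execute(
+                conn, clauseelement, multiparams, params, execution_options
+            ):
+                print(clauseelement, execution_options)
+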
.. seealso::
"""
- def after_execute(self, conn, clauseelement, multiparams, params, result):
+ @event._legacy_signature(
+ "1.4",
+ ["conn", "clauseelement", "multiparams", "params", "result"],
+ lambda conn, clauseelement, multiparams, params, execution_options, result: ( # noqa
+ conn,
+ clauseelement,
+ multiparams,
+ params,
+ result,
+ ),
+ )
+ def after_execute(
+ self,
+ conn,
+ clauseelement,
+ multiparams,
+ params,
+ execution_options,
+ result,
+ ):
"""Intercept high level execute() events after execute.
:meth:`_engine.Connection.execute`.
:param multiparams: Multiple parameter sets, a list of dictionaries.
:param params: Single parameter set, a single dictionary.
+        :param execution_options: dictionary of per-execution execution
+         options passed along with the statement, if any. This only applies
+         to the SQLAlchemy 2.0 version of
+         :meth:`_engine.Connection.execute`. To view all execution options
+         associated with the connection, use the
+         :meth:`_engine.Connection.get_execution_options` method to
+         retrieve the fixed execution options dictionary; elements within
+         this local dictionary are considered to be unioned into that
+         dictionary.
+
+        .. versionadded:: 1.4
+
        :param result: :class:`_engine.ResultProxy` generated by the execution.
for index in range(len_keys)
}
)
+ # TODO: negative indexes? test coverage?
if extra:
for key, ex in zip(keys, extra):
rec = self._keymap[key]
"""
indexes = []
for key in keys:
+ if isinstance(key, int):
+ indexes.append(key)
+ continue
try:
rec = self._keymap[key]
except KeyError as ke:
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
+from .. import exc
from .. import util
+from ..util import collections_abc
def connection_memoize(key):
def py_fallback():
+ # TODO: pass the Connection in so that there can be a standard
+ # method for warning on parameter format
def _distill_params(multiparams, params): # noqa
r"""Given arguments from the calling form \*multiparams, \**params,
return a list of bind parameter structures, usually a list of
if not multiparams:
if params:
+ # TODO: parameter format deprecation warning
return [params]
else:
return []
# execute(stmt, "value")
return [[zero]]
else:
+ # TODO: parameter format deprecation warning
if hasattr(multiparams[0], "__iter__") and not hasattr(
multiparams[0], "strip"
):
return locals()
+_no_tuple = ()
+_no_kw = util.immutabledict()
+
+
+def _distill_params_20(params):
+ if params is None:
+ return _no_tuple, _no_kw, []
+ elif isinstance(params, collections_abc.MutableSequence): # list
+ if params and not isinstance(
+ params[0], (collections_abc.Mapping, tuple)
+ ):
+ raise exc.ArgumentError(
+ "List argument must consist only of tuples or dictionaries"
+ )
+
+ # the tuple is needed atm by the C version of _distill_params...
+ return tuple(params), _no_kw, params
+ elif isinstance(
+ params,
+ (collections_abc.Sequence, collections_abc.Mapping), # tuple or dict
+ ):
+ return _no_tuple, params, [params]
+ else:
+ raise exc.ArgumentError("mapping or sequence expected for parameters")
+
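+# Illustrative examples of the distillation rules above (a sketch, not
+# executed as part of the library):
+#
+#   _distill_params_20(None)       -> ((), immutabledict({}), [])
+#   _distill_params_20([{"x": 5}]) -> (({"x": 5},), immutabledict({}), [{"x": 5}])
+#   _distill_params_20({"x": 5})   -> ((), {"x": 5}, [{"x": 5}])
+#   _distill_params_20((5, 10))    -> ((), (5, 10), [(5, 10)])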
+
try:
from sqlalchemy.cutils import _distill_params # noqa
except ImportError:
"""Class-level events on :class:`._Dispatch` classes."""
__slots__ = (
+ "clsname",
"name",
"arg_names",
"has_kw",
def __init__(self, parent_dispatch_cls, fn):
self.name = fn.__name__
+ self.clsname = parent_dispatch_cls.__name__
argspec = util.inspect_getfullargspec(fn)
self.arg_names = argspec.args[1:]
self.has_kw = bool(argspec.varkw)
argspec.varkw
):
+ formatted_def = "def %s(%s%s)" % (
+ dispatch_collection.name,
+ ", ".join(dispatch_collection.arg_names),
+ ", **kw" if has_kw else "",
+ )
+ warning_txt = (
+ 'The argument signature for the "%s.%s" event listener '
+ "has changed as of version %s, and conversion for "
+ "the old argument signature will be removed in a "
+ 'future release. The new signature is "%s"'
+ % (
+ dispatch_collection.clsname,
+ dispatch_collection.name,
+ since,
+ formatted_def,
+ )
+ )
+
if conv:
assert not has_kw
def wrap_leg(*args):
+ util.warn_deprecated(warning_txt, version=since)
return fn(*conv(*args))
else:
def wrap_leg(*args, **kw):
+ util.warn_deprecated(warning_txt, version=since)
argdict = dict(zip(dispatch_collection.arg_names, args))
args = [argdict[name] for name in argnames]
if has_kw:
"""Future 2.0 API features.
"""
-
+from .engine import Connection # noqa
+from .engine import create_engine # noqa
+from .engine import Engine # noqa
from .result import Result # noqa
from ..sql.selectable import Select
from ..util.langhelpers import public_factory
--- /dev/null
+from .. import util
+from ..engine import Connection as _LegacyConnection
+from ..engine import create_engine as _create_engine
+from ..engine import Engine as _LegacyEngine
+from ..engine.base import OptionEngineMixin
+
+NO_OPTIONS = util.immutabledict()
+
+
+def create_engine(*arg, **kw):
+ """Create a new :class:`_future.Engine` instance.
+
+ Arguments passed to :func:`_future.create_engine` are mostly identical
+ to those passed to the 1.x :func:`_sa.create_engine` function.
+    The difference is that the object returned is the
+    :class:`_future.Engine`, which has the 2.0 version of the API.
+
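+    E.g., where one would otherwise have used :func:`_sa.create_engine`
+    (the database URL shown is illustrative)::
+
+        from sqlalchemy.future import create_engine
+
+        engine = create_engine("postgresql://scott:tiger@localhost/test")
+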
+ """
+
+ kw["_future_engine_class"] = Engine
+ return _create_engine(*arg, **kw)
+
+
+class Connection(_LegacyConnection):
+ """Provides high-level functionality for a wrapped DB-API connection.
+
+ **This is the SQLAlchemy 2.0 version** of the :class:`_engine.Connection`
+ class. The API and behavior of this object is largely the same, with the
+ following differences in behavior:
+
+ * The result object returned for results is the :class:`_future.Result`
+ object. This object has a slightly different API and behavior than the
+ prior :class:`_engine.ResultProxy` object.
+
+ * The object has :meth:`_future.Connection.commit` and
+ :meth:`_future.Connection.rollback` methods which commit or roll back
+ the current transaction in progress, if any.
+
+ * The object features "autobegin" behavior, such that any call to
+ :meth:`_future.Connection.execute` will
+ unconditionally start a
+ transaction which can be controlled using the above mentioned
+ :meth:`_future.Connection.commit` and
+ :meth:`_future.Connection.rollback` methods.
+
+ * The object does not have any "autocommit" functionality. Any SQL
+ statement or DDL statement will not be followed by any COMMIT until
+ the transaction is explicitly committed, either via the
+ :meth:`_future.Connection.commit` method, or if the connection is
+ being used in a context manager that commits such as the one
+ returned by :meth:`_future.Engine.begin`.
+
+ * The SAVEPOINT method :meth:`_future.Connection.begin_nested` returns
+ a :class:`_engine.NestedTransaction` as was always the case, and the
+ savepoint can be controlled by invoking
+ :meth:`_engine.NestedTransaction.commit` or
+ :meth:`_engine.NestedTransaction.rollback` as was the case before.
+ However, this savepoint "transaction" is not associated with the
+ transaction that is controlled by the connection itself; the overall
+ transaction can be committed or rolled back directly which will not emit
+ any special instructions for the SAVEPOINT (this will typically have the
+ effect that one desires).
+
+ * There are no "nested" connections or transactions.
+
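+    A brief sketch of the "commit as you go" pattern this class provides
+    (the table and statement are illustrative; ``engine`` is assumed to
+    come from :func:`_future.create_engine`)::
+
+        from sqlalchemy import text
+
+        with engine.connect() as conn:
+            conn.execute(text("insert into t (x) values (:x)"), {"x": 1})
+            conn.commit()  # commits the automatically-begun transaction
+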
+ """
+
+ _is_future = True
+
+ def _branch(self):
+ raise NotImplementedError(
+ "sqlalchemy.future.Connection does not support "
+ "'branching' of new connections."
+ )
+
+ def begin(self):
+ """Begin a transaction prior to autobegin occurring.
+
+ The :meth:`_future.Connection.begin` method in SQLAlchemy 2.0 begins a
+ transaction that normally will be begun in any case when the connection
+ is first used to execute a statement. The reason this method might be
+ used would be to invoke the :meth:`_events.ConnectionEvents.begin`
+ event at a specific time, or to organize code within the scope of a
+ connection checkout in terms of context managed blocks, such as::
+
+ with engine.connect() as conn:
+ with conn.begin():
+ conn.execute(...)
+ conn.execute(...)
+
+ with conn.begin():
+ conn.execute(...)
+ conn.execute(...)
+
+ The above code is not fundamentally any different in its behavior than
+ the following code which does not use
+ :meth:`_future.Connection.begin`::
+
+ with engine.connect() as conn:
+ conn.execute(...)
+ conn.execute(...)
+ conn.commit()
+
+ conn.execute(...)
+ conn.execute(...)
+ conn.commit()
+
+ In both examples, if an exception is raised, the transaction will not
+ be committed. An explicit rollback of the transaction will occur,
+ including that the :meth:`_events.ConnectionEvents.rollback` event will
+        be emitted, as the connection's context manager will call
+ :meth:`_future.Connection.close`, which will call
+ :meth:`_future.Connection.rollback` for any transaction in place
+ (excluding that of a SAVEPOINT).
+
+ From a database point of view, the :meth:`_future.Connection.begin`
+ method does not emit any SQL or change the state of the underlying
+ DBAPI connection in any way; the Python DBAPI does not have any
+ concept of explicit transaction begin.
+
+ :return: a :class:`_engine.Transaction` object. This object supports
+ context-manager operation which will commit a transaction or
+ emit a rollback in case of error.
+
+        If the :meth:`_events.ConnectionEvents.begin` event is not being
+        used, there is no real effect from invoking
+        :meth:`_future.Connection.begin` ahead of time, as the Python
+        DBAPI does not implement any explicit BEGIN.
+
+ The returned object is an instance of :class:`_engine.Transaction`.
+ This object represents the "scope" of the transaction,
+ which completes when either the :meth:`_engine.Transaction.rollback`
+ or :meth:`_engine.Transaction.commit` method is called.
+
+        Once a transaction has begun on a :class:`_future.Connection`,
+        whether explicitly via :meth:`_future.Connection.begin` or via
+        autobegin, a second call to :meth:`_future.Connection.begin`
+        raises an :class:`~sqlalchemy.exc.InvalidRequestError`; unlike
+        the 1.x :class:`_engine.Connection`, there are no "emulated"
+        nested :class:`_engine.Transaction` objects.
+
+ .. seealso::
+
+ :meth:`_future.Connection.begin_nested` - use a SAVEPOINT
+
+ :meth:`_future.Connection.begin_twophase` -
+ use a two phase /XID transaction
+
+ :meth:`_future.Engine.begin` - context manager available from
+ :class:`_future.Engine`
+
+ """
+ return super(Connection, self).begin()
+
+ def begin_nested(self):
+ """Begin a nested transaction and return a transaction handle.
+
+ The returned object is an instance of
+ :class:`_engine.NestedTransaction`.
+
+ Nested transactions require SAVEPOINT support in the
+ underlying database. Any transaction in the hierarchy may
+        ``commit`` and ``rollback``; however, the outermost transaction
+        still controls the overall ``commit`` or ``rollback`` of the
+        transaction as a whole.
+
+ In SQLAlchemy 2.0, the :class:`_engine.NestedTransaction` remains
+ independent of the :class:`_future.Connection` object itself. Calling
+ the :meth:`_future.Connection.commit` or
+ :meth:`_future.Connection.rollback` will always affect the actual
+ containing database transaction itself, and not the SAVEPOINT itself.
+ When a database transaction is committed, any SAVEPOINTs that have been
+        established are cleared, and the data changes within their scope are
+        also committed.
+
+ .. seealso::
+
+ :meth:`_future.Connection.begin`
+
+
+ """
+ return super(Connection, self).begin_nested()
+
+ def commit(self):
+ """Commit the transaction that is currently in progress.
+
+ This method commits the current transaction if one has been started.
+ If no transaction was started, the method has no effect, assuming
+ the connection is in a non-invalidated state.
+
+ A transaction is begun on a :class:`_future.Connection` automatically
+ whenever a statement is first executed, or when the
+ :meth:`_future.Connection.begin` method is called.
+
+ .. note:: The :meth:`_future.Connection.commit` method only acts upon
+ the primary database transaction that is linked to the
+ :class:`_future.Connection` object. It does not operate upon a
+ SAVEPOINT that would have been invoked from the
+ :meth:`_future.Connection.begin_nested` method; for control of a
+ SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the
+ :class:`_engine.NestedTransaction` that is returned by the
+ :meth:`_future.Connection.begin_nested` method itself.
+
+
+ """
+ if self._transaction:
+ self._transaction.commit()
+
+ def rollback(self):
+ """Roll back the transaction that is currently in progress.
+
+ This method rolls back the current transaction if one has been started.
+ If no transaction was started, the method has no effect. If a
+ transaction was started and the connection is in an invalidated state,
+ the transaction is cleared using this method.
+
+ A transaction is begun on a :class:`_future.Connection` automatically
+ whenever a statement is first executed, or when the
+ :meth:`_future.Connection.begin` method is called.
+
+ .. note:: The :meth:`_future.Connection.rollback` method only acts
+ upon the primary database transaction that is linked to the
+ :class:`_future.Connection` object. It does not operate upon a
+ SAVEPOINT that would have been invoked from the
+ :meth:`_future.Connection.begin_nested` method; for control of a
+ SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the
+ :class:`_engine.NestedTransaction` that is returned by the
+ :meth:`_future.Connection.begin_nested` method itself.
+
+
+ """
+ if self._transaction:
+ self._transaction.rollback()
+
+ def close(self):
+ """Close this :class:`_future.Connection`.
+
+ This has the effect of also calling :meth:`_future.Connection.rollback`
+ if any transaction is in place.
+
+ """
+
+ try:
+ conn = self.__connection
+ except AttributeError:
+ pass
+ else:
+ # TODO: can we do away with "_reset_agent" stuff now?
+ if self._transaction:
+ self._transaction.rollback()
+
+ conn.close()
+
+ # the close() process can end up invalidating us,
+ # as the pool will call our transaction as the "reset_agent"
+ # for rollback(), which can then cause an invalidation
+ if not self.__invalid:
+ del self.__connection
+ self.__can_reconnect = False
+ self._transaction = None
+
+ def execute(self, statement, parameters=None, execution_options=None):
+ r"""Executes a SQL statement construct and returns a
+ :class:`_future.Result`.
+
+        :param statement: The statement to be executed. This is always
+ an object that is in both the :class:`_expression.ClauseElement` and
+ :class:`_expression.Executable` hierarchies, including:
+
+ * :class:`_expression.Select`
+ * :class:`_expression.Insert`, :class:`_expression.Update`,
+ :class:`_expression.Delete`
+ * :class:`_expression.TextClause` and
+ :class:`_expression.TextualSelect`
+ * :class:`_schema.DDL` and objects which inherit from
+ :class:`_schema.DDLElement`
+
+        :param parameters: parameters which will be bound into the statement.
+ This may be either a dictionary of parameter names to values,
+ or a mutable sequence (e.g. a list) of dictionaries. When a
+ list of dictionaries is passed, the underlying statement execution
+ will make use of the DBAPI ``cursor.executemany()`` method.
+ When a single dictionary is passed, the DBAPI ``cursor.execute()``
+ method will be used.
+
+ :param execution_options: optional dictionary of execution options,
+ which will be associated with the statement execution. This
+ dictionary can provide a subset of the options that are accepted
+ by :meth:`_future.Connection.execution_options`.
+
+ :return: a :class:`_future.Result` object.
+
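+        E.g., an "executemany"-style invocation (the statement and
+        parameter values are illustrative)::
+
+            result = conn.execute(
+                text("insert into t (x) values (:x)"),
+                [{"x": 1}, {"x": 2}],
+            )
+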
+ """
+ return self._execute_20(
+ statement, parameters, execution_options or NO_OPTIONS
+ )
+
+ def scalar(self, statement, parameters=None, execution_options=None):
+ r"""Executes a SQL statement construct and returns a scalar object.
+
+ This method is shorthand for invoking the
+ :meth:`_future.Result.scalar` method after invoking the
+ :meth:`_future.Connection.execute` method. Parameters are equivalent.
+
+ :return: a scalar Python value representing the first column of the
+ first row returned.
+
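+        E.g. (the statement shown is illustrative)::
+
+            count = conn.scalar(text("select count(*) from some_table"))
+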
+ """
+ return self.execute(statement, parameters, execution_options).scalar()
+
+
+class Engine(_LegacyEngine):
+ """Connects a :class:`_pool.Pool` and
+ :class:`_engine.Dialect` together to provide a
+ source of database connectivity and behavior.
+
+ **This is the SQLAlchemy 2.0 version** of the :class:`~.engine.Engine`.
+
+    An :class:`_future.Engine` object is instantiated publicly using the
+ :func:`~sqlalchemy.future.create_engine` function.
+
+ .. seealso::
+
+ :doc:`/core/engines`
+
+ :ref:`connections_toplevel`
+
+ """
+
+ _connection_cls = Connection
+ _is_future = True
+
+ def _not_implemented(self, *arg, **kw):
+ raise NotImplementedError(
+ "This method is not implemented for SQLAlchemy 2.0."
+ )
+
+ transaction = (
+ run_callable
+ ) = (
+ execute
+ ) = (
+ scalar
+ ) = (
+ _execute_clauseelement
+ ) = _execute_compiled = table_names = has_table = _not_implemented
+
+ def _run_ddl_visitor(self, visitorcallable, element, **kwargs):
+ # TODO: this is for create_all support etc. not clear if we
+ # want to provide this in 2.0, that is, a way to execute SQL where
+ # they aren't calling "engine.begin()" explicitly, however, DDL
+ # may be a special case for which we want to continue doing it this
+ # way. A big win here is that the full DDL sequence is inside of a
+        # single transaction rather than COMMIT for each statement.
+ with self.begin() as conn:
+ conn._run_ddl_visitor(visitorcallable, element, **kwargs)
+
+ @classmethod
+ def _future_facade(self, legacy_engine):
+ return Engine(
+ legacy_engine.pool,
+ legacy_engine.dialect,
+ legacy_engine.url,
+ logging_name=legacy_engine.logging_name,
+ echo=legacy_engine.echo,
+ hide_parameters=legacy_engine.hide_parameters,
+ execution_options=legacy_engine._execution_options,
+ )
+
+ def begin(self):
+ """Return a :class:`_future.Connection` object with a transaction
+ begun.
+
+ Use of this method is similar to that of
+ :meth:`_future.Engine.connect`, typically as a context manager, which
+ will automatically maintain the state of the transaction when the block
+ ends, either by calling :meth:`_future.Connection.commit` when the
+ block succeeds normally, or :meth:`_future.Connection.rollback` when an
+ exception is raised, before propagating the exception outwards::
+
+ with engine.begin() as connection:
+ connection.execute(text("insert into table values ('foo')"))
+
+
+ .. seealso::
+
+ :meth:`_future.Engine.connect`
+
+ :meth:`_future.Connection.begin`
+
+ """
+ return super(Engine, self).begin()
+
+ def connect(self):
+ """Return a new :class:`_future.Connection` object.
+
+ The :class:`_future.Connection` acts as a Python context manager, so
+ the typical use of this method looks like::
+
+ with engine.connect() as connection:
+ connection.execute(text("insert into table values ('foo')"))
+ connection.commit()
+
+ Where above, after the block is completed, the connection is "closed"
+ and its underlying DBAPI resources are returned to the connection pool.
+ This also has the effect of rolling back any transaction that
+ was explicitly begun or was begun via autobegin, and will
+ emit the :meth:`_events.ConnectionEvents.rollback` event if one was
+ started and is still in progress.
+
+ .. seealso::
+
+ :meth:`_future.Engine.begin`
+
+
+ """
+ return super(Engine, self).connect()
+
+
+class OptionEngine(OptionEngineMixin, Engine):
+ pass
+
+
+Engine._option_cls = OptionEngine
import operator
from .. import util
-from ..engine.result import _baserow_usecext
from ..engine.result import BaseResult
from ..engine.result import CursorResultMetaData
from ..engine.result import DefaultCursorFetchStrategy
from ..engine.result import Row
from ..sql import util as sql_util
from ..sql.base import _generative
-from ..sql.base import Generative
+from ..sql.base import InPlaceGenerative
-class Result(Generative, BaseResult):
+class Result(InPlaceGenerative, BaseResult):
"""Interim "future" result proxy so that dialects can build on
upcoming 2.0 patterns.
self._soft_close(hard=True)
def columns(self, *col_expressions):
- indexes = []
- for key in col_expressions:
- try:
- rec = self._keymap[key]
- except KeyError:
- rec = self._key_fallback(key, True)
- if rec is None:
- return None
-
- index, obj = rec[0:2]
-
- if index is None:
- self._metadata._raise_for_ambiguous_column_name(obj)
- indexes.append(index)
- return self._column_slices(indexes)
+ r"""Establish the columns that should be returned in each row.
+
+ This method may be used to limit the columns returned as well
+ as to reorder them. The given list of expressions are normally
+ a series of integers or string key names. They may also be
+ appropriate :class:`.ColumnElement` objects which correspond to
+ a given statement construct.
+
+ E.g.::
+
+ statement = select(table.c.x, table.c.y, table.c.z)
+ result = connection.execute(statement)
+
+ for z, y in result.columns('z', 'y'):
+ # ...
+
+
+ Example of using the column objects from the statement itself::
+
+ for z, y in result.columns(
+ statement.selected_columns.c.z,
+ statement.selected_columns.c.y
+ ):
+ # ...
+
+ :param \*col_expressions: indicates columns to be returned. Elements
+ may be integer row indexes, string column names, or appropriate
+ :class:`.ColumnElement` objects corresponding to a select construct.
+
+ :return: this :class:`_future.Result` object with the modifications
+ given.
+
+ """
+ return self._column_slices(col_expressions)
+
+ def partitions(self, size=100):
+ """Iterate through sub-lists of rows of the size given.
+
+ Each list will be of the size given, excluding the last list to
+        be yielded, which may have a smaller number of rows. No empty
+ lists will be yielded.
+
+ The result object is automatically closed when the iterator
+ is fully consumed.
+
+ Note that the backend driver will usually buffer the entire result
+ ahead of time unless the
+ :paramref:`.Connection.execution_options.stream_results` execution
+        option is used, indicating that the driver should not pre-buffer
+        results, if possible. Not all drivers support this option, and
+        the option is silently ignored for those that do not. For a positive
+ assertion that the driver supports streaming results that will
+ fail if not supported, use the
+ :paramref:`.Connection.execution_options.stream_per`
+ execution option.
+
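+        E.g., a sketch of consuming a large result in groups, assuming
+        ``result`` is a :class:`_future.Result`::
+
+            for partition in result.partitions(100):
+                for row in partition:
+                    print(row)
+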
+ :param size: indicate the maximum number of rows to be present
+ in each list yielded.
+ :return: iterator of lists
+
+ """
+ getter = self._row_getter()
+ while True:
+ partition = [
+ getter(r) for r in self._safe_fetchmany_impl(size=size)
+ ]
+ if partition:
+ yield partition
+ else:
+ break
def scalars(self):
result = self._column_slices(0)
@_generative
def _column_slices(self, indexes):
- if _baserow_usecext:
- self._column_slice_filter = self._metadata._tuplegetter(*indexes)
- else:
- self._column_slice_filter = self._metadata._pure_py_tuplegetter(
- *indexes
- )
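+        # _tuple_getter now encapsulates the C-extension vs.
+        # pure-Python tuple getter selection that was previously
+        # done inline here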
+ self._column_slice_filter = self._metadata._tuple_getter(indexes)
@_generative
def mappings(self):
def _safe_fetchmany_impl(self, size=None):
try:
- l = self.process_rows(self.cursor_strategy.fetchmany(size))
+ l = self.cursor_strategy.fetchmany(size)
if len(l) == 0:
self._soft_close()
return l
else:
return getter(row)
+ @util.deprecated(
+ "2.0",
+ "The :meth:`_future.Result.fetchall` "
+ "method is provided for backwards "
+ "compatibility and will be removed in a future release.",
+ )
+ def fetchall(self):
+ """A synonym for the :meth:`_future.Result.all` method."""
+
+ return self.all()
+
+ @util.deprecated(
+ "2.0",
+ "The :meth:`_future.Result.fetchone` "
+ "method is provided for backwards "
+ "compatibility and will be removed in a future release.",
+ )
+ def fetchone(self):
+ """Fetch one row.
+
+        This method is provided for backwards compatibility with
+        SQLAlchemy 1.x.x.
+
+        To fetch the first row of a result only, use the
+        :meth:`_future.Result.first` method. To iterate through all
+ rows, iterate the :class:`_future.Result` object directly.
+
+ """
+ return self._onerow()
+
+ @util.deprecated(
+ "2.0",
+ "The :meth:`_future.Result.fetchmany` "
+ "method is provided for backwards "
+ "compatibility and will be removed in a future release.",
+ )
+ def fetchmany(self, size=None):
+ """Fetch many rows.
+
+        This method is provided for backwards compatibility with
+        SQLAlchemy 1.x.x.
+
+        To fetch rows in groups, use the :meth:`_future.Result.partitions`
+        method, or the :meth:`_future.Result.chunks` method in combination
+        with the :paramref:`.Connection.execution_options.stream_per`
+        option, which sets up the buffer size before fetching the result.
+
+ """
+ getter = self._row_getter()
+ return [getter(r) for r in self._safe_fetchmany_impl(size=size)]
+
def all(self):
+ """Return all rows in a list.
+
+ Closes the result set after invocation.
+
+ :return: a list of :class:`.Row` objects.
+
+ """
getter = self._row_getter()
return [getter(r) for r in self._safe_fetchall_impl()]
def first(self):
+ """Fetch the first row or None if no row is present.
+
+ Closes the result set and discards remaining rows. A warning
+ is emitted if additional rows remain.
+
+ :return: a :class:`.Row` object, or None if no rows remain
+
+ """
getter = self._row_getter()
row = self._safe_fetchone_impl()
if row is None:
self._soft_close()
util.warn("Additional rows remain")
return row
+
+ def scalar(self):
+ """Fetch the first column of the first row, and close the result set.
+
+ After calling this method, the object is fully closed,
+ e.g. the :meth:`_engine.ResultProxy.close`
+ method will have been called.
+
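+        E.g., a sketch assuming ``connection`` and ``table`` are
+        already set up::
+
+            count = connection.execute(
+                select([func.count()]).select_from(table)
+            ).scalar()
+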
+        :return: a Python scalar value, or None if no rows remain
+
+ """
+ row = self.first()
+ if row is not None:
+ return row[0]
+ else:
+ return None
_logged_classes = set()
+def _qual_logger_name_for_cls(cls):
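+    """Return the logger name for a class, preferring the class-level
+    ``_sqla_logger_namespace`` attribute when present."""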
+ return (
+ getattr(cls, "_sqla_logger_namespace", None)
+ or cls.__module__ + "." + cls.__name__
+ )
+
+
def class_logger(cls):
- logger = logging.getLogger(cls.__module__ + "." + cls.__name__)
+ logger = logging.getLogger(_qual_logger_name_for_cls(cls))
cls._should_log_debug = lambda self: logger.isEnabledFor(logging.DEBUG)
cls._should_log_info = lambda self: logger.isEnabledFor(logging.INFO)
cls.logger = logger
"""create a logger for an instance that implements :class:`.Identified`."""
if instance.logging_name:
- name = "%s.%s.%s" % (
- instance.__class__.__module__,
- instance.__class__.__name__,
+ name = "%s.%s" % (
+ _qual_logger_name_for_cls(instance.__class__),
instance.logging_name,
)
else:
- name = "%s.%s" % (
- instance.__class__.__module__,
- instance.__class__.__name__,
- )
+ name = _qual_logger_name_for_cls(instance.__class__)
instance._echo = echoflag
elif self.nested:
transaction = conn.begin_nested()
else:
- transaction = conn.begin()
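+            # a future Connection may already have autobegun its
+            # transaction; participate in the existing transaction
+            # rather than calling begin() again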
+ if conn._is_future and conn.in_transaction():
+ transaction = conn._transaction
+ else:
+ transaction = conn.begin()
except:
            # connection will not be associated with this Session;
# close it immediately so that it isn't closed under GC
conn.close()
raise
else:
+ bind_is_connection = isinstance(bind, engine.Connection)
+
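+        # each entry is (connection, transaction, should_commit,
+        # autoclose); when the bind is an externally-passed future
+        # Connection, the Session does not commit its transaction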
self._connections[conn] = self._connections[conn.engine] = (
conn,
transaction,
- conn is not bind,
+ not bind_is_connection or not conn._is_future,
+ not bind_is_connection,
)
self.session.dispatch.after_begin(self.session, self, conn)
return conn
self._prepare_impl()
if self._parent is None or self.nested:
- for t in set(self._connections.values()):
- t[1].commit()
+ for conn, trans, should_commit, autoclose in set(
+ self._connections.values()
+ ):
+ if should_commit:
+ trans.commit()
self._state = COMMITTED
self.session.dispatch.after_commit(self.session)
def close(self, invalidate=False):
self.session._transaction = self._parent
if self._parent is None:
- for connection, transaction, autoclose in set(
+ for connection, transaction, should_commit, autoclose in set(
self._connections.values()
):
if invalidate:
return s
+class InPlaceGenerative(HasMemoized):
+ """Provide a method-chaining pattern in conjunction with the
+    @_generative decorator that mutates in place."""
+
+ def _generate(self):
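+        # unlike the copy-on-generate behavior of Generative._generate(),
+        # clear memoized attributes and return the same instance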
+ skip = self._memoized_keys
+ for k in skip:
+ self.__dict__.pop(k, None)
+ return self
+
+
class HasCompileState(Generative):
"""A class that has a :class:`.CompileState` associated with it."""
self.string, schema_translate_map
)
- def _execute_on_connection(self, connection, multiparams, params):
+ def _execute_on_connection(
+ self, connection, multiparams, params, execution_options
+ ):
if self.can_execute:
- return connection._execute_compiled(self, multiparams, params)
+ return connection._execute_compiled(
+ self, multiparams, params, execution_options
+ )
else:
raise exc.ObjectNotExecutableError(self.statement)
dialect = None
callable_ = None
- def _execute_on_connection(self, connection, multiparams, params):
- return connection._execute_ddl(self, multiparams, params)
+ def _execute_on_connection(
+ self, connection, multiparams, params, execution_options
+ ):
+ return connection._execute_ddl(
+ self, multiparams, params, execution_options
+ )
def execute(self, bind=None, target=None):
"""Execute this DDL immediately.
d.pop("_generate_cache_key", None)
return d
- def _execute_on_connection(self, connection, multiparams, params):
+ def _execute_on_connection(
+ self, connection, multiparams, params, execution_options
+ ):
if self.supports_execution:
- return connection._execute_clauseelement(self, multiparams, params)
+ return connection._execute_clauseelement(
+ self, multiparams, params, execution_options
+ )
else:
raise exc.ObjectNotExecutableError(self)
operator=operators.comma_op, group_contents=True, *args
).self_group()
- def _execute_on_connection(self, connection, multiparams, params):
- return connection._execute_function(self, multiparams, params)
+ def _execute_on_connection(
+ self, connection, multiparams, params, execution_options
+ ):
+ return connection._execute_function(
+ self, multiparams, params, execution_options
+ )
@property
def columns(self):
bind = _bind_or_error(self)
return bind.execute(self, **kwargs)
- def _execute_on_connection(self, connection, multiparams, params):
- return connection._execute_default(self, multiparams, params)
+ def _execute_on_connection(
+ self, connection, multiparams, params, execution_options
+ ):
+ return connection._execute_default(
+ self, multiparams, params, execution_options
+ )
@property
def bind(self):
orig = []
@event.listens_for(engine, "before_execute")
- def connection_execute(conn, clauseelement, multiparams, params):
+ def connection_execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
# grab the original statement + params before any cursor
# execution
orig[:] = clauseelement, multiparams, params
cls._stack.append(_current)
cls.set_as_current(config, namespace)
+ @classmethod
+ def pop(cls, namespace):
+ if cls._stack:
+ # a failed test w/ -x option can call reset() ahead of time
+ _current = cls._stack[-1]
+ del cls._stack[-1]
+ cls.set_as_current(_current, namespace)
+
@classmethod
def reset(cls, namespace):
if cls._stack:
return engine
-def testing_engine(url=None, options=None):
+def testing_engine(url=None, options=None, future=False):
"""Produce an engine configured by --options with optional overrides."""
- from sqlalchemy import create_engine
+    if future or (config.db and config.db._is_future):
+ from sqlalchemy.future import create_engine
+ else:
+ from sqlalchemy import create_engine
from sqlalchemy.engine.url import make_url
if not options:
# engines.drop_all_tables(metadata, config.db)
+class FutureEngineMixin(object):
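+    """Test class mixin that swaps ``config.db`` for a "future"
+    (SQLAlchemy 2.0 style) engine for the duration of the class."""
+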
+ @classmethod
+ def setup_class(cls):
+ super_ = super(FutureEngineMixin, cls)
+ if hasattr(super_, "setup_class"):
+ super_.setup_class()
+
+ from ..future.engine import Engine
+ from sqlalchemy import testing
+
+ config._current.push_engine(Engine._future_facade(config.db), testing)
+
+ @classmethod
+ def teardown_class(cls):
+ from sqlalchemy import testing
+
+ config._current.pop(testing)
+
+ super_ = super(FutureEngineMixin, cls)
+ if hasattr(super_, "teardown_class"):
+ super_.teardown_class()
+
+
class TablesTest(TestBase):
# 'once', None
)
-__all__ = ("TableDDLTest",)
+class FutureTableDDLTest(fixtures.FutureEngineMixin, TableDDLTest):
+ pass
+
+
+__all__ = ("TableDDLTest", "FutureTableDDLTest")
# ignore 2.0 warnings unless we are explicitly testing for them
warnings.filterwarnings("ignore", category=sa_exc.RemovedIn20Warning)
+ # ignore things that are deprecated *as of* 2.0 :)
+ warnings.filterwarnings(
+ "ignore",
+ category=sa_exc.SADeprecationWarning,
+ message=r".*\(deprecated since: 2.0\)$",
+ )
+
try:
import pytest
except ImportError:
if warning is None:
warning = exc.SADeprecationWarning
+ message += " (deprecated since: %s)" % version
+
def decorate(fn):
return _decorate_with_warning(
fn, warning, message % dict(func=fn.__name__), version, header
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
+from sqlalchemy.testing import expect_deprecated
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_not_
def handler1(x, y):
canary(x, y)
- self.TargetOne().dispatch.event_three(4, 5, 6, 7)
+ with expect_deprecated(
+ 'The argument signature for the "TargetEventsOne.event_three" '
+ "event listener has changed as of version 0.9, and conversion "
+ "for the old argument signature will be removed in a future "
+ r'release. The new signature is "def event_three\(x, y, z, q\)"'
+ ):
+ self.TargetOne().dispatch.event_three(4, 5, 6, 7)
eq_(canary.mock_calls, [call(4, 5)])
eq_(canary.mock_calls, [call(5, 4, 5, foo="bar")])
def _test_legacy_accept_kw(self, target, canary):
- target.dispatch.event_four(4, 5, 6, 7, foo="bar")
+ with expect_deprecated(
+ 'The argument signature for the "TargetEventsOne.event_four" '
+ "event listener has changed as of version 0.9, and conversion "
+ "for the old argument signature will be removed in a future "
+ r"release. The new signature is "
+ r'"def event_four\(x, y, z, q, \*\*kw\)"'
+ ):
+ target.dispatch.event_four(4, 5, 6, 7, foo="bar")
eq_(canary.mock_calls, [call(4, 5, {"foo": "bar"})])
def handler1(x, y, z, q):
canary(x, y, z, q)
- self.TargetOne().dispatch.event_six(4, 5)
+ with expect_deprecated(
+ 'The argument signature for the "TargetEventsOne.event_six" '
+ "event listener has changed as of version 0.9, and "
+ "conversion for the old argument signature will be removed in "
+ "a future release. The new signature is "
+ r'"def event_six\(x, y\)'
+ ):
+ self.TargetOne().dispatch.event_six(4, 5)
eq_(canary.mock_calls, [call(4, 5, 9, 20)])
+ def test_complex_new_accept(self):
+ canary = Mock()
+
+ @event.listens_for(self.TargetOne, "event_six")
+ def handler1(x, y):
+ canary(x, y)
+
+ # new version does not emit a warning
+ self.TargetOne().dispatch.event_six(4, 5)
+ eq_(canary.mock_calls, [call(4, 5)])
+
def test_legacy_accept_from_method(self):
canary = Mock()
event.listen(self.TargetOne, "event_three", MyClass().handler1)
- self.TargetOne().dispatch.event_three(4, 5, 6, 7)
+ with expect_deprecated(
+ 'The argument signature for the "TargetEventsOne.event_three" '
+ "event listener has changed as of version 0.9, and conversion "
+ "for the old argument signature will be removed in a future "
+ r'release. The new signature is "def event_three\(x, y, z, q\)"'
+ ):
+ self.TargetOne().dispatch.event_three(4, 5, 6, 7)
eq_(canary.mock_calls, [call(4, 5)])
def test_standard_accept_has_legacies(self):
class DocTest(fixtures.TestBase):
def _setup_logger(self):
- rootlogger = logging.getLogger("sqlalchemy.engine.base.Engine")
+ rootlogger = logging.getLogger("sqlalchemy.engine.Engine")
class MyStream(object):
def write(self, string):
rootlogger.addHandler(handler)
def _teardown_logger(self):
- rootlogger = logging.getLogger("sqlalchemy.engine.base.Engine")
+ rootlogger = logging.getLogger("sqlalchemy.engine.Engine")
rootlogger.removeHandler(self._handler)
def _setup_create_table_patcher(self):
dialect._casing = casing
dialect.default_schema_name = "Test"
connection = mock.Mock(
- dialect=dialect, execute=lambda stmt, **params: ischema
+ dialect=dialect, execute=lambda stmt, params: ischema
)
dialect._correct_for_mysql_bugs_88718_96365(fkeys, connection)
eq_(
+import re
+
import sqlalchemy as tsa
from sqlalchemy import create_engine
+from sqlalchemy import event
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import inspect
from sqlalchemy import VARCHAR
from sqlalchemy.engine import reflection
from sqlalchemy.engine.base import Connection
+from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.mock import MockConnection
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
+from sqlalchemy.testing import config
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_instance_of
from sqlalchemy.testing import is_true
+from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.mock import Mock
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
(3, "horse"),
(4, "sally"),
]
+
+
+class EngineEventsTest(fixtures.TestBase):
+ __requires__ = ("ad_hoc_engines",)
+ __backend__ = True
+
+ def tearDown(self):
+ Engine.dispatch._clear()
+ Engine._has_events = False
+
+ def _assert_stmts(self, expected, received):
+ list(received)
+ for stmt, params, posn in expected:
+ if not received:
+ assert False, "Nothing available for stmt: %s" % stmt
+ while received:
+ teststmt, testparams, testmultiparams = received.pop(0)
+ teststmt = (
+ re.compile(r"[\n\t ]+", re.M).sub(" ", teststmt).strip()
+ )
+ if teststmt.startswith(stmt) and (
+ testparams == params or testparams == posn
+ ):
+ break
+
+ def test_retval_flag(self):
+ canary = []
+
+ def tracker(name):
+ def go(conn, *args, **kw):
+ canary.append(name)
+
+ return go
+
+ def execute(conn, clauseelement, multiparams, params):
+ canary.append("execute")
+ return clauseelement, multiparams, params
+
+ def cursor_execute(
+ conn, cursor, statement, parameters, context, executemany
+ ):
+ canary.append("cursor_execute")
+ return statement, parameters
+
+ engine = engines.testing_engine()
+
+ assert_raises(
+ tsa.exc.ArgumentError,
+ event.listen,
+ engine,
+ "begin",
+ tracker("begin"),
+ retval=True,
+ )
+
+ event.listen(engine, "before_execute", execute, retval=True)
+ event.listen(
+ engine, "before_cursor_execute", cursor_execute, retval=True
+ )
+ with testing.expect_deprecated(
+ r"The argument signature for the "
+ r"\"ConnectionEvents.before_execute\" event listener",
+ ):
+ engine.execute(select([1]))
+ eq_(canary, ["execute", "cursor_execute"])
+
+ def test_argument_format_execute(self):
+ def before_execute(conn, clauseelement, multiparams, params):
+ assert isinstance(multiparams, (list, tuple))
+ assert isinstance(params, dict)
+
+ def after_execute(conn, clauseelement, multiparams, params, result):
+ assert isinstance(multiparams, (list, tuple))
+ assert isinstance(params, dict)
+
+ e1 = testing_engine(config.db_url)
+ event.listen(e1, "before_execute", before_execute)
+ event.listen(e1, "after_execute", after_execute)
+
+ with testing.expect_deprecated(
+ r"The argument signature for the "
+ r"\"ConnectionEvents.before_execute\" event listener",
+ r"The argument signature for the "
+ r"\"ConnectionEvents.after_execute\" event listener",
+ ):
+ e1.execute(select([1]))
from sqlalchemy import util
from sqlalchemy import VARCHAR
from sqlalchemy.engine import default
+from sqlalchemy.engine.base import Connection
from sqlalchemy.engine.base import Engine
from sqlalchemy.sql import column
from sqlalchemy.sql import literal
)
def test_raw_named_invalid(self, connection):
+        # this is awkward because it only tests that plain Python
+        # raises TypeError when arguments resembling the legacy
+        # calling convention happen to conflict with the positional
+        # signature for the method; some combinations can get through
+        # and fail differently
assert_raises(
TypeError,
connection.exec_driver_sql,
"values (%(id)s, %(name)s)",
{"id": 2, "name": "ed"},
{"id": 3, "name": "horse"},
+ {"id": 4, "name": "horse"},
)
assert_raises(
TypeError,
def _assert_stmts(self, expected, received):
list(received)
+
for stmt, params, posn in expected:
if not received:
assert False, "Nothing available for stmt: %s" % stmt
event.listen(e1, "before_execute", canary)
s1 = select([1])
s2 = select([2])
- e1.execute(s1)
- e2.execute(s2)
+
+ with e1.connect() as conn:
+ conn.execute(s1)
+
+ with e2.connect() as conn:
+ conn.execute(s2)
eq_([arg[1][1] for arg in canary.mock_calls], [s1])
event.listen(e2, "before_execute", canary)
- e1.execute(s1)
- e2.execute(s2)
+
+ with e1.connect() as conn:
+ conn.execute(s1)
+
+ with e2.connect() as conn:
+ conn.execute(s2)
eq_([arg[1][1] for arg in canary.mock_calls], [s1, s1, s2])
def test_per_engine_plus_global(self):
e1.connect()
e2.connect()
- e1.execute(select([1]))
+ with e1.connect() as conn:
+ conn.execute(select([1]))
eq_(canary.be1.call_count, 1)
eq_(canary.be2.call_count, 1)
- e2.execute(select([1]))
+ with e2.connect() as conn:
+ conn.execute(select([1]))
eq_(canary.be1.call_count, 2)
eq_(canary.be2.call_count, 1)
eq_(canary.be1.call_count, 1)
eq_(canary.be2.call_count, 1)
- conn._branch().execute(select([1]))
- eq_(canary.be1.call_count, 2)
- eq_(canary.be2.call_count, 2)
+ if testing.requires.legacy_engine.enabled:
+ conn._branch().execute(select([1]))
+ eq_(canary.be1.call_count, 2)
+ eq_(canary.be2.call_count, 2)
def test_add_event_after_connect(self):
# new feature as of #2978
dialect = conn.dialect
ctx = dialect.execution_ctx_cls._init_statement(
- dialect, conn, conn.connection, stmt, {}
+ dialect, conn, conn.connection, {}, stmt, {}
)
ctx._execute_scalar(stmt, Integer())
)
def test_argument_format_execute(self):
- def before_execute(conn, clauseelement, multiparams, params):
+ def before_execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
assert isinstance(multiparams, (list, tuple))
assert isinstance(params, dict)
- def after_execute(conn, clauseelement, multiparams, params, result):
+ def after_execute(
+ conn, clauseelement, multiparams, params, result, execution_options
+ ):
assert isinstance(multiparams, (list, tuple))
assert isinstance(params, dict)
event.listen(e1, "before_execute", before_execute)
event.listen(e1, "after_execute", after_execute)
- e1.execute(select([1]))
- e1.execute(select([1]).compile(dialect=e1.dialect).statement)
- e1.execute(select([1]).compile(dialect=e1.dialect))
- e1._execute_compiled(select([1]).compile(dialect=e1.dialect), (), {})
+ with e1.connect() as conn:
+ conn.execute(select([1]))
+ conn.execute(select([1]).compile(dialect=e1.dialect).statement)
+ conn.execute(select([1]).compile(dialect=e1.dialect))
+
+ conn._execute_compiled(
+ select([1]).compile(dialect=e1.dialect), (), {}
+ )
- @testing.fails_on("firebird", "Data type unknown")
def test_execute_events(self):
stmts = []
cursor_stmts = []
- def execute(conn, clauseelement, multiparams, params):
+ def execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
stmts.append((str(clauseelement), params, multiparams))
def cursor_execute(
):
cursor_stmts.append((str(statement), parameters, None))
+ # TODO: this test is kind of a mess
+
for engine in [
engines.testing_engine(options=dict(implicit_returning=False)),
engines.testing_engine(
primary_key=True,
),
)
- m.create_all()
+
+ if isinstance(engine, Connection) and engine._is_future:
+ ctx = None
+ conn = engine
+ elif engine._is_future:
+ ctx = conn = engine.connect()
+ else:
+ ctx = None
+ conn = engine
+
try:
- t1.insert().execute(c1=5, c2="some data")
- t1.insert().execute(c1=6)
- eq_(
- engine.execute(text("select * from t1")).fetchall(),
- [(5, "some data"), (6, "foo")],
- )
+ m.create_all(conn, checkfirst=False)
+ try:
+ conn.execute(t1.insert(), dict(c1=5, c2="some data"))
+ conn.execute(t1.insert(), dict(c1=6))
+ eq_(
+ conn.execute(text("select * from t1")).fetchall(),
+ [(5, "some data"), (6, "foo")],
+ )
+ finally:
+ m.drop_all(conn)
+ if engine._is_future:
+ conn.commit()
finally:
- m.drop_all()
-
- compiled = [
- ("CREATE TABLE t1", {}, None),
- (
- "INSERT INTO t1 (c1, c2)",
- {"c2": "some data", "c1": 5},
- None,
- ),
- ("INSERT INTO t1 (c1, c2)", {"c1": 6}, None),
- ("select * from t1", {}, None),
- ("DROP TABLE t1", {}, None),
- ]
+ if ctx:
+ ctx.close()
+
+ if engine._is_future:
+ compiled = [
+ ("CREATE TABLE t1", {}, None),
+ (
+ "INSERT INTO t1 (c1, c2)",
+ {"c2": "some data", "c1": 5},
+ None,
+ ),
+ ("INSERT INTO t1 (c1, c2)", {"c1": 6}, None),
+ ("select * from t1", {}, None),
+ ("DROP TABLE t1", {}, None),
+ ]
+ else:
+ compiled = [
+ ("CREATE TABLE t1", {}, None),
+ (
+ "INSERT INTO t1 (c1, c2)",
+ {},
+ ({"c2": "some data", "c1": 5},),
+ ),
+ ("INSERT INTO t1 (c1, c2)", {}, ({"c1": 6},)),
+ ("select * from t1", {}, None),
+ ("DROP TABLE t1", {}, None),
+ ]
cursor = [
("CREATE TABLE t1", {}, ()),
event.listen(eng, "before_execute", l2)
event.listen(eng1, "before_execute", l3)
- eng.execute(select([1])).close()
+ with eng.connect() as conn:
+ conn.execute(select([1]))
eq_(canary, ["l1", "l2"])
- eng1.execute(select([1])).close()
+ with eng1.connect() as conn:
+ conn.execute(select([1]))
eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
event.listen(eng, "before_execute", l3)
event.listen(eng1, "before_execute", l4)
- eng.execute(select([1])).close()
+ with eng.connect() as conn:
+ conn.execute(select([1]))
eq_(canary, ["l1", "l2", "l3"])
- eng1.execute(select([1])).close()
+ with eng1.connect() as conn:
+ conn.execute(select([1]))
eq_(canary, ["l1", "l2", "l3", "l4", "l1", "l2", "l3"])
event.remove(eng1, "before_execute", l4)
event.remove(eng, "before_execute", l3)
- eng1.execute(select([1])).close()
+ with eng1.connect() as conn:
+ conn.execute(select([1]))
eq_(canary, ["l2"])
@testing.requires.ad_hoc_engines
return go
- def execute(conn, clauseelement, multiparams, params):
+ def execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
canary.append("execute")
return clauseelement, multiparams, params
event.listen(
engine, "before_cursor_execute", cursor_execute, retval=True
)
- engine.execute(select([1]))
+ with engine.connect() as conn:
+ conn.execute(select([1]))
eq_(canary, ["execute", "cursor_execute"])
+ @testing.requires.legacy_engine
def test_engine_connect(self):
engine = engines.testing_engine()
("begin", set(["conn"])),
(
"execute",
- set(["conn", "clauseelement", "multiparams", "params"]),
+ set(
+ [
+ "conn",
+ "clauseelement",
+ "multiparams",
+ "params",
+ "execution_options",
+ ]
+ ),
),
(
"cursor_execute",
("begin", set(["conn"])),
(
"execute",
- set(["conn", "clauseelement", "multiparams", "params"]),
+ set(
+ [
+ "conn",
+ "clauseelement",
+ "multiparams",
+ "params",
+ "execution_options",
+ ]
+ ),
),
(
"cursor_execute",
)
+class FutureEngineEventsTest(fixtures.FutureEngineMixin, EngineEventsTest):
+ pass
+
+
class HandleErrorTest(fixtures.TestBase):
__requires__ = ("ad_hoc_engines",)
__backend__ = True
stmt = "insert into table foo"
params = {"foo": "bar"}
ctx = dialect.execution_ctx_cls._init_statement(
- dialect, conn, conn.connection, stmt, [params]
+ dialect, conn, conn.connection, {}, stmt, [params],
)
conn._cursor_execute(ctx.cursor, stmt, params, ctx)
def test_select(self):
self._test_keyword("SELECT foo FROM table", False)
+
+
+class FutureExecuteTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column("user_id", INT, primary_key=True, autoincrement=False),
+ Column("user_name", VARCHAR(20)),
+ test_needs_acid=True,
+ )
+ Table(
+ "users_autoinc",
+ metadata,
+ Column(
+ "user_id", INT, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("user_name", VARCHAR(20)),
+ test_needs_acid=True,
+ )
+
+ @testing.combinations(
+ ({}, {}, {}),
+ ({"a": "b"}, {}, {"a": "b"}),
+ ({"a": "b", "d": "e"}, {"a": "c"}, {"a": "c", "d": "e"}),
+ argnames="conn_opts, exec_opts, expected",
+ )
+ def test_execution_opts_per_invoke(
+ self, connection, conn_opts, exec_opts, expected
+ ):
+ opts = []
+
+ @event.listens_for(connection, "before_cursor_execute")
+ def before_cursor_execute(
+ conn, cursor, statement, parameters, context, executemany
+ ):
+ opts.append(context.execution_options)
+
+ if conn_opts:
+ connection = connection.execution_options(**conn_opts)
+
+ if exec_opts:
+ connection.execute(select([1]), execution_options=exec_opts)
+ else:
+ connection.execute(select([1]))
+
+ eq_(opts, [expected])
+
+ def test_execution_opts_invoke_illegal(self, connection):
+ assert_raises_message(
+ tsa.exc.InvalidRequestError,
+ "The 'isolation_level' execution option is not supported "
+ "at the per-statement level",
+ connection.execute,
+ select([1]),
+ execution_options={"isolation_level": "AUTOCOMMIT"},
+ )
+
+ assert_raises_message(
+ tsa.exc.InvalidRequestError,
+ "The 'schema_translate_map' execution option is not supported "
+ "at the per-statement level",
+ connection.execute,
+ select([1]),
+ execution_options={"schema_translate_map": {}},
+ )
+
+ def test_no_branching(self, connection):
+ assert_raises_message(
+ NotImplementedError,
+ "sqlalchemy.future.Connection does not support "
+ "'branching' of new connections.",
+ connection.connect,
+ )
assert self.buf.buffer
for name in [b.name for b in self.buf.buffer]:
assert name in (
- "sqlalchemy.engine.base.Engine.%s" % eng_name,
+ "sqlalchemy.engine.Engine.%s" % eng_name,
"sqlalchemy.pool.impl.%s.%s"
% (eng.pool.__class__.__name__, pool_name),
)
assert self.buf.buffer
for name in [b.name for b in self.buf.buffer]:
assert name in (
- "sqlalchemy.engine.base.Engine",
+ "sqlalchemy.engine.Engine",
"sqlalchemy.pool.impl.%s" % eng.pool.__class__.__name__,
)
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import VARCHAR
+from sqlalchemy.future import select as future_select
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import mock
from sqlalchemy.testing import ne_
from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
-
users, metadata = None, None
conn.get_isolation_level(), self._non_default_isolation_level()
)
eq_(c2.get_isolation_level(), self._non_default_isolation_level())
+
+
+class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
+ """The SQLAlchemy 2.0 Connection ensures its own transaction is rolled
+    back upon close, so the "reset agent" mechanism is no longer needed.
+    This suite runs through all the reset agent tests to ensure that
+    transaction state is maintained correctly even though the "reset
+    agent" feature is not used at all.
+
+ """
+
+ __backend__ = True
+
+ def test_begin_close(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary)
+ trans = connection.begin()
+ assert connection.connection._reset_agent is None
+ assert not trans.is_active
+ eq_(canary.mock_calls, [mock.call(connection)])
+
+ def test_begin_rollback(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary)
+ trans = connection.begin()
+ assert connection.connection._reset_agent is None
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+ assert not trans.is_active
+ eq_(canary.mock_calls, [mock.call(connection)])
+
+ def test_begin_commit(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin()
+ assert connection.connection._reset_agent is None
+ trans.commit()
+ assert connection.connection._reset_agent is None
+ assert not trans.is_active
+ eq_(canary.mock_calls, [mock.call.commit(connection)])
+
+ @testing.requires.savepoints
+ def test_begin_nested_close(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin_nested()
+ assert connection.connection._reset_agent is None
+ assert trans.is_active # it's a savepoint
+ eq_(canary.mock_calls, [mock.call.rollback(connection)])
+
+ @testing.requires.savepoints
+ def test_begin_begin_nested_close(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin()
+ trans2 = connection.begin_nested()
+ assert connection.connection._reset_agent is None
+ assert trans2.is_active # was never closed
+ assert not trans.is_active
+ eq_(canary.mock_calls, [mock.call.rollback(connection)])
+
+ @testing.requires.savepoints
+ def test_begin_begin_nested_rollback_commit(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(
+ connection, "rollback_savepoint", canary.rollback_savepoint
+ )
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin()
+ trans2 = connection.begin_nested()
+ assert connection.connection._reset_agent is None
+ trans2.rollback() # this is not a connection level event
+ assert connection.connection._reset_agent is None
+ trans.commit()
+ assert connection.connection._reset_agent is None
+ eq_(
+ canary.mock_calls,
+ [
+ mock.call.rollback_savepoint(connection, mock.ANY, trans),
+ mock.call.commit(connection),
+ ],
+ )
+
+ @testing.requires.savepoints
+ def test_begin_begin_nested_rollback_rollback(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin()
+ trans2 = connection.begin_nested()
+ assert connection.connection._reset_agent is None
+ trans2.rollback()
+ assert connection.connection._reset_agent is None
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+ eq_(canary.mock_calls, [mock.call.rollback(connection)])
+
+ @testing.requires.two_phase_transactions
+ def test_reset_via_agent_begin_twophase(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(
+ connection, "rollback_twophase", canary.rollback_twophase
+ )
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin_twophase()
+ assert connection.connection._reset_agent is None
+ assert not trans.is_active
+ eq_(
+ canary.mock_calls,
+ [mock.call.rollback_twophase(connection, mock.ANY, False)],
+ )
+
+ @testing.requires.two_phase_transactions
+ def test_reset_via_agent_begin_twophase_commit(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(connection, "commit", canary.commit)
+ event.listen(connection, "commit_twophase", canary.commit_twophase)
+ trans = connection.begin_twophase()
+ assert connection.connection._reset_agent is None
+ trans.commit()
+ assert connection.connection._reset_agent is None
+ eq_(
+ canary.mock_calls,
+ [mock.call.commit_twophase(connection, mock.ANY, False)],
+ )
+
+ @testing.requires.two_phase_transactions
+ def test_reset_via_agent_begin_twophase_rollback(self):
+ canary = mock.Mock()
+ with testing.db.connect() as connection:
+ event.listen(connection, "rollback", canary.rollback)
+ event.listen(
+ connection, "rollback_twophase", canary.rollback_twophase
+ )
+ event.listen(connection, "commit", canary.commit)
+ trans = connection.begin_twophase()
+ assert connection.connection._reset_agent is None
+ trans.rollback()
+ assert connection.connection._reset_agent is None
+ eq_(
+ canary.mock_calls,
+ [mock.call.rollback_twophase(connection, mock.ANY, False)],
+ )
+
+
+class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column("user_id", INT, primary_key=True, autoincrement=False),
+ Column("user_name", VARCHAR(20)),
+ test_needs_acid=True,
+ )
+ Table(
+ "users_autoinc",
+ metadata,
+ Column(
+ "user_id", INT, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("user_name", VARCHAR(20)),
+ test_needs_acid=True,
+ )
+
+ def test_autobegin_rollback(self):
+ users = self.tables.users
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.rollback()
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)), 0
+ )
+
+ @testing.requires.autocommit
+ def test_autocommit_isolation_level(self):
+ users = self.tables.users
+
+ with testing.db.connect().execution_options(
+ isolation_level="AUTOCOMMIT"
+ ) as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.rollback()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.autocommit
+ def test_no_autocommit_w_begin(self):
+
+ with testing.db.begin() as conn:
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection has already begun a transaction; "
+ "isolation level may not be altered until transaction end",
+ conn.execution_options,
+ isolation_level="AUTOCOMMIT",
+ )
+
+ @testing.requires.autocommit
+ def test_no_autocommit_w_autobegin(self):
+
+ with testing.db.connect() as conn:
+ conn.execute(future_select(1))
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection has already begun a transaction; "
+ "isolation level may not be altered until transaction end",
+ conn.execution_options,
+ isolation_level="AUTOCOMMIT",
+ )
+
+ conn.rollback()
+
+ conn.execution_options(isolation_level="AUTOCOMMIT")
+
+ def test_autobegin_commit(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+
+ assert not conn.in_transaction()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ assert conn.in_transaction()
+ conn.commit()
+
+ assert not conn.in_transaction()
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name 2"})
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ assert conn.in_transaction()
+ conn.rollback()
+ assert not conn.in_transaction()
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ def test_rollback_on_close(self):
+ canary = mock.Mock()
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ conn.execute(select([1]))
+ assert conn.in_transaction()
+
+ eq_(canary.mock_calls, [mock.call(conn)])
+
+ def test_no_on_close_no_transaction(self):
+ canary = mock.Mock()
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ conn.execute(select([1]))
+ conn.rollback()
+ assert not conn.in_transaction()
+
+ eq_(canary.mock_calls, [mock.call(conn)])
+
+ def test_rollback_on_exception(self):
+ canary = mock.Mock()
+ try:
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ conn.execute(select([1]))
+ assert conn.in_transaction()
+ raise Exception("some error")
+ assert False
+ except:
+ pass
+
+ eq_(canary.mock_calls, [mock.call(conn)])
+
+ def test_rollback_on_exception_if_no_trans(self):
+ canary = mock.Mock()
+ try:
+ with testing.db.connect() as conn:
+ event.listen(conn, "rollback", canary)
+ assert not conn.in_transaction()
+ raise Exception("some error")
+ assert False
+ except:
+ pass
+
+ eq_(canary.mock_calls, [])
+
+ def test_commit_no_begin(self):
+ with testing.db.connect() as conn:
+ assert not conn.in_transaction()
+ conn.commit()
+
+ @testing.requires.independent_connections
+ def test_commit_inactive(self):
+ with testing.db.connect() as conn:
+ conn.begin()
+ conn.invalidate()
+
+ assert_raises_message(
+ exc.InvalidRequestError, "Can't reconnect until", conn.commit
+ )
+
+ @testing.requires.independent_connections
+ def test_rollback_inactive(self):
+ users = self.tables.users
+ with testing.db.connect() as conn:
+
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.commit()
+
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ conn.invalidate()
+
+ assert_raises_message(
+ exc.StatementError,
+ "Can't reconnect",
+ conn.execute,
+ select([1]),
+ )
+
+ conn.rollback()
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ def test_rollback_no_begin(self):
+ with testing.db.connect() as conn:
+ assert not conn.in_transaction()
+ conn.rollback()
+
+ def test_rollback_end_ctx_manager(self):
+ with testing.db.begin() as conn:
+ assert conn.in_transaction()
+ conn.rollback()
+
+ def test_explicit_begin(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ assert not conn.in_transaction()
+ conn.begin()
+ assert conn.in_transaction()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+ conn.commit()
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ def test_no_double_begin(self):
+ with testing.db.connect() as conn:
+ conn.begin()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "a transaction is already begun for this connection",
+ conn.begin,
+ )
+
+ def test_no_autocommit(self):
+ users = self.tables.users
+
+ with testing.db.connect() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 0,
+ )
+
+ def test_begin_block(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_one(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ savepoint = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 2,
+ )
+ savepoint.rollback()
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_two(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ savepoint = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 2,
+ )
+ savepoint.commit()
+
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_three(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ conn.rollback()
+
+ assert not conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 0,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_four(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ sp2.rollback()
+
+ assert conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 2,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_five(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ sp2.commit()
+
+ assert conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 3,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_six(self):
+ users = self.tables.users
+
+ with testing.db.begin() as conn:
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ sp2.commit()
+
+ sp1.rollback()
+
+ assert conn.in_transaction()
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 1,
+ )
with testing.db.connect() as conn:
@event.listens_for(conn, "before_execute")
- def before_execute(conn, clauseelement, multiparams, params):
+ def before_execute(
+ conn, clauseelement, multiparams, params, execution_options
+ ):
assert "yes" in conn._execution_options
bq = self.bakery(lambda s: s.query(User.id).order_by(User.id))
import sqlalchemy as sa
from sqlalchemy import and_
from sqlalchemy import desc
+from sqlalchemy import event
from sqlalchemy import func
from sqlalchemy import Integer
from sqlalchemy import select
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
from sqlalchemy.testing import is_true
+from sqlalchemy.testing.mock import call
+from sqlalchemy.testing.mock import Mock
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from . import _fixtures
from .inheritance import _poly_fixtures
+from .test_events import _RemoveListeners
from .test_options import PathTest as OptionsPathTest
from .test_query import QueryTest
"addresses_email_address FROM users, addresses "
"ORDER BY users.id, users.name, addresses.email_address",
)
+
+
+class SessionEventsTest(_RemoveListeners, _fixtures.FixtureTest):
+ run_inserts = None
+
+ def test_on_bulk_update_hook(self):
+ User, users = self.classes.User, self.tables.users
+
+ sess = Session()
+ canary = Mock()
+
+ event.listen(sess, "after_bulk_update", canary.after_bulk_update)
+
+ def legacy(ses, qry, ctx, res):
+ canary.after_bulk_update_legacy(ses, qry, ctx, res)
+
+ event.listen(sess, "after_bulk_update", legacy)
+
+ mapper(User, users)
+
+ with testing.expect_deprecated(
+ 'The argument signature for the "SessionEvents.after_bulk_update" '
+ "event listener"
+ ):
+ sess.query(User).update({"name": "foo"})
+
+ eq_(canary.after_bulk_update.call_count, 1)
+
+ upd = canary.after_bulk_update.mock_calls[0][1][0]
+ eq_(upd.session, sess)
+ eq_(
+ canary.after_bulk_update_legacy.mock_calls,
+ [call(sess, upd.query, upd.context, upd.result)],
+ )
+
+ def test_on_bulk_delete_hook(self):
+ User, users = self.classes.User, self.tables.users
+
+ sess = Session()
+ canary = Mock()
+
+ event.listen(sess, "after_bulk_delete", canary.after_bulk_delete)
+
+ def legacy(ses, qry, ctx, res):
+ canary.after_bulk_delete_legacy(ses, qry, ctx, res)
+
+ event.listen(sess, "after_bulk_delete", legacy)
+
+ mapper(User, users)
+
+ with testing.expect_deprecated(
+ 'The argument signature for the "SessionEvents.after_bulk_delete" '
+ "event listener"
+ ):
+ sess.query(User).delete()
+
+ eq_(canary.after_bulk_delete.call_count, 1)
+
+ upd = canary.after_bulk_delete.mock_calls[0][1][0]
+ eq_(upd.session, sess)
+ eq_(
+ canary.after_bulk_delete_legacy.mock_calls,
+ [call(sess, upd.query, upd.context, upd.result)],
+ )
event.listen(sess, "after_begin", canary.after_begin)
event.listen(sess, "after_bulk_update", canary.after_bulk_update)
- def legacy(ses, qry, ctx, res):
- canary.after_bulk_update_legacy(ses, qry, ctx, res)
-
- event.listen(sess, "after_bulk_update", legacy)
-
mapper(User, users)
sess.query(User).update({"name": "foo"})
upd = canary.after_bulk_update.mock_calls[0][1][0]
eq_(upd.session, sess)
- eq_(
- canary.after_bulk_update_legacy.mock_calls,
- [call(sess, upd.query, upd.context, upd.result)],
- )
def test_on_bulk_delete_hook(self):
User, users = self.classes.User, self.tables.users
event.listen(sess, "after_begin", canary.after_begin)
event.listen(sess, "after_bulk_delete", canary.after_bulk_delete)
- def legacy(ses, qry, ctx, res):
- canary.after_bulk_delete_legacy(ses, qry, ctx, res)
-
- event.listen(sess, "after_bulk_delete", legacy)
-
mapper(User, users)
sess.query(User).delete()
upd = canary.after_bulk_delete.mock_calls[0][1][0]
eq_(upd.session, sess)
- eq_(
- canary.after_bulk_delete_legacy.mock_calls,
- [call(sess, upd.query, upd.context, upd.result)],
- )
def test_connection_emits_after_begin(self):
sess, canary = self._listener_fixture(bind=testing.db)
-from __future__ import with_statement
-
from sqlalchemy import Column
from sqlalchemy import event
from sqlalchemy import exc as sa_exc
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
+from sqlalchemy.future import Engine
from sqlalchemy.orm import attributes
from sqlalchemy.orm import create_session
from sqlalchemy.orm import exc as orm_exc
c.close()
@engines.close_open_connections
- def test_subtransaction_on_external(self):
+ def test_subtransaction_on_external_subtrans(self):
users, User = self.tables.users, self.classes.User
mapper(User, users)
assert len(sess.query(User).all()) == 0
sess.close()
+ @engines.close_open_connections
+ def test_subtransaction_on_external_no_begin(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+ conn = testing.db.connect()
+ trans = conn.begin()
+ sess = create_session(bind=conn, autocommit=False, autoflush=True)
+ u = User(name="ed")
+ sess.add(u)
+ sess.flush()
+ sess.commit() # commit does nothing
+ trans.rollback() # rolls back
+ assert len(sess.query(User).all()) == 0
+ sess.close()
+
@testing.requires.savepoints
@engines.close_open_connections
def test_external_nested_transaction(self):
conn.close()
raise
+ @engines.close_open_connections
+ def test_subtransaction_on_external_commit_future(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+
+ engine = Engine._future_facade(testing.db)
+
+ conn = engine.connect()
+ conn.begin()
+
+ sess = create_session(bind=conn, autocommit=False, autoflush=True)
+ u = User(name="ed")
+ sess.add(u)
+ sess.flush()
+ sess.commit() # commit does nothing
+ conn.rollback() # rolls back
+ assert len(sess.query(User).all()) == 0
+ sess.close()
+
+ @engines.close_open_connections
+ def test_subtransaction_on_external_rollback_future(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+
+ engine = Engine._future_facade(testing.db)
+
+ conn = engine.connect()
+ conn.begin()
+
+ sess = create_session(bind=conn, autocommit=False, autoflush=True)
+ u = User(name="ed")
+ sess.add(u)
+ sess.flush()
+ sess.rollback() # rolls back
+ conn.commit() # nothing to commit
+ assert len(sess.query(User).all()) == 0
+ sess.close()
+
+ @testing.requires.savepoints
+ @engines.close_open_connections
+ def test_savepoint_on_external_future(self):
+ users, User = self.tables.users, self.classes.User
+
+ mapper(User, users)
+
+ engine = Engine._future_facade(testing.db)
+
+ with engine.connect() as conn:
+ conn.begin()
+ sess = create_session(bind=conn, autocommit=False, autoflush=True)
+ u1 = User(name="u1")
+ sess.add(u1)
+ sess.flush()
+
+ sess.begin_nested()
+ u2 = User(name="u2")
+ sess.add(u2)
+ sess.flush()
+ sess.rollback()
+
+ conn.commit()
+ assert len(sess.query(User).all()) == 1
+
@testing.requires.savepoints
def test_nested_accounting_new_items_removed(self):
User, users = self.classes.User, self.tables.users
== 2
)
+ @testing.requires.savepoints
+ def test_heavy_nesting_future(self):
+ users = self.tables.users
+
+ engine = Engine._future_facade(testing.db)
+ session = create_session(engine)
+
+ session.begin()
+ session.connection().execute(users.insert().values(name="user1"))
+ session.begin(subtransactions=True)
+ session.begin_nested()
+ session.connection().execute(users.insert().values(name="user2"))
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 2
+ )
+ session.rollback()
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 1
+ )
+ session.connection().execute(users.insert().values(name="user3"))
+ session.commit()
+ assert (
+ session.connection()
+ .exec_driver_sql("select count(1) from users")
+ .scalar()
+ == 2
+ )
+
@testing.requires.savepoints
def test_dirty_state_transferred_deep_nesting(self):
User, users = self.classes.User, self.tables.users
return sess, u1
def test_execution_options_begin_transaction(self):
- bind = mock.Mock()
+ bind = mock.Mock(
+ connect=mock.Mock(
+ return_value=mock.Mock(
+ _is_future=False,
+ execution_options=mock.Mock(
+ return_value=mock.Mock(_is_future=False)
+ ),
+ )
+ )
+ )
sess = Session(bind=bind)
c1 = sess.connection(execution_options={"isolation_level": "FOO"})
+ eq_(bind.mock_calls, [mock.call.connect()])
eq_(
- bind.mock_calls,
- [
- mock.call.connect(),
- mock.call.connect().execution_options(isolation_level="FOO"),
- mock.call.connect().execution_options().begin(),
- ],
+ bind.connect().mock_calls,
+ [mock.call.execution_options(isolation_level="FOO")],
)
+ eq_(bind.connect().execution_options().mock_calls, [mock.call.begin()])
eq_(c1, bind.connect().execution_options())
def test_execution_options_ignored_mid_transaction(self):
with expect_warnings(".*during handling of a previous exception.*"):
session.begin_nested()
- savepoint = (
- session.connection()._Connection__transaction._savepoint
- )
+ savepoint = session.connection()._transaction._savepoint
# force the savepoint to disappear
session.connection().dialect.do_release_savepoint(
def mssql_freetds(self):
return only_on(["mssql+pymssql"])
+ @property
+ def legacy_engine(self):
+ return exclusions.skip_if(lambda config: config.db._is_future)
+
@property
def ad_hoc_engines(self):
return exclusions.skip_if(
result = connection.execute(t.select().order_by(t.c.col1))
today = datetime.date.today()
eq_(
- result.fetchall(),
+ list(result),
[
(
x,
"group 1",
connection.execute,
t.insert(),
- {"col4": 7, "col7": 12, "col8": 19},
- {"col4": 7, "col8": 19},
- {"col4": 7, "col7": 12, "col8": 19},
+ [
+ {"col4": 7, "col7": 12, "col8": 19},
+ {"col4": 7, "col8": 19},
+ {"col4": 7, "col7": 12, "col8": 19},
+ ],
)
def test_insert_values(self, connection):
eq_(55, row._mapping["col3"])
+class FutureDefaultRoundTripTest(
+ fixtures.FutureEngineMixin, DefaultRoundTripTest
+):
+
+ __backend__ = True
+
+
class CTEDefaultTest(fixtures.TablesTest):
__requires__ = ("ctes", "returning", "ctes_on_dml")
__backend__ = True
from sqlalchemy.engine import result as _result
from sqlalchemy.engine import Row
from sqlalchemy.ext.compiler import compiles
+from sqlalchemy.future import select as future_select
from sqlalchemy.sql import ColumnElement
from sqlalchemy.sql import expression
from sqlalchemy.sql.selectable import TextualSelect
le_(len(result.cursor_strategy._rowbuffer), max_size)
eq_(checks, assertion)
+
+
+class FutureResultTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column("user_id", INT, primary_key=True, autoincrement=False),
+ Column("user_name", VARCHAR(20)),
+ Column("x", Integer),
+ Column("y", Integer),
+ test_needs_acid=True,
+ )
+ Table(
+ "users_autoinc",
+ metadata,
+ Column(
+ "user_id", INT, primary_key=True, test_needs_autoincrement=True
+ ),
+ Column("user_name", VARCHAR(20)),
+ test_needs_acid=True,
+ )
+
+ def test_fetchall(self, connection):
+ users = self.tables.users
+ connection.execute(
+ users.insert(),
+ [
+ {"user_id": 7, "user_name": "jack", "x": 1, "y": 2},
+ {"user_id": 8, "user_name": "ed", "x": 2, "y": 3},
+ {"user_id": 9, "user_name": "fred", "x": 15, "y": 20},
+ ],
+ )
+
+ result = connection.execute(
+ future_select(users).order_by(users.c.user_id)
+ )
+ eq_(
+ result.all(),
+ [(7, "jack", 1, 2), (8, "ed", 2, 3), (9, "fred", 15, 20)],
+ )
+
+ @testing.combinations(
+ ((1, 0), [("jack", 7), ("ed", 8), ("fred", 9)]),
+ ((3,), [(2,), (3,), (20,)]),
+ ((-2, -1), [(1, 2), (2, 3), (15, 20)]),
+ argnames="columns, expected",
+ )
+ def test_columns(self, connection, columns, expected):
+ users = self.tables.users
+ connection.execute(
+ users.insert(),
+ [
+ {"user_id": 7, "user_name": "jack", "x": 1, "y": 2},
+ {"user_id": 8, "user_name": "ed", "x": 2, "y": 3},
+ {"user_id": 9, "user_name": "fred", "x": 15, "y": 20},
+ ],
+ )
+
+ result = connection.execute(
+ future_select(users).order_by(users.c.user_id)
+ )
+ eq_(result.columns(*columns).all(), expected)
+
+ def test_partitions(self, connection):
+ users = self.tables.users
+ connection.execute(
+ users.insert(),
+ [
+ {
+ "user_id": i,
+ "user_name": "user %s" % i,
+ "x": i * 5,
+ "y": i * 20,
+ }
+ for i in range(500)
+ ],
+ )
+
+ result = connection.execute(
+ future_select(users).order_by(users.c.user_id)
+ )
+
+ start = 0
+ for partition in result.columns(0, 1).partitions(20):
+ eq_(
+ partition,
+ [(i, "user %s" % i) for i in range(start, start + 20)],
+ )
+ start += 20
+
+ assert result._soft_closed
self._assert_seq_result(r.inserted_primary_key[0])
+class FutureSequenceExecTest(fixtures.FutureEngineMixin, SequenceExecTest):
+ __requires__ = ("sequences",)
+ __backend__ = True
+
+
class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
__requires__ = ("sequences",)
__backend__ = True
eq_(result.inserted_primary_key, [1])
+class FutureSequenceTest(fixtures.FutureEngineMixin, SequenceTest):
+ __requires__ = ("sequences",)
+ __backend__ = True
+
+
class TableBoundSequenceTest(fixtures.TablesTest):
__requires__ = ("sequences",)
__backend__ = True