def connect(self, **kwargs):
"""Return a :class:`.Connection` object.
-
+
Depending on context, this may be ``self`` if this object
is already an instance of :class:`.Connection`, or a newly
procured :class:`.Connection` if this object is an instance
of :class:`.Engine`.
-
+
"""
def contextual_connect(self):
is already an instance of :class:`.Connection`, or a newly
procured :class:`.Connection` if this object is an instance
of :class:`.Engine`.
-
+
"""
raise NotImplementedError()
c.__dict__ = self.__dict__.copy()
return c
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type, value, traceback):
+ self.close()
+
def execution_options(self, **opt):
""" Set non-SQL options for the connection which take effect
during execution.
:param compiled_cache: Available on: Connection.
A dictionary where :class:`.Compiled` objects
will be cached when the :class:`.Connection` compiles a clause
- expression into a :class:`.Compiled` object.
+ expression into a :class:`.Compiled` object.
It is the user's responsibility to
manage the size of this dictionary, which will have keys
corresponding to the dialect, clause element, the column
some operations, including flush operations. The caching
used by the ORM internally supersedes a cache dictionary
specified here.
-
+
:param isolation_level: Available on: Connection.
Set the transaction isolation level for
the lifespan of this connection. Valid values include
database specific, including those for :ref:`sqlite_toplevel`,
:ref:`postgresql_toplevel` - see those dialect's documentation
for further info.
-
+
Note that this option necessarily affects the underying
DBAPI connection for the lifespan of the originating
:class:`.Connection`, and is not per-execution. This
def begin(self):
"""Begin a transaction and return a transaction handle.
-
+
The returned object is an instance of :class:`.Transaction`.
+ This object represents the "scope" of the transaction,
+ which completes when either the :meth:`.Transaction.rollback`
+ or :meth:`.Transaction.commit` method is called.
+
+ Nested calls to :meth:`.begin` on the same :class:`.Connection`
+ will return new :class:`.Transaction` objects that represent
+ an emulated transaction within the scope of the enclosing
+ transaction, that is::
+
+ trans = conn.begin() # outermost transaction
+ trans2 = conn.begin() # "nested"
+ trans2.commit() # does nothing
+ trans.commit() # actually commits
+
+ Calls to :meth:`.Transaction.commit` only have an effect
+ when invoked via the outermost :class:`.Transaction` object, though the
+ :meth:`.Transaction.rollback` method of any of the
+ :class:`.Transaction` objects will roll back the
+ transaction.
- Repeated calls to ``begin`` on the same Connection will create
- a lightweight, emulated nested transaction. Only the
- outermost transaction may ``commit``. Calls to ``commit`` on
- inner transactions are ignored. Any transaction in the
- hierarchy may ``rollback``, however.
+ See also:
- See also :meth:`.Connection.begin_nested`,
- :meth:`.Connection.begin_twophase`.
+ :meth:`.Connection.begin_nested` - use a SAVEPOINT
+
+ :meth:`.Connection.begin_twophase` - use a two phase /XID transaction
+
+ :meth:`.Engine.begin` - context manager available from :class:`.Engine`.
"""
def begin_twophase(self, xid=None):
"""Begin a two-phase or XA transaction and return a transaction
handle.
-
+
The returned object is an instance of :class:`.TwoPhaseTransaction`,
which in addition to the methods provided by
:class:`.Transaction`, also provides a :meth:`~.TwoPhaseTransaction.prepare`
def close(self):
"""Close this :class:`.Connection`.
-
+
This results in a release of the underlying database
resources, that is, the DBAPI connection referenced
internally. The DBAPI connection is typically restored
the DBAPI connection's ``rollback()`` method, regardless
of any :class:`.Transaction` object that may be
outstanding with regards to this :class:`.Connection`.
-
+
After :meth:`~.Connection.close` is called, the
:class:`.Connection` is permanently in a closed state,
and will allow no further operations.
* a :class:`.DDLElement` object
* a :class:`.DefaultGenerator` object
* a :class:`.Compiled` object
-
+
:param \*multiparams/\**params: represent bound parameter
values to be used in the execution. Typically,
the format is either a collection of one or more
dictionaries passed to \*multiparams::
-
+
conn.execute(
table.insert(),
{"id":1, "value":"v1"},
{"id":2, "value":"v2"}
)
-
+
...or individual key/values interpreted by \**params::
-
+
conn.execute(
table.insert(), id=1, value="v1"
)
-
+
In the case that a plain SQL string is passed, and the underlying
DBAPI accepts positional bind parameters, a collection of tuples
or individual values in \*multiparams may be passed::
"INSERT INTO table (id, value) VALUES (?, ?)",
(1, "v1"), (2, "v2")
)
-
+
conn.execute(
"INSERT INTO table (id, value) VALUES (?, ?)",
1, "v1"
)
-
+
Note above, the usage of a question mark "?" or other
symbol is contingent upon the "paramstyle" accepted by the DBAPI
in use, which may be any of "qmark", "named", "pyformat", "format",
"numeric". See `pep-249 <http://www.python.org/dev/peps/pep-0249/>`_
for details on paramstyle.
-
+
To execute a textual SQL statement which uses bound parameters in a
DBAPI-agnostic way, use the :func:`~.expression.text` construct.
-
+
"""
for c in type(object).__mro__:
if c in Connection.executors:
"""Execute the given function within a transaction boundary.
The function is passed this :class:`.Connection`
- as the first argument, followed by the given \*args and \**kwargs.
-
- This is a shortcut for explicitly invoking
- :meth:`.Connection.begin`, calling :meth:`.Transaction.commit`
- upon success or :meth:`.Transaction.rollback` upon an
- exception raise::
+ as the first argument, followed by the given \*args and \**kwargs,
+ e.g.::
def do_something(conn, x, y):
conn.execute("some statement", {'x':x, 'y':y})
-
+
conn.transaction(do_something, 5, 10)
+
+ The operations inside the function are all invoked within the
+ context of a single :class:`.Transaction`.
+ Upon success, the transaction is committed. If an
+ exception is raised, the transaction is rolled back
+ before propagating the exception.
+
+ .. note::
+
+ The :meth:`.transaction` method is superseded by
+ the usage of the Python ``with:`` statement, which can
+ be used with :meth:`.Connection.begin`::
- Note that context managers (i.e. the ``with`` statement)
- present a more modern way of accomplishing the above,
- using the :class:`.Transaction` object as a base::
+ with conn.begin():
+ conn.execute("some statement", {'x':5, 'y':10})
+
+ As well as with :meth:`.Engine.begin`::
+
+ with engine.begin() as conn:
+ conn.execute("some statement", {'x':5, 'y':10})
- with conn.begin():
- conn.execute("some statement", {'x':5, 'y':10})
-
- One advantage to the :meth:`.Connection.transaction`
- method is that the same method is also available
- on :class:`.Engine` as :meth:`.Engine.transaction` -
- this method procures a :class:`.Connection` and then
- performs the same operation, allowing equivalent
- usage with either a :class:`.Connection` or :class:`.Engine`
- without needing to know what kind of object
- it is.
+ See also:
+
+ :meth:`.Engine.begin` - engine-level transactional
+ context
+
+ :meth:`.Engine.transaction` - engine-level version of
+ :meth:`.Connection.transaction`
"""
def run_callable(self, callable_, *args, **kwargs):
"""Given a callable object or function, execute it, passing
a :class:`.Connection` as the first argument.
-
+
The given \*args and \**kwargs are passed subsequent
to the :class:`.Connection` argument.
-
+
This function, along with :meth:`.Engine.run_callable`,
allows a function to be run with a :class:`.Connection`
or :class:`.Engine` object without the need to know
which one is being dealt with.
-
+
"""
return callable_(self, *args, **kwargs)
class Transaction(object):
"""Represent a database transaction in progress.
-
+
The :class:`.Transaction` object is procured by
calling the :meth:`~.Connection.begin` method of
:class:`.Connection`::
-
+
from sqlalchemy import create_engine
engine = create_engine("postgresql://scott:tiger@localhost/test")
connection = engine.connect()
also implements a context manager interface so that
the Python ``with`` statement can be used with the
:meth:`.Connection.begin` method::
-
+
with connection.begin():
connection.execute("insert into x (a, b) values (1, 2)")
See also: :meth:`.Connection.begin`, :meth:`.Connection.begin_twophase`,
:meth:`.Connection.begin_nested`.
-
+
.. index::
single: thread safety; Transaction
"""
A new :class:`.NestedTransaction` object may be procured
using the :meth:`.Connection.begin_nested` method.
-
+
The interface is the same as that of :class:`.Transaction`.
-
+
"""
def __init__(self, connection, parent):
super(NestedTransaction, self).__init__(connection, parent)
class TwoPhaseTransaction(Transaction):
"""Represent a two-phase transaction.
-
+
A new :class:`.TwoPhaseTransaction` object may be procured
using the :meth:`.Connection.begin_twophase` method.
-
+
The interface is the same as that of :class:`.Transaction`
with the addition of the :meth:`prepare` method.
-
+
"""
def __init__(self, connection, xid):
super(TwoPhaseTransaction, self).__init__(connection, None)
def prepare(self):
"""Prepare this :class:`.TwoPhaseTransaction`.
-
+
After a PREPARE, the transaction can be committed.
-
+
"""
if not self._parent.is_active:
raise exc.InvalidRequestError("This transaction is inactive")
:func:`~sqlalchemy.create_engine` function.
See also:
-
+
:ref:`engines_toplevel`
:ref:`connections_toplevel`
-
+
"""
_execution_options = util.immutabledict()
def update_execution_options(self, **opt):
"""Update the default execution_options dictionary
of this :class:`.Engine`.
-
+
The given keys/values in \**opt are added to the
default execution options that will be used for
all connections. The initial contents of this dictionary
can be sent via the ``execution_options`` paramter
to :func:`.create_engine`.
-
+
See :meth:`.Connection.execution_options` for more
details on execution options.
if connection is None:
conn.close()
+ class _trans_ctx(object):
+ def __init__(self, conn, transaction, close_with_result):
+ self.conn = conn
+ self.transaction = transaction
+ self.close_with_result = close_with_result
+
+ def __enter__(self):
+ return self.conn
+
+ def __exit__(self, type, value, traceback):
+ if type is not None:
+ self.transaction.rollback()
+ else:
+ self.transaction.commit()
+ if not self.close_with_result:
+ self.conn.close()
+
+ def begin(self, close_with_result=False):
+ """Return a context manager delivering a :class:`.Connection`
+ with a :class:`.Transaction` established.
+
+ E.g.::
+
+ with engine.begin() as conn:
+ conn.execute("insert into table (x, y, z) values (1, 2, 3)")
+ conn.execute("my_special_procedure(5)")
+
+ Upon successful operation, the :class:`.Transaction`
+ is committed. If an error is raised, the :class:`.Transaction`
+ is rolled back.
+
+ The ``close_with_result`` flag is normally ``False``, and indicates
+ that the :class:`.Connection` will be closed when the operation
+ is complete. When set to ``True``, it indicates the :class:`.Connection`
+ is in "single use" mode, where the :class:`.ResultProxy`
+ returned by the first call to :meth:`.Connection.execute` will
+ close the :class:`.Connection` when that :class:`.ResultProxy`
+ has exhausted all result rows.
+
+ New in 0.7.6.
+
+ See also:
+
+ :meth:`.Engine.connect` - procure a :class:`.Connection` from
+ an :class:`.Engine`.
+
+ :meth:`.Connection.begin` - start a :class:`.Transaction`
+ for a particular :class:`.Connection`.
+
+ """
+ conn = self.contextual_connect(close_with_result=close_with_result)
+ trans = conn.begin()
+ return Engine._trans_ctx(conn, trans, close_with_result)
+
def transaction(self, callable_, *args, **kwargs):
"""Execute the given function within a transaction boundary.
- The function is passed a newly procured
- :class:`.Connection` as the first argument, followed by
- the given \*args and \**kwargs. The :class:`.Connection`
- is then closed (returned to the pool) when the operation
- is complete.
+ The function is passed a :class:`.Connection` newly procured
+ from :meth:`.Engine.contextual_connect` as the first argument,
+ followed by the given \*args and \**kwargs.
- This method can be used interchangeably with
- :meth:`.Connection.transaction`. See that method for
- more details on usage as well as a modern alternative
- using context managers (i.e. the ``with`` statement).
+ e.g.::
+
+ def do_something(conn, x, y):
+ conn.execute("some statement", {'x':x, 'y':y})
+
+ engine.transaction(do_something, 5, 10)
+
+ The operations inside the function are all invoked within the
+ context of a single :class:`.Transaction`.
+ Upon success, the transaction is committed. If an
+ exception is raised, the transaction is rolled back
+ before propagating the exception.
+
+ .. note::
+
+ The :meth:`.transaction` method is superseded by
+ the usage of the Python ``with:`` statement, which can
+ be used with :meth:`.Engine.begin`::
+
+ with engine.begin() as conn:
+ conn.execute("some statement", {'x':5, 'y':10})
+
+ See also:
+
+ :meth:`.Engine.begin` - engine-level transactional
+ context
+
+ :meth:`.Connection.transaction` - connection-level version of
+ :meth:`.Engine.transaction`
"""
def run_callable(self, callable_, *args, **kwargs):
"""Given a callable object or function, execute it, passing
a :class:`.Connection` as the first argument.
-
+
The given \*args and \**kwargs are passed subsequent
to the :class:`.Connection` argument.
-
+
This function, along with :meth:`.Connection.run_callable`,
allows a function to be run with a :class:`.Connection`
or :class:`.Engine` object without the need to know
which one is being dealt with.
-
+
"""
conn = self.contextual_connect()
try:
def raw_connection(self):
"""Return a "raw" DBAPI connection from the connection pool.
-
+
The returned object is a proxied version of the DBAPI
connection object used by the underlying driver in use.
The object will have all the same behavior as the real DBAPI
connection, except that its ``close()`` method will result in the
connection being returned to the pool, rather than being closed
for real.
-
+
This method provides direct DBAPI connection access for
special situations. In most situations, the :class:`.Connection`
object should be used, which is procured using the
:meth:`.Engine.connect` method.
-
+
"""
return self.pool.unique_connection()
if key._label and key._label.lower() in map:
result = map[key._label.lower()]
elif hasattr(key, 'name') and key.name.lower() in map:
- # match is only on name.
+ # match is only on name.
result = map[key.name.lower()]
# search extra hard to make sure this
# isn't a column/label name overlap.
@property
def returns_rows(self):
"""True if this :class:`.ResultProxy` returns rows.
-
+
I.e. if it is legal to call the methods
:meth:`~.ResultProxy.fetchone`,
:meth:`~.ResultProxy.fetchmany`
"""True if this :class:`.ResultProxy` is the result
of a executing an expression language compiled
:func:`.expression.insert` construct.
-
+
When True, this implies that the
:attr:`inserted_primary_key` attribute is accessible,
assuming the statement did not include
a user defined "returning" construct.
-
+
"""
return self.context.isinsert
@util.memoized_property
def inserted_primary_key(self):
"""Return the primary key for the row just inserted.
-
+
The return value is a list of scalar values
corresponding to the list of primary key columns
in the target table.
This only applies to single row :func:`.insert`
constructs which did not explicitly specify
:meth:`.Insert.returning`.
-
+
Note that primary key columns which specify a
server_default clause,
or otherwise do not qualify as "autoincrement"
assert eng.dialect.returns_unicode_strings in (True, False)
eng.dispose()
+class ConvenienceExecuteTest(fixtures.TablesTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ cls.table = Table('exec_test', metadata,
+ Column('a', Integer),
+ Column('b', Integer),
+ test_needs_acid=True
+ )
+
+ def _trans_fn(self, is_transaction=False):
+ def go(conn, x, value=None):
+ if is_transaction:
+ conn = conn.connection
+ conn.execute(self.table.insert().values(a=x, b=value))
+ return go
+
+ def _trans_rollback_fn(self, is_transaction=False):
+ def go(conn, x, value=None):
+ if is_transaction:
+ conn = conn.connection
+ conn.execute(self.table.insert().values(a=x, b=value))
+ raise Exception("breakage")
+ return go
+
+ def _assert_no_data(self):
+ eq_(
+ testing.db.scalar(self.table.count()), 0
+ )
+
+ def _assert_fn(self, x, value=None):
+ eq_(
+ testing.db.execute(self.table.select()).fetchall(),
+ [(x, value)]
+ )
+
+ def test_transaction_engine_ctx_commit(self):
+ fn = self._trans_fn()
+ ctx = testing.db.begin()
+ testing.run_as_contextmanager(ctx, fn, 5, value=8)
+ self._assert_fn(5, value=8)
+
+ def test_transaction_engine_ctx_rollback(self):
+ fn = self._trans_rollback_fn()
+ ctx = testing.db.begin()
+ assert_raises_message(
+ Exception,
+ "breakage",
+ testing.run_as_contextmanager, ctx, fn, 5, value=8
+ )
+ self._assert_no_data()
+
+ def test_transaction_tlocal_engine_ctx_commit(self):
+ fn = self._trans_fn()
+ engine = engines.testing_engine(options=dict(
+ strategy='threadlocal',
+ pool=testing.db.pool))
+ ctx = engine.begin()
+ testing.run_as_contextmanager(ctx, fn, 5, value=8)
+ self._assert_fn(5, value=8)
+
+ def test_transaction_tlocal_engine_ctx_rollback(self):
+ fn = self._trans_rollback_fn()
+ engine = engines.testing_engine(options=dict(
+ strategy='threadlocal',
+ pool=testing.db.pool))
+ ctx = engine.begin()
+ assert_raises_message(
+ Exception,
+ "breakage",
+ testing.run_as_contextmanager, ctx, fn, 5, value=8
+ )
+ self._assert_no_data()
+
+ def test_transaction_connection_ctx_commit(self):
+ fn = self._trans_fn(True)
+ conn = testing.db.connect()
+ ctx = conn.begin()
+ testing.run_as_contextmanager(ctx, fn, 5, value=8)
+ self._assert_fn(5, value=8)
+
+ def test_transaction_connection_ctx_rollback(self):
+ fn = self._trans_rollback_fn(True)
+ conn = testing.db.connect()
+ ctx = conn.begin()
+ assert_raises_message(
+ Exception,
+ "breakage",
+ testing.run_as_contextmanager, ctx, fn, 5, value=8
+ )
+ self._assert_no_data()
+
+ def test_connection_as_ctx(self):
+ fn = self._trans_fn()
+ ctx = testing.db.connect()
+ testing.run_as_contextmanager(ctx, fn, 5, value=8)
+ # autocommit is on
+ self._assert_fn(5, value=8)
+
+ def test_connect_as_ctx_noautocommit(self):
+ fn = self._trans_fn()
+ ctx = testing.db.connect().execution_options(autocommit=False)
+ testing.run_as_contextmanager(ctx, fn, 5, value=8)
+ # autocommit is off
+ self._assert_no_data()
+
+ def test_transaction_engine_fn_commit(self):
+ fn = self._trans_fn()
+ testing.db.transaction(fn, 5, value=8)
+ self._assert_fn(5, value=8)
+
+ def test_transaction_engine_fn_rollback(self):
+ fn = self._trans_rollback_fn()
+ assert_raises_message(
+ Exception,
+ "breakage",
+ testing.db.transaction, fn, 5, value=8
+ )
+ self._assert_no_data()
+
+ def test_transaction_connection_fn_commit(self):
+ fn = self._trans_fn()
+ conn = testing.db.connect()
+ conn.transaction(fn, 5, value=8)
+ self._assert_fn(5, value=8)
+
+ def test_transaction_connection_fn_rollback(self):
+ fn = self._trans_rollback_fn()
+ conn = testing.db.connect()
+ assert_raises(
+ Exception,
+ conn.transaction, fn, 5, value=8
+ )
+ self._assert_no_data()
+
class CompiledCacheTest(fixtures.TestBase):
@classmethod
def setup_class(cls):