result.inserted_primary_key
* dialect.get_default_schema_name(connection) is now
public via dialect.default_schema_name.
-
+ * the "connection" argument from engine.transaction() and
+ engine.run_callable() is removed - Connection itself
+ now has those methods. All four methods accept
+ *args and **kwargs which are passed to the given callable,
+ as well as the operating connection.
+
- schema
- the `__contains__()` method of `MetaData` now accepts
strings or `Table` objects as arguments. If given
def default_schema_name(self):
return self.engine.dialect.get_default_schema_name(self)
- def run_callable(self, callable_):
- return callable_(self)
+ def transaction(self, callable_, *args, **kwargs):
+ """Execute the given function within a transaction boundary.
+
+ This is a shortcut for explicitly calling `begin()` and `commit()`
+ and optionally `rollback()` when exceptions are raised. The
+ given `*args` and `**kwargs` will be passed to the function.
+
+ See also transaction() on engine.
+
+ """
+
+ trans = self.begin()
+ try:
+ ret = self.run_callable(callable_, *args, **kwargs)
+ trans.commit()
+ return ret
+ except:
+ trans.rollback()
+ raise
+
+ def run_callable(self, callable_, *args, **kwargs):
+ return callable_(self, *args, **kwargs)
class Transaction(object):
if connection is None:
conn.close()
- def transaction(self, callable_, connection=None, *args, **kwargs):
+ def transaction(self, callable_, *args, **kwargs):
"""Execute the given function within a transaction boundary.
This is a shortcut for explicitly calling `begin()` and `commit()`
and optionally `rollback()` when exceptions are raised. The
- given `*args` and `**kwargs` will be passed to the function, as
- well as the Connection used in the transaction.
+ given `*args` and `**kwargs` will be passed to the function.
+
+ The connection used is that of contextual_connect().
+
+ See also the similar method on Connection itself.
+
"""
-
- if connection is None:
- conn = self.contextual_connect()
- else:
- conn = connection
+
+ conn = self.contextual_connect()
try:
- trans = conn.begin()
- try:
- ret = callable_(conn, *args, **kwargs)
- trans.commit()
- return ret
- except:
- trans.rollback()
- raise
+ return conn.transaction(callable_, *args, **kwargs)
finally:
- if connection is None:
- conn.close()
+ conn.close()
- def run_callable(self, callable_, connection=None, *args, **kwargs):
- if connection is None:
- conn = self.contextual_connect()
- else:
- conn = connection
+ def run_callable(self, callable_, *args, **kwargs):
+ conn = self.contextual_connect()
try:
- return callable_(conn, *args, **kwargs)
+ return conn.run_callable(callable_, *args, **kwargs)
finally:
- if connection is None:
- conn.close()
+ conn.close()
def execute(self, statement, *multiparams, **params):
connection = self.contextual_connect(close_with_result=True)
conn.close()
def has_table(self, table_name, schema=None):
- return self.run_callable(lambda c: self.dialect.has_table(c, table_name, schema=schema))
+ return self.run_callable(self.dialect.has_table, table_name, schema)
def raw_connection(self):
"""Return a DB-API connection."""
from sqlalchemy.test.testing import eq_, assert_raises, assert_raises_message
import sys, time, threading
-from sqlalchemy import create_engine, MetaData, INT, VARCHAR, Sequence, select, Integer, String, func, text
+from sqlalchemy import create_engine, MetaData, INT, VARCHAR, Sequence, \
+ select, Integer, String, func, text, exc
from sqlalchemy.test.schema import Table
from sqlalchemy.test.schema import Column
from sqlalchemy.test import TestBase, testing
result = connection.execute("select * from query_users")
assert len(result.fetchall()) == 0
connection.close()
-
+
+ def test_transaction_container(self):
+
+ def go(conn, table, data):
+ for d in data:
+ conn.execute(table.insert(), d)
+
+ testing.db.transaction(go, users, [dict(user_id=1, user_name='user1')])
+ eq_(testing.db.execute(users.select()).fetchall(), [(1, 'user1')])
+
+ assert_raises(exc.DBAPIError,
+ testing.db.transaction, go, users, [
+ {'user_id':2, 'user_name':'user2'},
+ {'user_id':1, 'user_name':'user3'},
+ ]
+ )
+ eq_(testing.db.execute(users.select()).fetchall(), [(1, 'user1')])
+
def test_nested_rollback(self):
connection = testing.db.connect()