]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- add tests for the savepoint recipe
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 9 Aug 2014 18:03:17 +0000 (14:03 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 9 Aug 2014 18:03:48 +0000 (14:03 -0400)
test/dialect/test_sqlite.py

index 4099b54d765a71b0d9273710573a96fe5ff2ea10..e77a039806b3274ddf5152ea5cb6e96985ca7363 100644 (file)
@@ -11,6 +11,7 @@ from sqlalchemy import Table, select, bindparam, Column,\
     UniqueConstraint
 from sqlalchemy.types import Integer, String, Boolean, DateTime, Date, Time
 from sqlalchemy import types as sqltypes
+from sqlalchemy import event
 from sqlalchemy.util import u, ue
 from sqlalchemy import exc, sql, schema, pool, util
 from sqlalchemy.dialects.sqlite import base as sqlite, \
@@ -949,6 +950,83 @@ class ReflectFKConstraintTest(fixtures.TestBase):
         )
 
 
+class SavepointTest(fixtures.TablesTest):
+    """test that savepoints work when we use the correct event setup"""
+    __only_on__ = 'sqlite'
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            'users', metadata,
+            Column('user_id', Integer, primary_key=True),
+            Column('user_name', String)
+        )
+
+    @classmethod
+    def setup_bind(cls):
+        engine = engines.testing_engine(options={"use_reaper": False})
+
+        @event.listens_for(engine, "connect")
+        def do_connect(dbapi_connection, connection_record):
+            # disable pysqlite's emitting of the BEGIN statement entirely.
+            # also stops it from emitting COMMIT before any DDL.
+            dbapi_connection.isolation_level = None
+
+        @event.listens_for(engine, "begin")
+        def do_begin(conn):
+            # emit our own BEGIN
+            conn.execute("BEGIN")
+
+        return engine
+
+    def test_nested_subtransaction_rollback(self):
+        users = self.tables.users
+        connection = self.bind.connect()
+        transaction = connection.begin()
+        connection.execute(users.insert(), user_id=1, user_name='user1')
+        trans2 = connection.begin_nested()
+        connection.execute(users.insert(), user_id=2, user_name='user2')
+        trans2.rollback()
+        connection.execute(users.insert(), user_id=3, user_name='user3')
+        transaction.commit()
+        eq_(connection.execute(select([users.c.user_id]).
+            order_by(users.c.user_id)).fetchall(),
+            [(1, ), (3, )])
+        connection.close()
+
+    def test_nested_subtransaction_commit(self):
+        users = self.tables.users
+        connection = self.bind.connect()
+        transaction = connection.begin()
+        connection.execute(users.insert(), user_id=1, user_name='user1')
+        trans2 = connection.begin_nested()
+        connection.execute(users.insert(), user_id=2, user_name='user2')
+        trans2.commit()
+        connection.execute(users.insert(), user_id=3, user_name='user3')
+        transaction.commit()
+        eq_(connection.execute(select([users.c.user_id]).
+            order_by(users.c.user_id)).fetchall(),
+            [(1, ), (2, ), (3, )])
+        connection.close()
+
+    def test_rollback_to_subtransaction(self):
+        users = self.tables.users
+        connection = self.bind.connect()
+        transaction = connection.begin()
+        connection.execute(users.insert(), user_id=1, user_name='user1')
+        trans2 = connection.begin_nested()
+        connection.execute(users.insert(), user_id=2, user_name='user2')
+        trans3 = connection.begin()
+        connection.execute(users.insert(), user_id=3, user_name='user3')
+        trans3.rollback()
+        connection.execute(users.insert(), user_id=4, user_name='user4')
+        transaction.commit()
+        eq_(connection.execute(select([users.c.user_id]).
+            order_by(users.c.user_id)).fetchall(),
+            [(1, ), (4, )])
+        connection.close()
+
+
 class TypeReflectionTest(fixtures.TestBase):
 
     __only_on__ = 'sqlite'