]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Clean up .execute calls in test_insert.py
authorGord Thompson <gord@gordthompson.com>
Sat, 11 Apr 2020 16:52:15 +0000 (10:52 -0600)
committerGord Thompson <gord@gordthompson.com>
Sat, 11 Apr 2020 17:25:13 +0000 (11:25 -0600)
Change-Id: I87e144d00577b53baab203c675caeaf8ad46cb5c

lib/sqlalchemy/testing/suite/test_insert.py

index 931b0ef651d71ac90bedfe935a3d0a14029cf9a8..f449b2fe61da6f13a6a37591b32e89667bdf8347 100644 (file)
@@ -41,28 +41,28 @@ class LastrowidTest(fixtures.TablesTest):
 
     def _assert_round_trip(self, table, conn):
         row = conn.execute(table.select()).first()
-        eq_(row, (config.db.dialect.default_sequence_base, "some data"))
+        eq_(row, (conn.engine.dialect.default_sequence_base, "some data"))
 
-    def test_autoincrement_on_insert(self):
+    def test_autoincrement_on_insert(self, connection):
 
-        config.db.execute(self.tables.autoinc_pk.insert(), data="some data")
-        self._assert_round_trip(self.tables.autoinc_pk, config.db)
+        connection.execute(self.tables.autoinc_pk.insert(), data="some data")
+        self._assert_round_trip(self.tables.autoinc_pk, connection)
 
-    def test_last_inserted_id(self):
+    def test_last_inserted_id(self, connection):
 
-        r = config.db.execute(
+        r = connection.execute(
             self.tables.autoinc_pk.insert(), data="some data"
         )
-        pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+        pk = connection.scalar(select([self.tables.autoinc_pk.c.id]))
         eq_(r.inserted_primary_key, [pk])
 
     @requirements.dbapi_lastrowid
-    def test_native_lastrowid_autoinc(self):
-        r = config.db.execute(
+    def test_native_lastrowid_autoinc(self, connection):
+        r = connection.execute(
             self.tables.autoinc_pk.insert(), data="some data"
         )
         lastrowid = r.lastrowid
-        pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+        pk = connection.scalar(select([self.tables.autoinc_pk.c.id]))
         eq_(lastrowid, pk)
 
 
@@ -117,8 +117,8 @@ class InsertBehaviorTest(fixtures.TablesTest):
         assert not r.returns_rows
 
     @requirements.returning
-    def test_autoclose_on_insert_implicit_returning(self):
-        r = config.db.execute(
+    def test_autoclose_on_insert_implicit_returning(self, connection):
+        r = connection.execute(
             self.tables.autoinc_pk.insert(), data="some data"
         )
         assert r._soft_closed
@@ -127,12 +127,12 @@ class InsertBehaviorTest(fixtures.TablesTest):
         assert not r.returns_rows
 
     @requirements.empty_inserts
-    def test_empty_insert(self):
-        r = config.db.execute(self.tables.autoinc_pk.insert())
+    def test_empty_insert(self, connection):
+        r = connection.execute(self.tables.autoinc_pk.insert())
         assert r._soft_closed
         assert not r.closed
 
-        r = config.db.execute(
+        r = connection.execute(
             self.tables.autoinc_pk.select().where(
                 self.tables.autoinc_pk.c.id != None
             )
@@ -141,10 +141,10 @@ class InsertBehaviorTest(fixtures.TablesTest):
         assert len(r.fetchall())
 
     @requirements.insert_from_select
-    def test_insert_from_select_autoinc(self):
+    def test_insert_from_select_autoinc(self, connection):
         src_table = self.tables.manual_pk
         dest_table = self.tables.autoinc_pk
-        config.db.execute(
+        connection.execute(
             src_table.insert(),
             [
                 dict(id=1, data="data1"),
@@ -153,7 +153,7 @@ class InsertBehaviorTest(fixtures.TablesTest):
             ],
         )
 
-        result = config.db.execute(
+        result = connection.execute(
             dest_table.insert().from_select(
                 ("data",),
                 select([src_table.c.data]).where(
@@ -164,17 +164,17 @@ class InsertBehaviorTest(fixtures.TablesTest):
 
         eq_(result.inserted_primary_key, [None])
 
-        result = config.db.execute(
+        result = connection.execute(
             select([dest_table.c.data]).order_by(dest_table.c.data)
         )
         eq_(result.fetchall(), [("data2",), ("data3",)])
 
     @requirements.insert_from_select
-    def test_insert_from_select_autoinc_no_rows(self):
+    def test_insert_from_select_autoinc_no_rows(self, connection):
         src_table = self.tables.manual_pk
         dest_table = self.tables.autoinc_pk
 
-        result = config.db.execute(
+        result = connection.execute(
             dest_table.insert().from_select(
                 ("data",),
                 select([src_table.c.data]).where(
@@ -184,16 +184,16 @@ class InsertBehaviorTest(fixtures.TablesTest):
         )
         eq_(result.inserted_primary_key, [None])
 
-        result = config.db.execute(
+        result = connection.execute(
             select([dest_table.c.data]).order_by(dest_table.c.data)
         )
 
         eq_(result.fetchall(), [])
 
     @requirements.insert_from_select
-    def test_insert_from_select(self):
+    def test_insert_from_select(self, connection):
         table = self.tables.manual_pk
-        config.db.execute(
+        connection.execute(
             table.insert(),
             [
                 dict(id=1, data="data1"),
@@ -202,7 +202,7 @@ class InsertBehaviorTest(fixtures.TablesTest):
             ],
         )
 
-        config.db.execute(
+        connection.execute(
             table.insert(inline=True).from_select(
                 ("id", "data"),
                 select([table.c.id + 5, table.c.data]).where(
@@ -212,16 +212,16 @@ class InsertBehaviorTest(fixtures.TablesTest):
         )
 
         eq_(
-            config.db.execute(
+            connection.execute(
                 select([table.c.data]).order_by(table.c.data)
             ).fetchall(),
             [("data1",), ("data2",), ("data2",), ("data3",), ("data3",)],
         )
 
     @requirements.insert_from_select
-    def test_insert_from_select_with_defaults(self):
+    def test_insert_from_select_with_defaults(self, connection):
         table = self.tables.includes_defaults
-        config.db.execute(
+        connection.execute(
             table.insert(),
             [
                 dict(id=1, data="data1"),
@@ -230,7 +230,7 @@ class InsertBehaviorTest(fixtures.TablesTest):
             ],
         )
 
-        config.db.execute(
+        connection.execute(
             table.insert(inline=True).from_select(
                 ("id", "data"),
                 select([table.c.id + 5, table.c.data]).where(
@@ -240,7 +240,7 @@ class InsertBehaviorTest(fixtures.TablesTest):
         )
 
         eq_(
-            config.db.execute(
+            connection.execute(
                 select([table]).order_by(table.c.data, table.c.id)
             ).fetchall(),
             [
@@ -262,7 +262,7 @@ class ReturningTest(fixtures.TablesTest):
 
     def _assert_round_trip(self, table, conn):
         row = conn.execute(table.select()).first()
-        eq_(row, (config.db.dialect.default_sequence_base, "some data"))
+        eq_(row, (conn.engine.dialect.default_sequence_base, "some data"))
 
     @classmethod
     def define_tables(cls, metadata):
@@ -276,39 +276,35 @@ class ReturningTest(fixtures.TablesTest):
         )
 
     @requirements.fetch_rows_post_commit
-    def test_explicit_returning_pk_autocommit(self):
-        engine = config.db
+    def test_explicit_returning_pk_autocommit(self, connection):
         table = self.tables.autoinc_pk
-        with engine.begin() as conn:
-            r = conn.execute(
-                table.insert().returning(table.c.id), data="some data"
-            )
+        r = connection.execute(
+            table.insert().returning(table.c.id), data="some data"
+        )
         pk = r.first()[0]
-        fetched_pk = config.db.scalar(select([table.c.id]))
+        fetched_pk = connection.scalar(select([table.c.id]))
         eq_(fetched_pk, pk)
 
-    def test_explicit_returning_pk_no_autocommit(self):
-        engine = config.db
+    def test_explicit_returning_pk_no_autocommit(self, connection):
         table = self.tables.autoinc_pk
-        with engine.begin() as conn:
-            r = conn.execute(
-                table.insert().returning(table.c.id), data="some data"
-            )
-            pk = r.first()[0]
-        fetched_pk = config.db.scalar(select([table.c.id]))
+        r = connection.execute(
+            table.insert().returning(table.c.id), data="some data"
+        )
+        pk = r.first()[0]
+        fetched_pk = connection.scalar(select([table.c.id]))
         eq_(fetched_pk, pk)
 
-    def test_autoincrement_on_insert_implicit_returning(self):
+    def test_autoincrement_on_insert_implicit_returning(self, connection):
 
-        config.db.execute(self.tables.autoinc_pk.insert(), data="some data")
-        self._assert_round_trip(self.tables.autoinc_pk, config.db)
+        connection.execute(self.tables.autoinc_pk.insert(), data="some data")
+        self._assert_round_trip(self.tables.autoinc_pk, connection)
 
-    def test_last_inserted_id_implicit_returning(self):
+    def test_last_inserted_id_implicit_returning(self, connection):
 
-        r = config.db.execute(
+        r = connection.execute(
             self.tables.autoinc_pk.insert(), data="some data"
         )
-        pk = config.db.scalar(select([self.tables.autoinc_pk.c.id]))
+        pk = connection.scalar(select([self.tables.autoinc_pk.c.id]))
         eq_(r.inserted_primary_key, [pk])