]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
simplify AttachedDB test for SQLite
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 13 Mar 2023 16:19:03 +0000 (12:19 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 13 Mar 2023 16:19:03 +0000 (12:19 -0400)
This test is failing on windows due to the new SQlite
provisioning that seemed to be failing to delete schema files
as they are still used by the main connection.

Change-Id: I51093212ebfe1053f26b279c56c8fec0408806da

test/dialect/test_sqlite.py

index 49c57c854c93004cfdc9341bfc8d9564172a60c0..0817bdbf36339a8b5ef2d45b02fb2f6e3d2fae0c 100644 (file)
@@ -35,7 +35,6 @@ from sqlalchemy import types as sqltypes
 from sqlalchemy import UniqueConstraint
 from sqlalchemy.dialects.sqlite import base as sqlite
 from sqlalchemy.dialects.sqlite import insert
-from sqlalchemy.dialects.sqlite import provision
 from sqlalchemy.dialects.sqlite import pysqlite as pysqlite_dialect
 from sqlalchemy.engine.url import make_url
 from sqlalchemy.schema import CreateTable
@@ -46,7 +45,6 @@ from sqlalchemy.testing import assert_raises_message
 from sqlalchemy.testing import AssertsCompiledSQL
 from sqlalchemy.testing import AssertsExecutionResults
 from sqlalchemy.testing import combinations
-from sqlalchemy.testing import config
 from sqlalchemy.testing import engines
 from sqlalchemy.testing import eq_
 from sqlalchemy.testing import eq_ignore_whitespace
@@ -792,17 +790,20 @@ class DialectTest(
         )
 
 
-class AttachedDBTest(fixtures.TestBase):
+class AttachedDBTest(fixtures.TablesTest):
     __only_on__ = "sqlite"
     __backend__ = True
 
-    def _fixture(self):
-        meta = self.metadata
+    run_create_tables = "each"
+
+    @classmethod
+    def define_tables(self, metadata):
+        meta = metadata
 
         Table("created", meta, Column("foo", Integer), Column("bar", String))
         Table("local_only", meta, Column("q", Integer), Column("p", Integer))
 
-        ct = Table(
+        Table(
             "created",
             meta,
             Column("id", Integer),
@@ -818,33 +819,15 @@ class AttachedDBTest(fixtures.TestBase):
             schema="test_schema",
         )
 
-        with self.conn.begin():
-            meta.create_all(self.conn)
-        return ct
-
-    def setup_test(self):
-        self.engine = engines.testing_engine(options={"use_reaper": False})
-
-        provision._sqlite_post_configure_engine(
-            self.engine.url, self.engine, config.ident
-        )
-        self.conn = self.engine.connect()
-        self.metadata = MetaData()
-
-    def teardown_test(self):
-        self.conn.rollback()
-        with self.conn.begin():
-            self.metadata.drop_all(self.conn)
-        self.conn.close()
-        self.engine.dispose()
-
-    def test_no_tables(self):
-        insp = inspect(self.conn)
+    def test_no_tables(self, connection):
+        tt = self.tables("test_schema.created", "test_schema.another_created")
+        for t in tt:
+            t.drop(connection)
+        insp = inspect(connection)
         eq_(insp.get_table_names("test_schema"), [])
 
-    def test_column_names(self):
-        self._fixture()
-        insp = inspect(self.conn)
+    def test_column_names(self, connection):
+        insp = inspect(connection)
         eq_(
             [
                 d["name"]
@@ -868,78 +851,69 @@ class AttachedDBTest(fixtures.TestBase):
 
         eq_([d["name"] for d in insp.get_columns("local_only")], ["q", "p"])
 
-    def test_table_names_present(self):
-        self._fixture()
-        insp = inspect(self.conn)
+    def test_table_names_present(self, connection):
+        insp = inspect(connection)
         eq_(
             set(insp.get_table_names("test_schema")),
             {"created", "another_created"},
         )
 
-    def test_table_names_system(self):
-        self._fixture()
-        insp = inspect(self.conn)
+    def test_table_names_system(self, connection):
+        insp = inspect(connection)
         eq_(
             set(insp.get_table_names("test_schema")),
             {"created", "another_created"},
         )
 
-    def test_schema_names(self):
-        self._fixture()
-        insp = inspect(self.conn)
+    def test_schema_names(self, connection):
+        insp = inspect(connection)
         eq_(insp.get_schema_names(), ["main", "test_schema"])
 
         # implicitly creates a "temp" schema
-        self.conn.exec_driver_sql("select * from sqlite_temp_master")
+        connection.exec_driver_sql("select * from sqlite_temp_master")
 
         # we're not including it
-        insp = inspect(self.conn)
+        insp = inspect(connection)
         eq_(insp.get_schema_names(), ["main", "test_schema"])
 
-    def test_reflect_system_table(self):
+    def test_reflect_system_table(self, connection):
         meta = MetaData()
         alt_master = Table(
             "sqlite_master",
             meta,
-            autoload_with=self.conn,
+            autoload_with=connection,
             schema="test_schema",
         )
         assert len(alt_master.c) > 0
 
-    def test_reflect_user_table(self):
-        self._fixture()
-
+    def test_reflect_user_table(self, connection):
         m2 = MetaData()
-        c2 = Table("created", m2, autoload_with=self.conn)
+        c2 = Table("created", m2, autoload_with=connection)
         eq_(len(c2.c), 2)
 
-    def test_crud(self):
-        ct = self._fixture()
+    def test_crud(self, connection):
 
-        with self.conn.begin():
-            self.conn.execute(ct.insert(), {"id": 1, "name": "foo"})
-            eq_(self.conn.execute(ct.select()).fetchall(), [(1, "foo")])
+        (ct,) = self.tables("test_schema.created")
+        connection.execute(ct.insert(), {"id": 1, "name": "foo"})
+        eq_(connection.execute(ct.select()).fetchall(), [(1, "foo")])
 
-            self.conn.execute(ct.update(), {"id": 2, "name": "bar"})
-            eq_(self.conn.execute(ct.select()).fetchall(), [(2, "bar")])
-            self.conn.execute(ct.delete())
-            eq_(self.conn.execute(ct.select()).fetchall(), [])
+        connection.execute(ct.update(), {"id": 2, "name": "bar"})
+        eq_(connection.execute(ct.select()).fetchall(), [(2, "bar")])
+        connection.execute(ct.delete())
+        eq_(connection.execute(ct.select()).fetchall(), [])
 
-    def test_col_targeting(self):
-        ct = self._fixture()
+    def test_col_targeting(self, connection):
+        (ct,) = self.tables("test_schema.created")
 
-        with self.conn.begin():
-            self.conn.execute(ct.insert(), {"id": 1, "name": "foo"})
-        row = self.conn.execute(ct.select()).first()
+        connection.execute(ct.insert(), {"id": 1, "name": "foo"})
+        row = connection.execute(ct.select()).first()
         eq_(row._mapping["id"], 1)
         eq_(row._mapping["name"], "foo")
 
-    def test_col_targeting_union(self):
-        ct = self._fixture()
-
-        with self.conn.begin():
-            self.conn.execute(ct.insert(), {"id": 1, "name": "foo"})
-        row = self.conn.execute(ct.select().union(ct.select())).first()
+    def test_col_targeting_union(self, connection):
+        (ct,) = self.tables("test_schema.created")
+        connection.execute(ct.insert(), {"id": 1, "name": "foo"})
+        row = connection.execute(ct.select().union(ct.select())).first()
         eq_(row._mapping["id"], 1)
         eq_(row._mapping["name"], "foo")