]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Allow dropping a schema with a sequence shared by more than one table.
authorFederico Caselli <cfederico87@gmail.com>
Thu, 18 Mar 2021 22:38:09 +0000 (23:38 +0100)
committerFederico Caselli <cfederico87@gmail.com>
Fri, 19 Mar 2021 23:01:56 +0000 (00:01 +0100)
Fixes: #6071
Change-Id: I5c4483abf075622cccb73cb4c4f8c873174b4e32

doc/build/changelog/unreleased_13/6071.rst [new file with mode: 0644]
lib/sqlalchemy/sql/ddl.py
test/requirements.py
test/sql/test_sequences.py

diff --git a/doc/build/changelog/unreleased_13/6071.rst b/doc/build/changelog/unreleased_13/6071.rst
new file mode 100644 (file)
index 0000000..f9a04c9
--- /dev/null
@@ -0,0 +1,5 @@
+.. change::
+    :tags: bug, schema
+    :tickets: 6071
+
+    Allow dropping a schema with a sequence shared by more than one table.
index a166a6bdfd24330a96798e16d17e9891e37a7cc0..8d8e68d2f7f790fdbd4cb4b92cd3b13320d08239 100644 (file)
@@ -1002,7 +1002,7 @@ class SchemaDropper(DDLBase):
         seq_coll = [
             s
             for s in metadata._sequences.values()
-            if s.column is None and self._can_drop_sequence(s)
+            if self._can_drop_sequence(s)
         ]
 
         event_collection = [t for (t, fks) in collection if t is not None]
@@ -1018,14 +1018,17 @@ class SchemaDropper(DDLBase):
         for table, fkcs in collection:
             if table is not None:
                 self.traverse_single(
-                    table, drop_ok=True, _is_metadata_operation=True
+                    table,
+                    drop_ok=True,
+                    _is_metadata_operation=True,
+                    _ignore_sequences=seq_coll,
                 )
             else:
                 for fkc in fkcs:
                     self.traverse_single(fkc)
 
         for seq in seq_coll:
-            self.traverse_single(seq, drop_ok=True)
+            self.traverse_single(seq, drop_ok=seq.column is None)
 
         metadata.dispatch.after_drop(
             metadata,
@@ -1073,7 +1076,13 @@ class SchemaDropper(DDLBase):
 
         self.connection.execute(DropIndex(index))
 
-    def visit_table(self, table, drop_ok=False, _is_metadata_operation=False):
+    def visit_table(
+        self,
+        table,
+        drop_ok=False,
+        _is_metadata_operation=False,
+        _ignore_sequences=[],
+    ):
         if not drop_ok and not self._can_drop_table(table):
             return
 
@@ -1093,7 +1102,10 @@ class SchemaDropper(DDLBase):
         # latest/core/defaults.html#associating-a-sequence-as-the-server-side-
         # default), so have to be dropped after the table is dropped.
         for column in table.columns:
-            if column.default is not None:
+            if (
+                column.default is not None
+                and column.default not in _ignore_sequences
+            ):
                 self.traverse_single(column.default)
 
         table.dispatch.after_drop(
index 33f5e7fd0612436daaf4ef21e11bf1d5557ac3c8..9ca753dfdc6b8c1015f3531985563237721a7b13 100644 (file)
@@ -459,8 +459,9 @@ class DefaultRequirements(SuiteRequirements):
     def sequences_as_server_defaults(self):
         """Target database must support SEQUENCE as a server side default."""
 
-        return only_on(
-            "postgresql", "doesn't support sequences as a server side default."
+        return self.sequences + only_on(
+            ["postgresql", "mariadb", "oracle >= 18"],
+            "doesn't support sequences as a server side default.",
         )
 
     @property
index 5cfc2663f603fdb9852fdecf09492fb1f47e2e31..dd8491d310cec4d6eb0f255db3709f6a7385392e 100644 (file)
@@ -13,6 +13,8 @@ from sqlalchemy.testing import assert_raises_message
 from sqlalchemy.testing import engines
 from sqlalchemy.testing import eq_
 from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import is_false
+from sqlalchemy.testing import is_true
 from sqlalchemy.testing.assertsql import AllOf
 from sqlalchemy.testing.assertsql import CompiledSQL
 from sqlalchemy.testing.assertsql import EachOf
@@ -348,6 +350,43 @@ class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
         result = connection.execute(t.insert())
         eq_(result.inserted_primary_key, (1,))
 
+    @testing.requires.sequences_as_server_defaults
+    @testing.provide_metadata
+    def test_shared_sequence(self, connection):
+        # test case for #6071
+        common_seq = Sequence("common_sequence", metadata=self.metadata)
+        Table(
+            "table_1",
+            self.metadata,
+            Column(
+                "id",
+                Integer,
+                common_seq,
+                server_default=common_seq.next_value(),
+                primary_key=True,
+            ),
+        )
+        Table(
+            "table_2",
+            self.metadata,
+            Column(
+                "id",
+                Integer,
+                common_seq,
+                server_default=common_seq.next_value(),
+                primary_key=True,
+            ),
+        )
+
+        self.metadata.create_all(connection)
+        is_true(self._has_sequence(connection, "common_sequence"))
+        is_true(testing.db.dialect.has_table(connection, "table_1"))
+        is_true(testing.db.dialect.has_table(connection, "table_2"))
+        self.metadata.drop_all(connection)
+        is_false(self._has_sequence(connection, "common_sequence"))
+        is_false(testing.db.dialect.has_table(connection, "table_1"))
+        is_false(testing.db.dialect.has_table(connection, "table_2"))
+
 
 class FutureSequenceTest(fixtures.FutureEngineMixin, SequenceTest):
     __requires__ = ("sequences",)
@@ -522,19 +561,29 @@ class SequenceAsServerDefaultTest(
         with self.sql_execution_asserter(testing.db) as asserter:
             self.tables_test_metadata.drop_all(testing.db, checkfirst=False)
 
+        asserter.assert_(
+            AllOf(
+                CompiledSQL("DROP TABLE t_seq_test_2", {}),
+                CompiledSQL("DROP TABLE t_seq_test", {}),
+            ),
+            AllOf(
+                # dropped as part of metadata level
+                CompiledSQL("DROP SEQUENCE t_seq", {}),
+                CompiledSQL("DROP SEQUENCE t_seq_2", {}),
+            ),
+        )
+
+    def test_drop_ordering_single_table(self):
+        with self.sql_execution_asserter(testing.db) as asserter:
+            for table in self.tables_test_metadata.tables.values():
+                table.drop(testing.db, checkfirst=False)
+
         asserter.assert_(
             AllOf(
                 CompiledSQL("DROP TABLE t_seq_test_2", {}),
                 EachOf(
                     CompiledSQL("DROP TABLE t_seq_test", {}),
-                    CompiledSQL(
-                        "DROP SEQUENCE t_seq",  # dropped as part of t_seq_test
-                        {},
-                    ),
+                    CompiledSQL("DROP SEQUENCE t_seq", {}),
                 ),
-            ),
-            CompiledSQL(
-                "DROP SEQUENCE t_seq_2",  # dropped as part of metadata level
-                {},
-            ),
+            )
         )