]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Fix has_table() false negative for #temp tables
authorGord Thompson <gord@gordthompson.com>
Sat, 9 Oct 2021 22:44:57 +0000 (16:44 -0600)
committermike bayer <mike_mp@zzzcomputing.com>
Tue, 12 Oct 2021 20:23:11 +0000 (20:23 +0000)
Fixed issue with :meth:`.Inspector.has_table` where it would return False
if a local temp table with the same name from a different session happened
to be returned first when querying tempdb.  This is a continuation of
:ticket:`6910` which accounted for the temp table existing only in the
alternate session and not the current one.

Fixes: #7168
Change-Id: I19dbb71a63184c6d41822b0e882b7b284ac83786

doc/build/changelog/unreleased_14/7168.rst [new file with mode: 0644]
lib/sqlalchemy/dialects/mssql/base.py
test/dialect/mssql/test_reflection.py

diff --git a/doc/build/changelog/unreleased_14/7168.rst b/doc/build/changelog/unreleased_14/7168.rst
new file mode 100644 (file)
index 0000000..3d599d9
--- /dev/null
@@ -0,0 +1,9 @@
+.. change::
+    :tags: bug, mssql
+    :tickets: 7168
+
+    Fixed issue with :meth:`.Inspector.has_table` where it would return False
+    if a local temp table with the same name from a different session happened
+    to be returned first when querying tempdb.  This is a continuation of
+    :ticket:`6910` which accounted for the temp table existing only in the
+    alternate session and not the current one.
index 0dff4213999e35a039daacd48f327f5a28cee2d2..414caedc35884b40db9e87eb2d905349f1e7c62d 100644 (file)
@@ -2835,14 +2835,18 @@ class MSDialect(default.DefaultDialect):
                 )
             )
 
-            table_name = connection.execute(s.limit(1)).scalar()
-            if table_name:
-                # #6910: verify it's not a temp table from another session
-                obj_id = connection.execute(
-                    text("SELECT object_id(:table_name)"),
-                    {"table_name": "tempdb.dbo.[{}]".format(table_name)},
-                ).scalar()
-                return bool(obj_id)
+            # #7168: fetch all (not just first match) in case some other #temp
+            #        table with the same name happens to appear first
+            table_names = connection.execute(s).scalars().fetchall()
+            # #6910: verify it's not a temp table from another session
+            for table_name in table_names:
+                if bool(
+                    connection.scalar(
+                        text("SELECT object_id(:table_name)"),
+                        {"table_name": "tempdb.dbo.[{}]".format(table_name)},
+                    )
+                ):
+                    return True
             else:
                 return False
         else:
index 86eff0fe488e11c9460c1634180f653797c985b6..12bb46d913e5f631544e2334ba7e7deb7369b806 100644 (file)
@@ -309,6 +309,53 @@ class ReflectionTest(fixtures.TestBase, ComparesTables, AssertsCompiledSQL):
         found_it = testing.db.dialect.has_table(connection, table_name)
         eq_(found_it, exists)
 
+    def test_has_table_temp_not_present_but_another_session(self):
+        """test #6910"""
+
+        with testing.db.connect() as c1, testing.db.connect() as c2:
+
+            try:
+                with c1.begin():
+                    c1.exec_driver_sql(
+                        "create table #myveryveryuniquetemptablename (a int)"
+                    )
+                assert not c2.dialect.has_table(
+                    c2, "#myveryveryuniquetemptablename"
+                )
+            finally:
+                with c1.begin():
+                    c1.exec_driver_sql(
+                        "drop table #myveryveryuniquetemptablename"
+                    )
+
+    def test_has_table_temp_temp_present_both_sessions(self):
+        """test #7168, continues from #6910"""
+
+        with testing.db.connect() as c1, testing.db.connect() as c2:
+            try:
+                with c1.begin():
+                    c1.exec_driver_sql(
+                        "create table #myveryveryuniquetemptablename (a int)"
+                    )
+
+                with c2.begin():
+                    c2.exec_driver_sql(
+                        "create table #myveryveryuniquetemptablename (a int)"
+                    )
+
+                assert c2.dialect.has_table(
+                    c2, "#myveryveryuniquetemptablename"
+                )
+            finally:
+                with c1.begin():
+                    c1.exec_driver_sql(
+                        "drop table #myveryveryuniquetemptablename"
+                    )
+                with c2.begin():
+                    c2.exec_driver_sql(
+                        "drop table #myveryveryuniquetemptablename"
+                    )
+
     def test_db_qualified_items(self, metadata, connection):
         Table("foo", metadata, Column("id", Integer, primary_key=True))
         Table(