@_db_plus_owner
def has_table(self, connection, tablename, dbname, owner, schema):
- tables = ischema.tables
+ if tablename.startswith("#"): # temporary table
+ tables = ischema.mssql_temp_table_columns
+ result = connection.execute(
+ sql.select(tables.c.table_name)
+ .where(
+ tables.c.table_name.like(
+ self._temp_table_name_like_pattern(tablename)
+ )
+ )
+ .limit(1)
+ )
+ return result.scalar() is not None
+ else:
+ tables = ischema.tables
- s = sql.select(tables.c.table_name).where(
- sql.and_(
- tables.c.table_type == "BASE TABLE",
- tables.c.table_name == tablename,
+ s = sql.select(tables.c.table_name).where(
+ sql.and_(
+ tables.c.table_type == "BASE TABLE",
+ tables.c.table_name == tablename,
+ )
)
- )
- if owner:
- s = s.where(tables.c.table_schema == owner)
+ if owner:
+ s = s.where(tables.c.table_schema == owner)
- c = connection.execute(s)
+ c = connection.execute(s)
- return c.first() is not None
+ return c.first() is not None
@_db_plus_owner
def has_sequence(self, connection, sequencename, dbname, owner, schema):
view_def = rp.scalar()
return view_def
+ def _temp_table_name_like_pattern(self, tablename):
+ return tablename + (("___%") if not tablename.startswith("##") else "")
+
def _get_internal_temp_table_name(self, connection, tablename):
# it's likely that schema is always "dbo", but since we can
# get it here, let's get it.
"from tempdb.information_schema.tables "
"where table_name like :p1"
),
- {
- "p1": tablename
- + (("___%") if not tablename.startswith("##") else "")
- },
+ {"p1": self._temp_table_name_like_pattern(tablename)},
).one()
except exc.MultipleResultsFound as me:
util.raise_(
result, [(2, "bar", datetime.datetime(2020, 2, 2, 2, 2, 2))],
)
+ @testing.provide_metadata
+ @testing.combinations(
+ ("local_temp", "#tmp", True),
+ ("global_temp", "##tmp", True),
+ ("nonexistent", "#no_es_bueno", False),
+ id_="iaa",
+ argnames="table_name, exists",
+ )
+ def test_has_table_temporary(self, connection, table_name, exists):
+ if exists:
+ tt = Table(table_name, self.metadata, Column("id", Integer),)
+ tt.create(connection)
+
+ found_it = testing.db.dialect.has_table(connection, table_name)
+ eq_(found_it, exists)
+
@testing.provide_metadata
def test_db_qualified_items(self):
metadata = self.metadata