From 7f5e9e733c7b77e55af8836d1ff182c861ab5d16 Mon Sep 17 00:00:00 2001 From: Greg Jarzab Date: Tue, 23 Sep 2025 09:08:46 -0400 Subject: [PATCH] Support for Create Table As Added support for the SQL ``CREATE TABLE ... AS SELECT`` construct via the new :class:`_sql.CreateTableAs` DDL construct and the :meth:`_sql.SelectBase.into` method. The new construct allows creating a table directly from the results of a SELECT statement, with support for options such as ``TEMPORARY`` and ``IF NOT EXISTS`` where supported by the target database. Pull request courtesy Greg Jarzab. Fixes: #4950 Closes: #12860 Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/12860 Pull-request-sha: 7de8a109b892fd91222ce2f59c388ca275021ddb Change-Id: Id9c8e4a3c520ffc61de1e48e331b6220e3d52fc9 --- doc/build/changelog/migration_21.rst | 47 +++ doc/build/changelog/unreleased_21/4950.rst | 16 + doc/build/core/ddl.rst | 2 + doc/build/tutorial/data_select.rst | 104 +++++ lib/sqlalchemy/__init__.py | 1 + lib/sqlalchemy/dialects/mssql/base.py | 34 ++ lib/sqlalchemy/schema.py | 1 + lib/sqlalchemy/sql/compiler.py | 19 + lib/sqlalchemy/sql/ddl.py | 151 +++++++- lib/sqlalchemy/sql/selectable.py | 80 ++++ lib/sqlalchemy/testing/requirements.py | 12 + lib/sqlalchemy/testing/suite/__init__.py | 1 + .../testing/suite/test_create_table_as.py | 329 ++++++++++++++++ lib/sqlalchemy/util/preloaded.py | 2 + pyproject.toml | 2 +- test/requirements.py | 14 + test/sql/test_create_table_as.py | 357 ++++++++++++++++++ .../typing/plain_files/sql/create_table_as.py | 114 ++++++ 18 files changed, 1284 insertions(+), 2 deletions(-) create mode 100644 doc/build/changelog/unreleased_21/4950.rst create mode 100644 lib/sqlalchemy/testing/suite/test_create_table_as.py create mode 100644 test/sql/test_create_table_as.py create mode 100644 test/typing/plain_files/sql/create_table_as.py diff --git a/doc/build/changelog/migration_21.rst b/doc/build/changelog/migration_21.rst index f98cb4c3aa..cbd26f476b 100644 --- a/doc/build/changelog/migration_21.rst +++ b/doc/build/changelog/migration_21.rst @@ -618,6 +618,53 @@ not the database portion:: :ticket:`11234` +.. _change_4950: + +CREATE TABLE AS SELECT Support +------------------------------- + +SQLAlchemy 2.1 adds support for the SQL ``CREATE TABLE ... AS SELECT`` +construct as well as the ``SELECT ... INTO`` variant for selected backends, +which creates a new table directly from the results of a SELECT +statement. This is available via the new :class:`_schema.CreateTableAs` DDL +construct and the :meth:`_sql.SelectBase.into` convenience method. + +The :class:`_schema.CreateTableAs` construct can be used to create a new table +from any SELECT statement:: + + >>> from sqlalchemy import select, CreateTableAs + >>> select_stmt = select(users.c.id, users.c.name).where(users.c.status == "active") + >>> create_table_as = CreateTableAs(select_stmt, "active_users") + +The above construct renders as a ``CREATE TABLE AS`` statement:: + + >>> print(create_table_as) + CREATE TABLE active_users AS SELECT users.id, users.name + FROM users + WHERE users.status = 'active' + +The construct can be executed to emit the above DDL, and the table may then +be accessed using the :attr:`.CreateTableAs.table` attribute which +supplies a :class:`.Table`:: + + >>> print(select(create_table_as.table)) + SELECT users.id, users.name + FROM active_users + +See :ref:`tutorial_create_table_as` for a tutorial. + +.. seealso:: + + :ref:`tutorial_create_table_as` - in the :ref:`unified_tutorial` + + :class:`_schema.CreateTableAs` - DDL construct for CREATE TABLE AS + + :meth:`_sql.SelectBase.into` - convenience method on SELECT and UNION + statements + +:ticket:`4950` + + .. _change_11250: Potential breaking change to odbc_connect= handling for mssql+pyodbc diff --git a/doc/build/changelog/unreleased_21/4950.rst b/doc/build/changelog/unreleased_21/4950.rst new file mode 100644 index 0000000000..3e0c98601c --- /dev/null +++ b/doc/build/changelog/unreleased_21/4950.rst @@ -0,0 +1,16 @@ +.. change:: + :tags: feature, sql + :tickets: 4950 + + Added support for the SQL ``CREATE TABLE ... AS SELECT`` construct via the + new :class:`_schema.CreateTableAs` DDL construct and the + :meth:`_sql.Select.into` method. The new construct allows creating a + table directly from the results of a SELECT statement, with support for + options such as ``TEMPORARY`` and ``IF NOT EXISTS`` where supported by the + target database. Pull request courtesy Greg Jarzab. + + .. seealso:: + + :ref:`change_4950` + + diff --git a/doc/build/core/ddl.rst b/doc/build/core/ddl.rst index b0fcd5fd65..8b21d5e614 100644 --- a/doc/build/core/ddl.rst +++ b/doc/build/core/ddl.rst @@ -329,6 +329,8 @@ DDL Expression Constructs API .. autoclass:: CreateTable :members: +.. autoclass:: CreateTableAs + :members: .. autoclass:: DropTable :members: diff --git a/doc/build/tutorial/data_select.rst b/doc/build/tutorial/data_select.rst index 51d82279aa..706bb78800 100644 --- a/doc/build/tutorial/data_select.rst +++ b/doc/build/tutorial/data_select.rst @@ -1818,6 +1818,110 @@ where it is usable for custom SQL functions:: :ref:`postgresql_column_valued` - in the :ref:`postgresql_toplevel` documentation. +.. _tutorial_create_table_as: + +Using CREATE TABLE AS / SELECT INTO with :func:`.select` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. versionadded:: 2.1 + +The :class:`.CreateTableAs` construct, along with a complementing method +:meth:`.Select.into`, provides support for the "CREATE TABLE AS" / "SELECT INTO" +DDL constructs, which allows the creation of new tables in the database that +represent the contents of an arbitrary SELECT statement. This SQL syntax +is supported by all included SQLAlchemy backends. + +We can produce a :class:`_schema.CreateTableAs` expression from a +:func:`_sql.select` created against any combinations of tables:: + + >>> from sqlalchemy import select, CreateTableAs + >>> select_stmt = select(User.id, User.name).where(User.name.like("sponge%")) + >>> create_table_as = CreateTableAs(select_stmt, "spongebob_users") + +We can also use the equivalent :meth:`.Select.into` method:: + + >>> create_table_as = select_stmt.into("spongebob_users") + +Stringifying this construct on most backends illustrates the ``CREATE TABLE AS`` syntax:: + + >>> print(create_table_as) + CREATE TABLE spongebob_users AS SELECT user_account.id, user_account.name + FROM user_account + WHERE user_account.name LIKE 'sponge%' + +On Microsoft SQL Server, we observe that SELECT INTO is generated instead:: + + >>> from sqlalchemy.dialects import mssql + >>> print(create_table_as.compile(dialect=mssql.dialect())) + SELECT user_account.id, user_account.name INTO spongebob_users + FROM user_account + WHERE user_account.name LIKE 'sponge%' + +We can invoke the :class:`.CreateTableAs` construct directly on a database +connection to create the new table in the database:: + + >>> session.execute(create_table_as) + {execsql}BEGIN (implicit) + CREATE TABLE spongebob_users AS SELECT user_account.id, user_account.name + FROM user_account + WHERE user_account.name LIKE 'sponge%' + [...] () + {stop} + +The database now has a new table ``spongebob_users`` which contains all the columns and rows +that would be returned by the SELECT statement. This is a real table +in the database that will remain until we drop it (unless it's a temporary +table that automatically drops, or if transactional DDL is rolled back). + +To use the new table with SQLAlchemy Core expressions, we can access a +new :class:`.Table` via the :attr:`.CreateTableAs.table` attribute; this +:class:`.Table` is by default associated with a newly created +:class:`.MetaData` object local to the :class:`.CreateTableAs` object: + +.. sourcecode:: pycon+sql + + >>> select_stmt = select(create_table_as.table) + >>> result = session.execute(select_stmt) + {execsql}SELECT spongebob_users.id, spongebob_users.name + FROM spongebob_users + [...] () + {stop}>>> result.all() + {execsql}[(1, 'spongebob')] + +To emit DROP for this table, we use :meth:`.Table.drop`:: + + >>> create_table_as.table.drop(session.connection()) + {execsql}DROP TABLE spongebob_users + [...] () + +Alternatively, we can associate the :class:`.CreateTableAs` with an existing +:class:`.MetaData` using the :paramref:`.CreateTableAs.metadata` parameter, in +which case operations like :meth:`.MetaData.drop_all` will include a DROP for +this table. + +.. note:: The :class:`.CreateTableAs` construct is not currently included in the + sequence initiated by :meth:`.MetaData.create_all`, meaning that this + operation would emit a simple ``CREATE TABLE`` for the table, rather than + using ``CREATE TABLE AS`` or ``SELECT INTO``, which would omit the + ``SELECT`` statement; so when associating + :class:`.CreateTableAs` with an existing :class:`.MetaData`, be sure to + ensure that :meth:`.MetaData.create_all` is not called on that :class:`.MetaData` + unless the :class:`.CreateTableAs` construct were already invoked for that + database, assuring the table already exists. + +:class:`.CreateTableAs` and :meth:`.Select.into` both support optional flags +such as ``TEMPORARY`` and ``IF NOT EXISTS`` where supported by the target +database:: + + >>> # Create a temporary table with IF NOT EXISTS + >>> stmt = select(User.id, User.name).into( + ... "temp_snapshot", temporary=True, if_not_exists=True + ... ) + >>> print(stmt) + CREATE TEMPORARY TABLE IF NOT EXISTS temp_snapshot AS SELECT user_account.id, user_account.name + FROM user_account + + .. _tutorial_casts: Data Casts and Type Coercion diff --git a/lib/sqlalchemy/__init__.py b/lib/sqlalchemy/__init__.py index 7a70450e05..5cdeb2074f 100644 --- a/lib/sqlalchemy/__init__.py +++ b/lib/sqlalchemy/__init__.py @@ -62,6 +62,7 @@ from .schema import Column as Column from .schema import ColumnDefault as ColumnDefault from .schema import Computed as Computed from .schema import Constraint as Constraint +from .schema import CreateTableAs as CreateTableAs from .schema import DDL as DDL from .schema import DDLElement as DDLElement from .schema import DefaultClause as DefaultClause diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py index 001c076833..abc129a91d 100644 --- a/lib/sqlalchemy/dialects/mssql/base.py +++ b/lib/sqlalchemy/dialects/mssql/base.py @@ -2694,6 +2694,40 @@ class MSDDLCompiler(compiler.DDLCompiler): self.preparer.format_table(drop.element.table), ) + def visit_create_table_as(self, element, **kw): + prep = self.preparer + + # SQL Server doesn't support CREATE TABLE AS, use SELECT INTO instead + # Format: SELECT columns INTO new_table FROM source WHERE ... + + qualified = prep.format_table(element.table) + + # Get the inner SELECT SQL + inner_kw = dict(kw) + inner_kw["literal_binds"] = True + select_sql = self.sql_compiler.process(element.selectable, **inner_kw) + + # Inject INTO clause before FROM keyword + # Find FROM position (case-insensitive) + select_upper = select_sql.upper() + from_idx = select_upper.find(" FROM ") + if from_idx == -1: + from_idx = select_upper.find("\nFROM ") + + if from_idx == -1: + raise exc.CompileError( + "Could not find FROM keyword in selectable for CREATE TABLE AS" + ) + + # Insert INTO clause before FROM + result = ( + select_sql[:from_idx] + + f"INTO {qualified} " + + select_sql[from_idx:] + ) + + return result + def visit_primary_key_constraint(self, constraint, **kw): if len(constraint) == 0: return "" diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py index e0cd659bec..0fba390e43 100644 --- a/lib/sqlalchemy/schema.py +++ b/lib/sqlalchemy/schema.py @@ -20,6 +20,7 @@ from .sql.ddl import CreateIndex as CreateIndex from .sql.ddl import CreateSchema as CreateSchema from .sql.ddl import CreateSequence as CreateSequence from .sql.ddl import CreateTable as CreateTable +from .sql.ddl import CreateTableAs as CreateTableAs from .sql.ddl import DDL as DDL from .sql.ddl import DDLElement as DDLElement from .sql.ddl import DropColumnComment as DropColumnComment diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index e95eaa5918..f5bf523513 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -94,6 +94,7 @@ if typing.TYPE_CHECKING: from .base import CompileState from .base import Executable from .cache_key import CacheKey + from .ddl import CreateTableAs from .ddl import ExecutableDDLElement from .dml import Delete from .dml import Insert @@ -6937,6 +6938,24 @@ class DDLCompiler(Compiled): text += "\n)%s\n\n" % self.post_create_table(table) return text + def visit_create_table_as(self, element: CreateTableAs, **kw: Any) -> str: + prep = self.preparer + + inner_kw = dict(kw) + inner_kw["literal_binds"] = True + select_sql = self.sql_compiler.process(element.selectable, **inner_kw) + + parts = [ + "CREATE", + "TEMPORARY" if element.temporary else None, + "TABLE", + "IF NOT EXISTS" if element.if_not_exists else None, + prep.format_table(element.table), + "AS", + select_sql, + ] + return " ".join(p for p in parts if p) + def visit_create_column(self, create, first_pk=False, **kw): column = create.element diff --git a/lib/sqlalchemy/sql/ddl.py b/lib/sqlalchemy/sql/ddl.py index 0c31e6d3ce..58a8c3c8e8 100644 --- a/lib/sqlalchemy/sql/ddl.py +++ b/lib/sqlalchemy/sql/ddl.py @@ -29,11 +29,14 @@ from typing import Tuple from typing import TypeVar from typing import Union +from . import coercions from . import roles from .base import _generative from .base import Executable from .base import SchemaVisitor from .elements import ClauseElement +from .selectable import SelectBase +from .selectable import TableClause from .. import exc from .. import util from ..util import topological @@ -47,10 +50,10 @@ if typing.TYPE_CHECKING: from .schema import Constraint from .schema import ForeignKeyConstraint from .schema import Index + from .schema import MetaData from .schema import SchemaItem from .schema import Sequence as Sequence # noqa: F401 from .schema import Table - from .selectable import TableClause from ..engine.base import Connection from ..engine.interfaces import CacheStats from ..engine.interfaces import CompiledCacheType @@ -546,6 +549,152 @@ class CreateTable(_CreateBase["Table"]): self.include_foreign_key_constraints = include_foreign_key_constraints +class CreateTableAs(ExecutableDDLElement): + """Represent a CREATE TABLE ... AS statement. + + This creates a new table directly from the output of a SELECT. + The set of columns in the new table is derived from the + SELECT list; constraints, indexes, and defaults are not copied. + + E.g.:: + + from sqlalchemy import select + from sqlalchemy.sql.ddl import CreateTableAs + + # Create a new table from a SELECT + stmt = CreateTableAs( + select(users.c.id, users.c.name).where(users.c.status == "active"), + "active_users", + ) + + with engine.begin() as conn: + conn.execute(stmt) + + # With optional flags + stmt = CreateTableAs( + select(users.c.id, users.c.name), + "temp_snapshot", + temporary=True, + if_not_exists=True, + ) + + The generated table object can be accessed via the :attr:`.table` property, + which will be an instance of :class:`.Table`; by default this is associated + with a local :class:`.MetaData` construct:: + + stmt = CreateTableAs(select(users.c.id, users.c.name), "active_users") + active_users_table = stmt.table + + To associate the :class:`.Table` with an existing :class:`.MetaData`, + use the :paramref:`_schema.CreateTableAs.metadata` parameter:: + + stmt = CreateTableAs( + select(users.c.id, users.c.name), + "active_users", + metadata=some_metadata, + ) + active_users_table = stmt.table + + .. versionadded:: 2.1 + + :param selectable: :class:`_sql.Select` + The SELECT statement providing the columns and rows. + + :param table_name: str + Table name as a string. Must be unqualified; use the ``schema`` + argument for qualification. + + :param metadata: :class:`_schema.MetaData`, optional + If provided, the :class:`_schema.Table` object available via the + :attr:`.table` attribute will be associated with this + :class:`.MetaData`. Otherwise, a new, empty :class:`.MetaData` + is created. + + :param schema: str, optional schema or owner name. + + :param temporary: bool, default False. + If True, render ``TEMPORARY`` + + :param if_not_exists: bool, default False. + If True, render ``IF NOT EXISTS`` + + .. seealso:: + + :ref:`tutorial_create_table_as` - in the :ref:`unified_tutorial` + + :meth:`_sql.SelectBase.into` - convenience method to create a + :class:`_schema.CreateTableAs` from a SELECT statement + + + + """ + + __visit_name__ = "create_table_as" + inherit_cache = False + + table: Table + """:class:`.Table` object representing the table that this + :class:`.CreateTableAs` would generate when executed.""" + + def __init__( + self, + selectable: SelectBase, + table_name: str, + *, + metadata: Optional["MetaData"] = None, + schema: Optional[str] = None, + temporary: bool = False, + if_not_exists: bool = False, + ): + # Coerce selectable to a Select statement + selectable = coercions.expect(roles.DMLSelectRole, selectable) + + if isinstance(table_name, str): + if not table_name: + raise exc.ArgumentError("Table name must be non-empty") + + if "." in table_name: + raise exc.ArgumentError( + "Target string must be unqualified (use schema=)." + ) + + self.schema = schema + self.selectable = selectable + self.temporary = bool(temporary) + self.if_not_exists = bool(if_not_exists) + self.metadata = metadata + self.table_name = table_name + self._gen_table() + + @util.preload_module("sqlalchemy.sql.schema") + def _gen_table(self): + MetaData = util.preloaded.sql_schema.MetaData + Column = util.preloaded.sql_schema.Column + Table = util.preloaded.sql_schema.Table + MetaData = util.preloaded.sql_schema.MetaData + + column_name_type_pairs = ( + (name, col_element.type) + for _, name, _, col_element, _ in ( + self.selectable._generate_columns_plus_names( + anon_for_dupe_key=False + ) + ) + ) + + if self.metadata is None: + self.metadata = metadata = MetaData() + else: + metadata = self.metadata + + self.table = Table( + self.table_name, + metadata, + *(Column(name, typ) for name, typ in column_name_type_pairs), + schema=self.schema, + ) + + class _DropView(_DropBase["Table"]): """Semi-public 'DROP VIEW' construct. diff --git a/lib/sqlalchemy/sql/selectable.py b/lib/sqlalchemy/sql/selectable.py index b9b0104615..6e62d30bc4 100644 --- a/lib/sqlalchemy/sql/selectable.py +++ b/lib/sqlalchemy/sql/selectable.py @@ -138,6 +138,7 @@ if TYPE_CHECKING: from .base import ReadOnlyColumnCollection from .cache_key import _CacheKeyTraversalType from .compiler import SQLCompiler + from .ddl import CreateTableAs from .dml import Delete from .dml import Update from .elements import BinaryExpression @@ -148,6 +149,7 @@ if TYPE_CHECKING: from .functions import Function from .schema import ForeignKey from .schema import ForeignKeyConstraint + from .schema import MetaData from .sqltypes import TableValueType from .type_api import TypeEngine from .visitors import _CloneCallableType @@ -3796,6 +3798,84 @@ class SelectBase( self._ensure_disambiguated_names(), name=name ) + @util.preload_module("sqlalchemy.sql.ddl") + def into( + self, + target: str, + *, + metadata: Optional["MetaData"] = None, + schema: Optional[str] = None, + temporary: bool = False, + if_not_exists: bool = False, + ) -> CreateTableAs: + """Create a :class:`_schema.CreateTableAs` construct from this SELECT. + + This method provides a convenient way to create a ``CREATE TABLE ... + AS`` statement from a SELECT, as well as compound SELECTs like UNION. + The new table will be created with columns matching the SELECT list. + + Supported on all included backends, the construct emits + ``CREATE TABLE...AS`` for all backends except SQL Server, which instead + emits a ``SELECT..INTO`` statement. + + e.g.:: + + from sqlalchemy import select + + # Create a new table from a SELECT + stmt = ( + select(users.c.id, users.c.name) + .where(users.c.status == "active") + .into("active_users") + ) + + with engine.begin() as conn: + conn.execute(stmt) + + # With optional flags + stmt = ( + select(users.c.id) + .where(users.c.status == "inactive") + .into("inactive_users", schema="analytics", if_not_exists=True) + ) + + .. versionadded:: 2.1 + + :param target: Name of the table to create as a string. Must be + unqualified; use the ``schema`` parameter for qualification. + + :param metadata: :class:`_schema.MetaData`, optional + If provided, the :class:`_schema.Table` object available via the + :attr:`.table` attribute will be associated with this + :class:`.MetaData`. Otherwise, a new, empty :class:`.MetaData` + is created. + + :param schema: Optional schema name for the new table. + + :param temporary: If True, create a temporary table where supported + + :param if_not_exists: If True, add IF NOT EXISTS clause where supported + + :return: A :class:`_schema.CreateTableAs` construct. + + .. seealso:: + + :ref:`tutorial_create_table_as` - in the :ref:`unified_tutorial` + + :class:`_schema.CreateTableAs` + + """ + sql_ddl = util.preloaded.sql_ddl + + return sql_ddl.CreateTableAs( + self, + target, + metadata=metadata, + schema=schema, + temporary=temporary, + if_not_exists=if_not_exists, + ) + def _ensure_disambiguated_names(self) -> Self: """Ensure that the names generated by this selectbase will be disambiguated in some way, if possible. diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index d22e37a2a5..c7a75c40dc 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -47,6 +47,18 @@ class SuiteRequirements(Requirements): return exclusions.open() + @property + def create_table_as(self): + """target platform supports CREATE TABLE AS SELECT.""" + + return exclusions.closed() + + @property + def create_temp_table_as(self): + """target platform supports CREATE TEMPORARY TABLE AS SELECT.""" + + return exclusions.closed() + @property def table_ddl_if_exists(self): """target platform supports IF NOT EXISTS / IF EXISTS for tables.""" diff --git a/lib/sqlalchemy/testing/suite/__init__.py b/lib/sqlalchemy/testing/suite/__init__.py index 141be112f2..8d79b36d0a 100644 --- a/lib/sqlalchemy/testing/suite/__init__.py +++ b/lib/sqlalchemy/testing/suite/__init__.py @@ -4,6 +4,7 @@ # # This module is part of SQLAlchemy and is released under # the MIT License: https://www.opensource.org/licenses/mit-license.php +from .test_create_table_as import * # noqa from .test_cte import * # noqa from .test_ddl import * # noqa from .test_dialect import * # noqa diff --git a/lib/sqlalchemy/testing/suite/test_create_table_as.py b/lib/sqlalchemy/testing/suite/test_create_table_as.py new file mode 100644 index 0000000000..5e48dd5844 --- /dev/null +++ b/lib/sqlalchemy/testing/suite/test_create_table_as.py @@ -0,0 +1,329 @@ +# testing/suite/test_create_table_as.py +# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +from .. import fixtures +from ..assertions import eq_ +from ..provision import get_temp_table_name +from ... import bindparam +from ... import Column +from ... import func +from ... import inspect +from ... import Integer +from ... import literal +from ... import MetaData +from ... import select +from ... import String +from ... import testing +from ...schema import DropTable +from ...schema import Table +from ...sql.ddl import CreateTableAs +from ...testing import config + + +class CreateTableAsTest(fixtures.TablesTest): + __backend__ = True + __requires__ = ("create_table_as",) + + @classmethod + def temp_table_name(cls): + return get_temp_table_name( + config, config.db, f"user_tmp_{config.ident}" + ) + + @classmethod + def define_tables(cls, metadata): + Table( + "source_table", + metadata, + Column("id", Integer, primary_key=True, autoincrement=False), + Column("name", String(50)), + Column("value", Integer), + ) + Table("a", metadata, Column("id", Integer)) + Table("b", metadata, Column("id", Integer)) + + @classmethod + def insert_data(cls, connection): + table = cls.tables.source_table + connection.execute( + table.insert(), + [ + {"id": 1, "name": "alice", "value": 100}, + {"id": 2, "name": "bob", "value": 200}, + {"id": 3, "name": "charlie", "value": 300}, + ], + ) + + a = cls.tables.a + b = cls.tables.b + + connection.execute(a.insert(), [{"id": v} for v in [1, 3]]) + connection.execute(b.insert(), [{"id": v} for v in [2, 4]]) + + @testing.fixture(scope="function", autouse=True) + def drop_dest_table(self, connection): + for schema in None, config.test_schema: + for name in ("dest_table", self.temp_table_name()): + if inspect(connection).has_table(name, schema=schema): + connection.execute( + DropTable(Table(name, MetaData(), schema=schema)) + ) + connection.commit() + + @testing.combinations( + ("plain", False, False), + ("use_temp", False, True, testing.requires.create_temp_table_as), + ("use_schema", True, False, testing.requires.schemas), + argnames="use_schemas,use_temp", + id_="iaa", + ) + def test_create_table_as_tableclause( + self, connection, use_temp, use_schemas + ): + source_table = self.tables.source_table + stmt = CreateTableAs( + select(source_table.c.id, source_table.c.name).where( + source_table.c.value > 100 + ), + self.temp_table_name() if use_temp else "dest_table", + temporary=bool(use_temp), + schema=config.test_schema if use_schemas else None, + ) + + # Execute the CTAS + connection.execute(stmt) + + # Verify we can SELECT from the generated table + dest = stmt.table + result = connection.execute( + select(dest.c.id, dest.c.name).order_by(dest.c.id) + ).fetchall() + + eq_(result, [(2, "bob"), (3, "charlie")]) + + # Verify reflection works + insp = inspect(connection) + cols = insp.get_columns( + self.temp_table_name() if use_temp else "dest_table", + schema=config.test_schema if use_schemas else None, + ) + eq_(len(cols), 2) + eq_(cols[0]["name"], "id") + eq_(cols[1]["name"], "name") + + # Verify type affinity + eq_(cols[0]["type"]._type_affinity, Integer) + eq_(cols[1]["type"]._type_affinity, String) + + @testing.variation( + "use_temp", [False, (True, testing.requires.create_temp_table_as)] + ) + def test_create_table_as_with_metadata( + self, connection, metadata, use_temp + ): + source_table = self.tables.source_table + stmt = CreateTableAs( + select( + source_table.c.id, source_table.c.name, source_table.c.value + ), + self.temp_table_name() if use_temp else "dest_table", + metadata=metadata, + temporary=bool(use_temp), + ) + + # Execute the CTAS + connection.execute(stmt) + + # Verify the generated table is a proper Table object + dest = stmt.table + assert isinstance(dest, Table) + assert dest.metadata is metadata + + # SELECT from the generated table + result = connection.execute( + select(dest.c.id, dest.c.name, dest.c.value).where(dest.c.id == 2) + ).fetchall() + + eq_(result, [(2, "bob", 200)]) + + # Drop the table using the Table object + dest.drop(connection) + + # Verify it's gone + if not use_temp: + insp = inspect(connection) + assert "dest_table" not in insp.get_table_names() + elif testing.requires.temp_table_names.enabled: + insp = inspect(connection) + assert self.temp_table_name() not in insp.get_temp_table_names() + + def test_create_table_as_with_labels(self, connection): + source_table = self.tables.source_table + + stmt = CreateTableAs( + select( + source_table.c.id.label("user_id"), + source_table.c.name.label("user_name"), + ), + "dest_table", + ) + + connection.execute(stmt) + + # Verify column names from labels + insp = inspect(connection) + cols = insp.get_columns("dest_table") + eq_(len(cols), 2) + eq_(cols[0]["name"], "user_id") + eq_(cols[1]["name"], "user_name") + + # Verify we can query using the labels + dest = stmt.table + result = connection.execute( + select(dest.c.user_id, dest.c.user_name).where(dest.c.user_id == 1) + ).fetchall() + + eq_(result, [(1, "alice")]) + + def test_create_table_as_into_method(self, connection): + source_table = self.tables.source_table + stmt = select(source_table.c.id, source_table.c.value).into( + "dest_table" + ) + + connection.execute(stmt) + + # Verify the table was created and can be queried + dest = stmt.table + result = connection.execute( + select(dest.c.id, dest.c.value).order_by(dest.c.id) + ).fetchall() + + eq_(result, [(1, 100), (2, 200), (3, 300)]) + + @testing.variation( + "use_temp", [False, (True, testing.requires.create_temp_table_as)] + ) + @testing.variation("use_into", [True, False]) + def test_metadata_use_cases( + self, use_temp, use_into, metadata, connection + ): + table_name = self.temp_table_name() if use_temp else "dest_table" + source_table = self.tables.source_table + select_stmt = select( + source_table.c.id, source_table.c.name, source_table.c.value + ).where(source_table.c.value > 100) + + if use_into: + cas = select_stmt.into( + table_name, temporary=use_temp, metadata=metadata + ) + else: + cas = CreateTableAs( + select_stmt, + table_name, + temporary=use_temp, + metadata=metadata, + ) + + connection.execute(cas) + dest = cas.table + eq_(dest.name, table_name) + result = connection.execute( + select(dest.c.id, dest.c.name).order_by(dest.c.id) + ).fetchall() + + eq_(result, [(2, "bob"), (3, "charlie")]) + + if use_temp: + if testing.requires.temp_table_names.enabled: + insp = inspect(connection) + assert table_name in insp.get_temp_table_names() + + metadata.drop_all(connection) + insp = inspect(connection) + assert table_name not in insp.get_temp_table_names() + else: + insp = inspect(connection) + assert table_name in insp.get_table_names() + + metadata.drop_all(connection) + insp = inspect(connection) + assert table_name not in insp.get_table_names() + + @testing.requires.table_ddl_if_exists + def test_if_not_exists(self, connection): + source_table = self.tables.source_table + cas = CreateTableAs( + select(source_table.c.id).select_from(source_table), + "dest_table", + if_not_exists=True, + ) + + insp = inspect(connection) + assert "dest_table" not in insp.get_table_names() + + connection.execute(cas) + + insp = inspect(connection) + assert "dest_table" in insp.get_table_names() + + # succeeds even though table exists + connection.execute(cas) + + def test_literal_inlining_inside_select(self, connection): + src = self.tables.source_table + sel = select( + (src.c.id + 1).label("id2"), + literal("x").label("tag"), + ).select_from(src) + + stmt = CreateTableAs(sel, "dest_table") + connection.execute(stmt) + + tbl = stmt.table + row = connection.execute( + select(func.count(), func.min(tbl.c.tag), func.max(tbl.c.tag)) + ).first() + eq_(row, (3, "x", "x")) + + def test_create_table_as_with_bind_param_executes(self, connection): + src = self.tables.source_table + + sel = ( + select(src.c.id, src.c.name) + .select_from(src) + .where(src.c.name == bindparam("p", value="alice")) + ) + + stmt = CreateTableAs(sel, "dest_table") + connection.execute(stmt) + + tbl = stmt.table + + row = connection.execute( + select(func.count(), func.min(tbl.c.name), func.max(tbl.c.name)) + ).first() + eq_(row, (1, "alice", "alice")) + + def test_compound_select_smoke(self, connection): + + a, b = self.tables("a", "b") + + sel = select(a.c.id).union_all(select(b.c.id)).order_by(a.c.id) + stmt = CreateTableAs(sel, "dest_table") + connection.execute(stmt) + + vals = ( + connection.execute( + select(stmt.table.c.id).order_by(stmt.table.c.id) + ) + .scalars() + .all() + ) + eq_(vals, [1, 2, 3, 4]) diff --git a/lib/sqlalchemy/util/preloaded.py b/lib/sqlalchemy/util/preloaded.py index 4ea9aa90f3..51616496a4 100644 --- a/lib/sqlalchemy/util/preloaded.py +++ b/lib/sqlalchemy/util/preloaded.py @@ -44,6 +44,7 @@ if TYPE_CHECKING: from sqlalchemy.orm import strategies as _orm_strategies from sqlalchemy.orm import strategy_options as _orm_strategy_options from sqlalchemy.orm import util as _orm_util + from sqlalchemy.sql import ddl as _sql_ddl from sqlalchemy.sql import default_comparator as _sql_default_comparator from sqlalchemy.sql import dml as _sql_dml from sqlalchemy.sql import elements as _sql_elements @@ -79,6 +80,7 @@ if TYPE_CHECKING: orm_strategy_options = _orm_strategy_options orm_state = _orm_state orm_util = _orm_util + sql_ddl = _sql_ddl sql_default_comparator = _sql_default_comparator sql_dml = _sql_dml sql_elements = _sql_elements diff --git a/pyproject.toml b/pyproject.toml index 679fe0b092..caeaf47c6e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -182,7 +182,7 @@ lint = [ "flake8-rst-docstrings", "pydocstyle<4.0.0", "pygments", - "black==25.1.0", + "black==25.9.0", "slotscheck>=0.17.0", "zimports", # required by generate_tuple_map_overloads ] diff --git a/test/requirements.py b/test/requirements.py index adecb31158..c23e290428 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -99,6 +99,20 @@ class DefaultRequirements(SuiteRequirements): return only_on(["postgresql", "mysql", "mariadb", "sqlite"]) + @property + def create_table_as(self): + """target platform supports CREATE TABLE AS SELECT.""" + + return only_on( + ["postgresql", "mysql", "mariadb", "sqlite", "mssql", "oracle"] + ) + + @property + def create_temp_table_as(self): + """target platform supports CREATE TEMPORARY TABLE AS SELECT.""" + + return only_on(["postgresql", "mysql", "mariadb", "sqlite", "mssql"]) + @property def index_ddl_if_exists(self): """target platform supports IF NOT EXISTS / IF EXISTS for indexes.""" diff --git a/test/sql/test_create_table_as.py b/test/sql/test_create_table_as.py new file mode 100644 index 0000000000..bf05837d4d --- /dev/null +++ b/test/sql/test_create_table_as.py @@ -0,0 +1,357 @@ +import re + +from sqlalchemy import bindparam +from sqlalchemy import Column +from sqlalchemy import Integer +from sqlalchemy import literal +from sqlalchemy import MetaData +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy import testing +from sqlalchemy.exc import ArgumentError +from sqlalchemy.schema import CreateTable +from sqlalchemy.sql import column +from sqlalchemy.sql import select +from sqlalchemy.sql import table +from sqlalchemy.sql.ddl import CreateTableAs +from sqlalchemy.testing import fixtures +from sqlalchemy.testing import is_ +from sqlalchemy.testing.assertions import AssertsCompiledSQL +from sqlalchemy.testing.assertions import expect_raises_message +from sqlalchemy.testing.assertions import expect_warnings + + +class CreateTableAsDefaultDialectTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = "default" + + @testing.fixture + def src_table(self): + return Table( + "src", + MetaData(), + Column("id", Integer), + Column("name", String(50)), + ) + + @testing.fixture + def src_two_tables(self): + a = table("a", column("id"), column("name")) + b = table("b", column("id"), column("status")) + return a, b + + def test_basic_element(self, src_table): + src = src_table + stmt = CreateTableAs( + select(src.c.id, src.c.name).select_from(src), + "dst", + ) + self.assert_compile( + stmt, + "CREATE TABLE dst AS SELECT src.id, src.name FROM src", + ) + + def test_schema_element_qualified(self, src_table): + src = src_table + stmt = CreateTableAs( + select(src.c.id).select_from(src), + "dst", + schema="analytics", + ) + self.assert_compile( + stmt, + "CREATE TABLE analytics.dst AS SELECT src.id FROM src", + ) + + def test_blank_schema_treated_as_none(self, src_table): + src = src_table + stmt = CreateTableAs( + select(src.c.id).select_from(src), "dst", schema="" + ) + self.assert_compile(stmt, "CREATE TABLE dst AS SELECT src.id FROM src") + + def test_binds_rendered_inline(self, src_table): + src = src_table + stmt = CreateTableAs( + select(literal("x").label("tag")).select_from(src), + "dst", + ) + self.assert_compile( + stmt, + "CREATE TABLE dst AS SELECT 'x' AS tag FROM src", + ) + + def test_temporary_no_schema(self, src_table): + src = src_table + stmt = CreateTableAs( + select(src.c.id, src.c.name).select_from(src), + "dst", + temporary=True, + ) + self.assert_compile( + stmt, + "CREATE TEMPORARY TABLE dst AS " + "SELECT src.id, src.name FROM src", + ) + + def test_temporary_exists_flags(self, src_table): + src = src_table + stmt = CreateTableAs( + select(src.c.id).select_from(src), + "dst", + schema="sch", + temporary=True, + if_not_exists=True, + ) + self.assert_compile( + stmt, + "CREATE TEMPORARY TABLE " + "IF NOT EXISTS sch.dst AS SELECT src.id FROM src", + ) + + def test_if_not_exists(self, src_table): + src = src_table + stmt = CreateTableAs( + select(src.c.id, src.c.name).select_from(src), + "dst", + if_not_exists=True, + ) + self.assert_compile( + stmt, + "CREATE TABLE IF NOT EXISTS dst AS " + "SELECT src.id, src.name FROM src", + ) + + def test_join_with_binds_rendered_inline(self, src_two_tables): + a, b = src_two_tables + + s = ( + select(a.c.id, a.c.name) + .select_from(a.join(b, a.c.id == b.c.id)) + .where(b.c.status == "active") + ).into("dst") + + # Ensure WHERE survives into CTAS and binds are rendered inline + self.assert_compile( + s, + "CREATE TABLE dst AS " + "SELECT a.id, a.name FROM a JOIN b ON a.id = b.id " + "WHERE b.status = 'active'", + ) + + def test_into_equivalent_to_element(self, src_table): + src = src_table + s = select(src.c.id).select_from(src).where(src.c.id == 2) + via_into = s.into("dst") + via_element = CreateTableAs(s, "dst") + + self.assert_compile( + via_into, + "CREATE TABLE dst AS SELECT src.id FROM src WHERE src.id = 2", + ) + self.assert_compile( + via_element, + "CREATE TABLE dst AS SELECT src.id FROM src WHERE src.id = 2", + ) + + def test_into_does_not_mutate_original_select(self, src_table): + src = src_table + s = select(src.c.id).select_from(src).where(src.c.id == 5) + + # compile original SELECT + self.assert_compile( + s, + "SELECT src.id FROM src WHERE src.id = :id_1", + ) + + # build CTAS + _ = s.into("dst") + + # original is still a SELECT + self.assert_compile( + s, + "SELECT src.id FROM src WHERE src.id = :id_1", + ) + + def test_into_with_schema_argument(self, src_table): + src = src_table + s = select(src.c.id).select_from(src).into("t", schema="analytics") + self.assert_compile( + s, + "CREATE TABLE analytics.t AS SELECT src.id FROM src", + ) + + def test_target_string_must_be_unqualified(self, src_table): + src = src_table + with expect_raises_message( + ArgumentError, + re.escape("Target string must be unqualified (use schema=)."), + ): + CreateTableAs(select(src.c.id).select_from(src), "sch.dst") + + def test_empty_name(self): + with expect_raises_message( + ArgumentError, "Table name must be non-empty" + ): + CreateTableAs(select(literal(1)), "") + + @testing.variation("provide_metadata", [True, False]) + def test_generated_metadata_table_property( + self, src_table, provide_metadata + ): + src = src_table + + if provide_metadata: + metadata = MetaData() + else: + metadata = None + + stmt = CreateTableAs( + select(src.c.name.label("thename"), src.c.id).select_from(src), + "dst", + schema="sch", + metadata=metadata, + ) + + if metadata is not None: + is_(stmt.metadata, metadata) + + assert isinstance(stmt.table, Table) + is_(stmt.table.metadata, stmt.metadata) + + self.assert_compile( + CreateTable(stmt.table), + "CREATE TABLE sch.dst (thename VARCHAR(50), id INTEGER)", + ) + + def test_labels_in_select_list_preserved(self, src_table): + src = src_table + stmt = CreateTableAs( + select( + src.c.id.label("user_id"), src.c.name.label("user_name") + ).select_from(src), + "dst", + ) + self.assert_compile( + stmt, + "CREATE TABLE dst AS " + "SELECT src.id AS user_id, src.name AS user_name FROM src", + ) + + def test_distinct_and_group_by_survive(self, src_table): + src = src_table + sel = ( + select(src.c.name).select_from(src).distinct().group_by(src.c.name) + ) + stmt = CreateTableAs(sel, "dst") + self.assert_compile( + stmt, + "CREATE TABLE dst AS " + "SELECT DISTINCT src.name FROM src GROUP BY src.name", + ) + + def test_bindparam_no_value_raises(self, src_table): + src = src_table + sel = select(src.c.name).where(src.c.name == bindparam("x")) + stmt = CreateTableAs(sel, "dst") + + with expect_warnings( + "Bound parameter 'x' rendering literal NULL in a SQL expression;" + ): + self.assert_compile( + stmt, + "CREATE TABLE dst AS SELECT src.name FROM src " + "WHERE src.name = NULL", + ) + + def test_union_all_with_binds_rendered_inline(self, src_two_tables): + a, b = src_two_tables + + # Named binds so params are deterministic + s1 = ( + select(a.c.id) + .select_from(a) + .where(a.c.id == bindparam("p_a", value=1)) + ) + s2 = ( + select(b.c.id) + .select_from(b) + .where(b.c.id == bindparam("p_b", value=2)) + ) + + u_all = s1.union_all(s2) + stmt = CreateTableAs(u_all, "dst") + + self.assert_compile( + stmt, + "CREATE TABLE dst AS " + "SELECT a.id FROM a WHERE a.id = 1 " + "UNION ALL SELECT b.id FROM b WHERE b.id = 2", + ) + + def test_union_labels_follow_first_select(self, src_two_tables): + # Many engines take column names + # of a UNION from the first SELECT’s labels. + a = table("a", column("val")) + b = table("b", column("val")) + + s1 = select(a.c.val.label("first_name")).select_from(a) + s2 = select(b.c.val).select_from(b) # unlabeled second branch + + u = s1.union(s2) + stmt = CreateTableAs(u, "dst") + + # We only assert what’s stable across dialects: + # - first SELECT has the label + # - a UNION occurs + self.assert_compile( + stmt, + "CREATE TABLE dst AS " + "SELECT a.val AS first_name FROM a " + "UNION " + "SELECT b.val FROM b", + ) + + def test_union_all_with_inlined_literals_smoke(self, src_two_tables): + # Proves literal_binds=True behavior applies across branches. + a, b = src_two_tables + u = ( + select(literal(1).label("x")) + .select_from(a) + .union_all(select(literal("b").label("x")).select_from(b)) + ) + stmt = CreateTableAs(u, "dst") + self.assert_compile( + stmt, + "CREATE TABLE dst AS " + "SELECT 1 AS x FROM a UNION ALL SELECT 'b' AS x FROM b", + ) + + def test_select_shape_where_order_limit(self, src_table): + src = src_table + sel = ( + select(src.c.id, src.c.name) + .select_from(src) + .where(src.c.id > literal(10)) + .order_by(src.c.name) + .limit(5) + .offset(0) + ) + stmt = CreateTableAs(sel, "dst") + self.assert_compile( + stmt, + "CREATE TABLE dst AS " + "SELECT src.id, src.name FROM src " + "WHERE src.id > 10 ORDER BY src.name LIMIT 5 OFFSET 0", + ) + + def test_cte_smoke(self, src_two_tables): + # Proves CTAS works with a WITH-CTE wrapper and labeled column. + a, _ = src_two_tables + cte = select(a.c.id.label("aid")).select_from(a).cte("u") + stmt = CreateTableAs(select(cte.c.aid), "dst") + self.assert_compile( + stmt, + "CREATE TABLE dst AS " + "WITH u AS (SELECT a.id AS aid FROM a) " + "SELECT u.aid FROM u", + ) diff --git a/test/typing/plain_files/sql/create_table_as.py b/test/typing/plain_files/sql/create_table_as.py new file mode 100644 index 0000000000..a9530ccdfe --- /dev/null +++ b/test/typing/plain_files/sql/create_table_as.py @@ -0,0 +1,114 @@ +"""Typing tests for CREATE TABLE AS.""" + +from sqlalchemy import Column +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy import select +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy.sql.ddl import CreateTableAs + +# Setup +metadata = MetaData() +users = Table( + "users", + metadata, + Column("id", Integer, primary_key=True), + Column("name", String(50)), + Column("email", String(100)), + Column("status", String(20)), +) + +# Test 1: Basic CreateTableAs with string table name +stmt1 = select(users.c.id, users.c.name).where(users.c.id > 10) +ctas1 = CreateTableAs(stmt1, "active_users") + +# Test 2: CreateTableAs with MetaData (creates Table object) +ctas2 = CreateTableAs(stmt1, "active_users_table", metadata=metadata) + +# Test 3: Using .into() method on Select +ctas3 = stmt1.into("users_copy") + +# Test 4: With schema parameter +ctas4 = CreateTableAs(stmt1, "users_backup", schema="backup") + +# Test 5: With temporary flag +ctas5 = CreateTableAs(stmt1, "temp_users", temporary=True) + +# Test 6: With if_not_exists flag +ctas6 = CreateTableAs(stmt1, "users_safe", if_not_exists=True) + +# Test 7: Combining flags +ctas7 = CreateTableAs( + stmt1, "temp_backup", temporary=True, if_not_exists=True, schema="temp" +) + +# Test 8: Access table property +dest_table1 = ctas1.table +dest_table2 = ctas2.table + +# Test 9: Access columns from generated table +id_column = dest_table1.c.id +name_column = dest_table1.c.name + +# Test 10: Use generated table in another select +new_select = select(dest_table1.c.id, dest_table1.c.name).where( + dest_table1.c.id < 100 +) + +# Test 11: With column labels +labeled_stmt = select( + users.c.id.label("user_id"), + users.c.name.label("user_name"), + users.c.email.label("user_email"), +) +ctas_labeled = CreateTableAs(labeled_stmt, "labeled_users") +labeled_table = ctas_labeled.table +user_id_col = labeled_table.c.user_id +user_name_col = labeled_table.c.user_name + +# Test 12: With WHERE clause +filtered_stmt = select(users.c.id, users.c.status).where( + users.c.status == "active" +) +ctas_filtered = CreateTableAs(filtered_stmt, "active_status") + +# Test 13: With JOIN +orders = Table( + "orders", + metadata, + Column("id", Integer, primary_key=True), + Column("user_id", Integer), + Column("amount", Integer), +) +join_stmt = select(users.c.id, users.c.name, orders.c.amount).select_from( + users.join(orders, users.c.id == orders.c.user_id) +) +ctas_join = CreateTableAs(join_stmt, "user_orders") + +# Test 14: With UNION +stmt_a = select(users.c.id, users.c.name).where(users.c.status == "active") +stmt_b = select(users.c.id, users.c.name).where(users.c.status == "pending") +union_stmt = stmt_a.union(stmt_b) +ctas_union = CreateTableAs(union_stmt, "combined_users") + +# Test 15: .into() with metadata +ctas_into_meta = stmt1.into("users_copy_meta", metadata=metadata) + +# Test 16: .into() with all options +ctas_into_full = stmt1.into( + "full_copy", metadata=metadata, schema="backup", temporary=True +) + +# Test 17: Verify generated table can be used in expressions +generated = ctas1.table +count_stmt = select(generated.c.id).where(generated.c.id > 5) + +# Test 18: Chained operations +final_stmt = ( + select(users.c.id, users.c.name) + .where(users.c.status == "active") + .into("final_result") +) +final_table = final_stmt.table +final_id = final_table.c.id -- 2.47.3