]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Support for Create Table As
authorGreg Jarzab <greg.jarzab@gmail.com>
Tue, 23 Sep 2025 13:08:46 +0000 (09:08 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 25 Oct 2025 18:24:46 +0000 (14:24 -0400)
Added support for the SQL ``CREATE TABLE ... AS SELECT`` construct via the
new :class:`_sql.CreateTableAs` DDL construct and the
:meth:`_sql.SelectBase.into` method. The new construct allows creating a
table directly from the results of a SELECT statement, with support for
options such as ``TEMPORARY`` and ``IF NOT EXISTS`` where supported by the
target database.  Pull request courtesy Greg Jarzab.

Fixes: #4950
Closes: #12860
Pull-request: https://github.com/sqlalchemy/sqlalchemy/pull/12860
Pull-request-sha: 7de8a109b892fd91222ce2f59c388ca275021ddb

Change-Id: Id9c8e4a3c520ffc61de1e48e331b6220e3d52fc9

18 files changed:
doc/build/changelog/migration_21.rst
doc/build/changelog/unreleased_21/4950.rst [new file with mode: 0644]
doc/build/core/ddl.rst
doc/build/tutorial/data_select.rst
lib/sqlalchemy/__init__.py
lib/sqlalchemy/dialects/mssql/base.py
lib/sqlalchemy/schema.py
lib/sqlalchemy/sql/compiler.py
lib/sqlalchemy/sql/ddl.py
lib/sqlalchemy/sql/selectable.py
lib/sqlalchemy/testing/requirements.py
lib/sqlalchemy/testing/suite/__init__.py
lib/sqlalchemy/testing/suite/test_create_table_as.py [new file with mode: 0644]
lib/sqlalchemy/util/preloaded.py
pyproject.toml
test/requirements.py
test/sql/test_create_table_as.py [new file with mode: 0644]
test/typing/plain_files/sql/create_table_as.py [new file with mode: 0644]

index f98cb4c3aad112139da15c4b8d5bf0c83787955c..cbd26f476b22b4d9c41b39e4e3074dd0d88b1a33 100644 (file)
@@ -618,6 +618,53 @@ not the database portion::
 
 :ticket:`11234`
 
+.. _change_4950:
+
+CREATE TABLE AS SELECT Support
+-------------------------------
+
+SQLAlchemy 2.1 adds support for the SQL ``CREATE TABLE ... AS SELECT``
+construct as well as the ``SELECT ... INTO`` variant for selected backends,
+which creates a new table directly from the results of a SELECT
+statement. This is available via the new :class:`_schema.CreateTableAs` DDL
+construct and the :meth:`_sql.SelectBase.into` convenience method.
+
+The :class:`_schema.CreateTableAs` construct can be used to create a new table
+from any SELECT statement::
+
+    >>> from sqlalchemy import select, CreateTableAs
+    >>> select_stmt = select(users.c.id, users.c.name).where(users.c.status == "active")
+    >>> create_table_as = CreateTableAs(select_stmt, "active_users")
+
+The above construct renders as a ``CREATE TABLE AS`` statement::
+
+    >>> print(create_table_as)
+    CREATE TABLE active_users AS SELECT users.id, users.name
+    FROM users
+    WHERE users.status = 'active'
+
+The construct can be executed to emit the above DDL, and the table may then
+be accessed using the :attr:`.CreateTableAs.table` attribute which
+supplies a :class:`.Table`::
+
+    >>> print(select(create_table_as.table))
+    SELECT users.id, users.name
+    FROM active_users
+
+See :ref:`tutorial_create_table_as` for a tutorial.
+
+.. seealso::
+
+    :ref:`tutorial_create_table_as` - in the :ref:`unified_tutorial`
+
+    :class:`_schema.CreateTableAs` - DDL construct for CREATE TABLE AS
+
+    :meth:`_sql.SelectBase.into` - convenience method on SELECT and UNION
+    statements
+
+:ticket:`4950`
+
+
 .. _change_11250:
 
 Potential breaking change to odbc_connect= handling for mssql+pyodbc
diff --git a/doc/build/changelog/unreleased_21/4950.rst b/doc/build/changelog/unreleased_21/4950.rst
new file mode 100644 (file)
index 0000000..3e0c986
--- /dev/null
@@ -0,0 +1,16 @@
+.. change::
+    :tags: feature, sql
+    :tickets: 4950
+
+    Added support for the SQL ``CREATE TABLE ... AS SELECT`` construct via the
+    new :class:`_schema.CreateTableAs` DDL construct and the
+    :meth:`_sql.Select.into` method. The new construct allows creating a
+    table directly from the results of a SELECT statement, with support for
+    options such as ``TEMPORARY`` and ``IF NOT EXISTS`` where supported by the
+    target database.  Pull request courtesy Greg Jarzab.
+
+    .. seealso::
+
+        :ref:`change_4950`
+
+
index b0fcd5fd65e7f976289f56d2efaf3286568ffcb9..8b21d5e61487e79ee7ee80a669734dca7eb6bd6c 100644 (file)
@@ -329,6 +329,8 @@ DDL Expression Constructs API
 .. autoclass:: CreateTable
     :members:
 
+.. autoclass:: CreateTableAs
+    :members:
 
 .. autoclass:: DropTable
     :members:
index 51d82279aac0f639fc8c831ab824d8f3c3527778..706bb788003a59950834ecb2d1a3ce4bc98590c1 100644 (file)
@@ -1818,6 +1818,110 @@ where it is usable for custom SQL functions::
 
     :ref:`postgresql_column_valued` - in the :ref:`postgresql_toplevel` documentation.
 
+.. _tutorial_create_table_as:
+
+Using CREATE TABLE AS / SELECT INTO with :func:`.select`
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. versionadded:: 2.1
+
+The :class:`.CreateTableAs` construct, along with a complementing method
+:meth:`.Select.into`, provides support for the "CREATE TABLE AS" / "SELECT INTO"
+DDL constructs, which allows the creation of new tables in the database that
+represent the contents of an arbitrary SELECT statement.   This SQL syntax
+is supported by all included SQLAlchemy backends.
+
+We can produce a :class:`_schema.CreateTableAs` expression from a
+:func:`_sql.select` created against any combinations of tables::
+
+    >>> from sqlalchemy import select, CreateTableAs
+    >>> select_stmt = select(User.id, User.name).where(User.name.like("sponge%"))
+    >>> create_table_as = CreateTableAs(select_stmt, "spongebob_users")
+
+We can also use the equivalent :meth:`.Select.into` method::
+
+    >>> create_table_as = select_stmt.into("spongebob_users")
+
+Stringifying this construct on most backends illustrates the ``CREATE TABLE AS`` syntax::
+
+    >>> print(create_table_as)
+    CREATE TABLE spongebob_users AS SELECT user_account.id, user_account.name
+    FROM user_account
+    WHERE user_account.name LIKE 'sponge%'
+
+On Microsoft SQL Server, we observe that SELECT INTO is generated instead::
+
+    >>> from sqlalchemy.dialects import mssql
+    >>> print(create_table_as.compile(dialect=mssql.dialect()))
+    SELECT user_account.id, user_account.name INTO spongebob_users
+    FROM user_account
+    WHERE user_account.name LIKE 'sponge%'
+
+We can invoke the :class:`.CreateTableAs` construct directly on a database
+connection to create the new table in the database::
+
+    >>> session.execute(create_table_as)
+    {execsql}BEGIN (implicit)
+    CREATE TABLE spongebob_users AS SELECT user_account.id, user_account.name
+    FROM user_account
+    WHERE user_account.name LIKE 'sponge%'
+    [...] ()
+    {stop}<sqlalchemy.engine.cursor.CursorResult object at ...>
+
+The database now has a new table ``spongebob_users`` which contains all the columns and rows
+that would be returned by the SELECT statement.   This is a real table
+in the database that will remain until we drop it (unless it's a temporary
+table that automatically drops, or if transactional DDL is rolled back).
+
+To use the new table with SQLAlchemy Core expressions, we can access a
+new :class:`.Table` via the :attr:`.CreateTableAs.table` attribute; this
+:class:`.Table` is by default associated with a newly created
+:class:`.MetaData` object local to the :class:`.CreateTableAs` object:
+
+.. sourcecode:: pycon+sql
+
+    >>> select_stmt = select(create_table_as.table)
+    >>> result = session.execute(select_stmt)
+    {execsql}SELECT spongebob_users.id, spongebob_users.name
+    FROM spongebob_users
+    [...] ()
+    {stop}>>> result.all()
+    {execsql}[(1, 'spongebob')]
+
+To emit DROP for this table, we use :meth:`.Table.drop`::
+
+    >>> create_table_as.table.drop(session.connection())
+    {execsql}DROP TABLE spongebob_users
+    [...] ()
+
+Alternatively, we can associate the :class:`.CreateTableAs` with an existing
+:class:`.MetaData` using the :paramref:`.CreateTableAs.metadata` parameter, in
+which case operations like :meth:`.MetaData.drop_all` will include a DROP for
+this table.
+
+.. note:: The :class:`.CreateTableAs` construct is not currently included in the
+   sequence initiated by :meth:`.MetaData.create_all`, meaning that this
+   operation would emit a simple ``CREATE TABLE`` for the table, rather than
+   using ``CREATE TABLE AS`` or ``SELECT INTO``, which would omit the
+   ``SELECT`` statement; so when associating
+   :class:`.CreateTableAs` with an existing :class:`.MetaData`, be sure to
+   ensure that :meth:`.MetaData.create_all` is not called on that :class:`.MetaData`
+   unless the :class:`.CreateTableAs` construct were already invoked for that
+   database, assuring the table already exists.
+
+:class:`.CreateTableAs` and :meth:`.Select.into` both support optional flags
+such as ``TEMPORARY`` and ``IF NOT EXISTS`` where supported by the target
+database::
+
+    >>> # Create a temporary table with IF NOT EXISTS
+    >>> stmt = select(User.id, User.name).into(
+    ...     "temp_snapshot", temporary=True, if_not_exists=True
+    ... )
+    >>> print(stmt)
+    CREATE TEMPORARY TABLE IF NOT EXISTS temp_snapshot AS SELECT user_account.id, user_account.name
+    FROM user_account
+
+
 .. _tutorial_casts:
 
 Data Casts and Type Coercion
index 7a70450e05010b1813d72745ee68382e32683d3a..5cdeb2074f752297ce9bf15732f50c012fa8b935 100644 (file)
@@ -62,6 +62,7 @@ from .schema import Column as Column
 from .schema import ColumnDefault as ColumnDefault
 from .schema import Computed as Computed
 from .schema import Constraint as Constraint
+from .schema import CreateTableAs as CreateTableAs
 from .schema import DDL as DDL
 from .schema import DDLElement as DDLElement
 from .schema import DefaultClause as DefaultClause
index 001c0768339e302ecc542f9ea0141d120f96dca2..abc129a91d4b4461eaf6a90a424a9c8f78ba4cab 100644 (file)
@@ -2694,6 +2694,40 @@ class MSDDLCompiler(compiler.DDLCompiler):
             self.preparer.format_table(drop.element.table),
         )
 
+    def visit_create_table_as(self, element, **kw):
+        prep = self.preparer
+
+        # SQL Server doesn't support CREATE TABLE AS, use SELECT INTO instead
+        # Format: SELECT columns INTO new_table FROM source WHERE ...
+
+        qualified = prep.format_table(element.table)
+
+        # Get the inner SELECT SQL
+        inner_kw = dict(kw)
+        inner_kw["literal_binds"] = True
+        select_sql = self.sql_compiler.process(element.selectable, **inner_kw)
+
+        # Inject INTO clause before FROM keyword
+        # Find FROM position (case-insensitive)
+        select_upper = select_sql.upper()
+        from_idx = select_upper.find(" FROM ")
+        if from_idx == -1:
+            from_idx = select_upper.find("\nFROM ")
+
+        if from_idx == -1:
+            raise exc.CompileError(
+                "Could not find FROM keyword in selectable for CREATE TABLE AS"
+            )
+
+        # Insert INTO clause before FROM
+        result = (
+            select_sql[:from_idx]
+            + f"INTO {qualified} "
+            + select_sql[from_idx:]
+        )
+
+        return result
+
     def visit_primary_key_constraint(self, constraint, **kw):
         if len(constraint) == 0:
             return ""
index e0cd659bec13374fc7748c8e88cbe180b5ca978e..0fba390e43c0e9a5be8da4f70e92ff22e81e4a43 100644 (file)
@@ -20,6 +20,7 @@ from .sql.ddl import CreateIndex as CreateIndex
 from .sql.ddl import CreateSchema as CreateSchema
 from .sql.ddl import CreateSequence as CreateSequence
 from .sql.ddl import CreateTable as CreateTable
+from .sql.ddl import CreateTableAs as CreateTableAs
 from .sql.ddl import DDL as DDL
 from .sql.ddl import DDLElement as DDLElement
 from .sql.ddl import DropColumnComment as DropColumnComment
index e95eaa59183c406f6715029c454488fbe5793e40..f5bf5235132337cdc3c8bb7e90d8ba15af21a959 100644 (file)
@@ -94,6 +94,7 @@ if typing.TYPE_CHECKING:
     from .base import CompileState
     from .base import Executable
     from .cache_key import CacheKey
+    from .ddl import CreateTableAs
     from .ddl import ExecutableDDLElement
     from .dml import Delete
     from .dml import Insert
@@ -6937,6 +6938,24 @@ class DDLCompiler(Compiled):
         text += "\n)%s\n\n" % self.post_create_table(table)
         return text
 
+    def visit_create_table_as(self, element: CreateTableAs, **kw: Any) -> str:
+        prep = self.preparer
+
+        inner_kw = dict(kw)
+        inner_kw["literal_binds"] = True
+        select_sql = self.sql_compiler.process(element.selectable, **inner_kw)
+
+        parts = [
+            "CREATE",
+            "TEMPORARY" if element.temporary else None,
+            "TABLE",
+            "IF NOT EXISTS" if element.if_not_exists else None,
+            prep.format_table(element.table),
+            "AS",
+            select_sql,
+        ]
+        return " ".join(p for p in parts if p)
+
     def visit_create_column(self, create, first_pk=False, **kw):
         column = create.element
 
index 0c31e6d3cec1ca70c0382ce8463d26c4b152bb2a..58a8c3c8e8c4b6499ce964bfef43f5a6ba31b3f6 100644 (file)
@@ -29,11 +29,14 @@ from typing import Tuple
 from typing import TypeVar
 from typing import Union
 
+from . import coercions
 from . import roles
 from .base import _generative
 from .base import Executable
 from .base import SchemaVisitor
 from .elements import ClauseElement
+from .selectable import SelectBase
+from .selectable import TableClause
 from .. import exc
 from .. import util
 from ..util import topological
@@ -47,10 +50,10 @@ if typing.TYPE_CHECKING:
     from .schema import Constraint
     from .schema import ForeignKeyConstraint
     from .schema import Index
+    from .schema import MetaData
     from .schema import SchemaItem
     from .schema import Sequence as Sequence  # noqa: F401
     from .schema import Table
-    from .selectable import TableClause
     from ..engine.base import Connection
     from ..engine.interfaces import CacheStats
     from ..engine.interfaces import CompiledCacheType
@@ -546,6 +549,152 @@ class CreateTable(_CreateBase["Table"]):
         self.include_foreign_key_constraints = include_foreign_key_constraints
 
 
+class CreateTableAs(ExecutableDDLElement):
+    """Represent a CREATE TABLE ... AS statement.
+
+    This creates a new table directly from the output of a SELECT.
+    The set of columns in the new table is derived from the
+    SELECT list; constraints, indexes, and defaults are not copied.
+
+    E.g.::
+
+        from sqlalchemy import select
+        from sqlalchemy.sql.ddl import CreateTableAs
+
+        # Create a new table from a SELECT
+        stmt = CreateTableAs(
+            select(users.c.id, users.c.name).where(users.c.status == "active"),
+            "active_users",
+        )
+
+        with engine.begin() as conn:
+            conn.execute(stmt)
+
+        # With optional flags
+        stmt = CreateTableAs(
+            select(users.c.id, users.c.name),
+            "temp_snapshot",
+            temporary=True,
+            if_not_exists=True,
+        )
+
+    The generated table object can be accessed via the :attr:`.table` property,
+    which will be an instance of :class:`.Table`; by default this is associated
+    with a local :class:`.MetaData` construct::
+
+        stmt = CreateTableAs(select(users.c.id, users.c.name), "active_users")
+        active_users_table = stmt.table
+
+    To associate the :class:`.Table` with an existing :class:`.MetaData`,
+    use the :paramref:`_schema.CreateTableAs.metadata` parameter::
+
+        stmt = CreateTableAs(
+            select(users.c.id, users.c.name),
+            "active_users",
+            metadata=some_metadata,
+        )
+        active_users_table = stmt.table
+
+    .. versionadded:: 2.1
+
+    :param selectable: :class:`_sql.Select`
+        The SELECT statement providing the columns and rows.
+
+    :param table_name: str
+        Table name as a string. Must be unqualified; use the ``schema``
+        argument for qualification.
+
+    :param metadata: :class:`_schema.MetaData`, optional
+        If provided, the :class:`_schema.Table` object available via the
+        :attr:`.table` attribute will be associated with this
+        :class:`.MetaData`.  Otherwise, a new, empty :class:`.MetaData`
+        is created.
+
+    :param schema: str, optional schema or owner name.
+
+    :param temporary: bool, default False.
+        If True, render ``TEMPORARY``
+
+    :param if_not_exists: bool, default False.
+        If True, render ``IF NOT EXISTS``
+
+    .. seealso::
+
+        :ref:`tutorial_create_table_as` - in the :ref:`unified_tutorial`
+
+        :meth:`_sql.SelectBase.into` - convenience method to create a
+        :class:`_schema.CreateTableAs` from a SELECT statement
+
+
+
+    """
+
+    __visit_name__ = "create_table_as"
+    inherit_cache = False
+
+    table: Table
+    """:class:`.Table` object representing the table that this
+    :class:`.CreateTableAs` would generate when executed."""
+
+    def __init__(
+        self,
+        selectable: SelectBase,
+        table_name: str,
+        *,
+        metadata: Optional["MetaData"] = None,
+        schema: Optional[str] = None,
+        temporary: bool = False,
+        if_not_exists: bool = False,
+    ):
+        # Coerce selectable to a Select statement
+        selectable = coercions.expect(roles.DMLSelectRole, selectable)
+
+        if isinstance(table_name, str):
+            if not table_name:
+                raise exc.ArgumentError("Table name must be non-empty")
+
+            if "." in table_name:
+                raise exc.ArgumentError(
+                    "Target string must be unqualified (use schema=)."
+                )
+
+        self.schema = schema
+        self.selectable = selectable
+        self.temporary = bool(temporary)
+        self.if_not_exists = bool(if_not_exists)
+        self.metadata = metadata
+        self.table_name = table_name
+        self._gen_table()
+
+    @util.preload_module("sqlalchemy.sql.schema")
+    def _gen_table(self):
+        MetaData = util.preloaded.sql_schema.MetaData
+        Column = util.preloaded.sql_schema.Column
+        Table = util.preloaded.sql_schema.Table
+        MetaData = util.preloaded.sql_schema.MetaData
+
+        column_name_type_pairs = (
+            (name, col_element.type)
+            for _, name, _, col_element, _ in (
+                self.selectable._generate_columns_plus_names(
+                    anon_for_dupe_key=False
+                )
+            )
+        )
+
+        if self.metadata is None:
+            self.metadata = metadata = MetaData()
+        else:
+            metadata = self.metadata
+
+        self.table = Table(
+            self.table_name,
+            metadata,
+            *(Column(name, typ) for name, typ in column_name_type_pairs),
+            schema=self.schema,
+        )
+
+
 class _DropView(_DropBase["Table"]):
     """Semi-public 'DROP VIEW' construct.
 
index b9b0104615b67ad2afc24130c6dddb21512c52c9..6e62d30bc49b5babcd8b195abd78bc912c3ae071 100644 (file)
@@ -138,6 +138,7 @@ if TYPE_CHECKING:
     from .base import ReadOnlyColumnCollection
     from .cache_key import _CacheKeyTraversalType
     from .compiler import SQLCompiler
+    from .ddl import CreateTableAs
     from .dml import Delete
     from .dml import Update
     from .elements import BinaryExpression
@@ -148,6 +149,7 @@ if TYPE_CHECKING:
     from .functions import Function
     from .schema import ForeignKey
     from .schema import ForeignKeyConstraint
+    from .schema import MetaData
     from .sqltypes import TableValueType
     from .type_api import TypeEngine
     from .visitors import _CloneCallableType
@@ -3796,6 +3798,84 @@ class SelectBase(
             self._ensure_disambiguated_names(), name=name
         )
 
+    @util.preload_module("sqlalchemy.sql.ddl")
+    def into(
+        self,
+        target: str,
+        *,
+        metadata: Optional["MetaData"] = None,
+        schema: Optional[str] = None,
+        temporary: bool = False,
+        if_not_exists: bool = False,
+    ) -> CreateTableAs:
+        """Create a :class:`_schema.CreateTableAs` construct from this SELECT.
+
+        This method provides a convenient way to create a ``CREATE TABLE ...
+        AS`` statement from a SELECT, as well as compound SELECTs like UNION.
+        The new table will be created with columns matching the SELECT list.
+
+        Supported on all included backends, the construct emits
+        ``CREATE TABLE...AS`` for all backends except SQL Server, which instead
+        emits a ``SELECT..INTO`` statement.
+
+        e.g.::
+
+            from sqlalchemy import select
+
+            # Create a new table from a SELECT
+            stmt = (
+                select(users.c.id, users.c.name)
+                .where(users.c.status == "active")
+                .into("active_users")
+            )
+
+            with engine.begin() as conn:
+                conn.execute(stmt)
+
+            # With optional flags
+            stmt = (
+                select(users.c.id)
+                .where(users.c.status == "inactive")
+                .into("inactive_users", schema="analytics", if_not_exists=True)
+            )
+
+        .. versionadded:: 2.1
+
+        :param target: Name of the table to create as a string. Must be
+            unqualified; use the ``schema`` parameter for qualification.
+
+        :param metadata: :class:`_schema.MetaData`, optional
+            If provided, the :class:`_schema.Table` object available via the
+            :attr:`.table` attribute will be associated with this
+            :class:`.MetaData`.  Otherwise, a new, empty :class:`.MetaData`
+            is created.
+
+        :param schema: Optional schema name for the new table.
+
+        :param temporary: If True, create a temporary table where supported
+
+        :param if_not_exists: If True, add IF NOT EXISTS clause where supported
+
+        :return: A :class:`_schema.CreateTableAs` construct.
+
+        .. seealso::
+
+            :ref:`tutorial_create_table_as` - in the :ref:`unified_tutorial`
+
+            :class:`_schema.CreateTableAs`
+
+        """
+        sql_ddl = util.preloaded.sql_ddl
+
+        return sql_ddl.CreateTableAs(
+            self,
+            target,
+            metadata=metadata,
+            schema=schema,
+            temporary=temporary,
+            if_not_exists=if_not_exists,
+        )
+
     def _ensure_disambiguated_names(self) -> Self:
         """Ensure that the names generated by this selectbase will be
         disambiguated in some way, if possible.
index d22e37a2a5875215e7471bdd1f532e0bed5b6eb9..c7a75c40dc576c2a50258bae7caad3955b88d077 100644 (file)
@@ -47,6 +47,18 @@ class SuiteRequirements(Requirements):
 
         return exclusions.open()
 
+    @property
+    def create_table_as(self):
+        """target platform supports CREATE TABLE AS SELECT."""
+
+        return exclusions.closed()
+
+    @property
+    def create_temp_table_as(self):
+        """target platform supports CREATE TEMPORARY TABLE AS SELECT."""
+
+        return exclusions.closed()
+
     @property
     def table_ddl_if_exists(self):
         """target platform supports IF NOT EXISTS / IF EXISTS for tables."""
index 141be112f2b860cf238e9260a752329a6f4b2c96..8d79b36d0a24e9f96d584ba2bbe7ba1f150d7f23 100644 (file)
@@ -4,6 +4,7 @@
 #
 # This module is part of SQLAlchemy and is released under
 # the MIT License: https://www.opensource.org/licenses/mit-license.php
+from .test_create_table_as import *  # noqa
 from .test_cte import *  # noqa
 from .test_ddl import *  # noqa
 from .test_dialect import *  # noqa
diff --git a/lib/sqlalchemy/testing/suite/test_create_table_as.py b/lib/sqlalchemy/testing/suite/test_create_table_as.py
new file mode 100644 (file)
index 0000000..5e48dd5
--- /dev/null
@@ -0,0 +1,329 @@
+# testing/suite/test_create_table_as.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+from .. import fixtures
+from ..assertions import eq_
+from ..provision import get_temp_table_name
+from ... import bindparam
+from ... import Column
+from ... import func
+from ... import inspect
+from ... import Integer
+from ... import literal
+from ... import MetaData
+from ... import select
+from ... import String
+from ... import testing
+from ...schema import DropTable
+from ...schema import Table
+from ...sql.ddl import CreateTableAs
+from ...testing import config
+
+
+class CreateTableAsTest(fixtures.TablesTest):
+    __backend__ = True
+    __requires__ = ("create_table_as",)
+
+    @classmethod
+    def temp_table_name(cls):
+        return get_temp_table_name(
+            config, config.db, f"user_tmp_{config.ident}"
+        )
+
+    @classmethod
+    def define_tables(cls, metadata):
+        Table(
+            "source_table",
+            metadata,
+            Column("id", Integer, primary_key=True, autoincrement=False),
+            Column("name", String(50)),
+            Column("value", Integer),
+        )
+        Table("a", metadata, Column("id", Integer))
+        Table("b", metadata, Column("id", Integer))
+
+    @classmethod
+    def insert_data(cls, connection):
+        table = cls.tables.source_table
+        connection.execute(
+            table.insert(),
+            [
+                {"id": 1, "name": "alice", "value": 100},
+                {"id": 2, "name": "bob", "value": 200},
+                {"id": 3, "name": "charlie", "value": 300},
+            ],
+        )
+
+        a = cls.tables.a
+        b = cls.tables.b
+
+        connection.execute(a.insert(), [{"id": v} for v in [1, 3]])
+        connection.execute(b.insert(), [{"id": v} for v in [2, 4]])
+
+    @testing.fixture(scope="function", autouse=True)
+    def drop_dest_table(self, connection):
+        for schema in None, config.test_schema:
+            for name in ("dest_table", self.temp_table_name()):
+                if inspect(connection).has_table(name, schema=schema):
+                    connection.execute(
+                        DropTable(Table(name, MetaData(), schema=schema))
+                    )
+        connection.commit()
+
+    @testing.combinations(
+        ("plain", False, False),
+        ("use_temp", False, True, testing.requires.create_temp_table_as),
+        ("use_schema", True, False, testing.requires.schemas),
+        argnames="use_schemas,use_temp",
+        id_="iaa",
+    )
+    def test_create_table_as_tableclause(
+        self, connection, use_temp, use_schemas
+    ):
+        source_table = self.tables.source_table
+        stmt = CreateTableAs(
+            select(source_table.c.id, source_table.c.name).where(
+                source_table.c.value > 100
+            ),
+            self.temp_table_name() if use_temp else "dest_table",
+            temporary=bool(use_temp),
+            schema=config.test_schema if use_schemas else None,
+        )
+
+        # Execute the CTAS
+        connection.execute(stmt)
+
+        # Verify we can SELECT from the generated table
+        dest = stmt.table
+        result = connection.execute(
+            select(dest.c.id, dest.c.name).order_by(dest.c.id)
+        ).fetchall()
+
+        eq_(result, [(2, "bob"), (3, "charlie")])
+
+        # Verify reflection works
+        insp = inspect(connection)
+        cols = insp.get_columns(
+            self.temp_table_name() if use_temp else "dest_table",
+            schema=config.test_schema if use_schemas else None,
+        )
+        eq_(len(cols), 2)
+        eq_(cols[0]["name"], "id")
+        eq_(cols[1]["name"], "name")
+
+        # Verify type affinity
+        eq_(cols[0]["type"]._type_affinity, Integer)
+        eq_(cols[1]["type"]._type_affinity, String)
+
+    @testing.variation(
+        "use_temp", [False, (True, testing.requires.create_temp_table_as)]
+    )
+    def test_create_table_as_with_metadata(
+        self, connection, metadata, use_temp
+    ):
+        source_table = self.tables.source_table
+        stmt = CreateTableAs(
+            select(
+                source_table.c.id, source_table.c.name, source_table.c.value
+            ),
+            self.temp_table_name() if use_temp else "dest_table",
+            metadata=metadata,
+            temporary=bool(use_temp),
+        )
+
+        # Execute the CTAS
+        connection.execute(stmt)
+
+        # Verify the generated table is a proper Table object
+        dest = stmt.table
+        assert isinstance(dest, Table)
+        assert dest.metadata is metadata
+
+        # SELECT from the generated table
+        result = connection.execute(
+            select(dest.c.id, dest.c.name, dest.c.value).where(dest.c.id == 2)
+        ).fetchall()
+
+        eq_(result, [(2, "bob", 200)])
+
+        # Drop the table using the Table object
+        dest.drop(connection)
+
+        # Verify it's gone
+        if not use_temp:
+            insp = inspect(connection)
+            assert "dest_table" not in insp.get_table_names()
+        elif testing.requires.temp_table_names.enabled:
+            insp = inspect(connection)
+            assert self.temp_table_name() not in insp.get_temp_table_names()
+
+    def test_create_table_as_with_labels(self, connection):
+        source_table = self.tables.source_table
+
+        stmt = CreateTableAs(
+            select(
+                source_table.c.id.label("user_id"),
+                source_table.c.name.label("user_name"),
+            ),
+            "dest_table",
+        )
+
+        connection.execute(stmt)
+
+        # Verify column names from labels
+        insp = inspect(connection)
+        cols = insp.get_columns("dest_table")
+        eq_(len(cols), 2)
+        eq_(cols[0]["name"], "user_id")
+        eq_(cols[1]["name"], "user_name")
+
+        # Verify we can query using the labels
+        dest = stmt.table
+        result = connection.execute(
+            select(dest.c.user_id, dest.c.user_name).where(dest.c.user_id == 1)
+        ).fetchall()
+
+        eq_(result, [(1, "alice")])
+
+    def test_create_table_as_into_method(self, connection):
+        source_table = self.tables.source_table
+        stmt = select(source_table.c.id, source_table.c.value).into(
+            "dest_table"
+        )
+
+        connection.execute(stmt)
+
+        # Verify the table was created and can be queried
+        dest = stmt.table
+        result = connection.execute(
+            select(dest.c.id, dest.c.value).order_by(dest.c.id)
+        ).fetchall()
+
+        eq_(result, [(1, 100), (2, 200), (3, 300)])
+
+    @testing.variation(
+        "use_temp", [False, (True, testing.requires.create_temp_table_as)]
+    )
+    @testing.variation("use_into", [True, False])
+    def test_metadata_use_cases(
+        self, use_temp, use_into, metadata, connection
+    ):
+        table_name = self.temp_table_name() if use_temp else "dest_table"
+        source_table = self.tables.source_table
+        select_stmt = select(
+            source_table.c.id, source_table.c.name, source_table.c.value
+        ).where(source_table.c.value > 100)
+
+        if use_into:
+            cas = select_stmt.into(
+                table_name, temporary=use_temp, metadata=metadata
+            )
+        else:
+            cas = CreateTableAs(
+                select_stmt,
+                table_name,
+                temporary=use_temp,
+                metadata=metadata,
+            )
+
+        connection.execute(cas)
+        dest = cas.table
+        eq_(dest.name, table_name)
+        result = connection.execute(
+            select(dest.c.id, dest.c.name).order_by(dest.c.id)
+        ).fetchall()
+
+        eq_(result, [(2, "bob"), (3, "charlie")])
+
+        if use_temp:
+            if testing.requires.temp_table_names.enabled:
+                insp = inspect(connection)
+                assert table_name in insp.get_temp_table_names()
+
+                metadata.drop_all(connection)
+                insp = inspect(connection)
+                assert table_name not in insp.get_temp_table_names()
+        else:
+            insp = inspect(connection)
+            assert table_name in insp.get_table_names()
+
+            metadata.drop_all(connection)
+            insp = inspect(connection)
+            assert table_name not in insp.get_table_names()
+
+    @testing.requires.table_ddl_if_exists
+    def test_if_not_exists(self, connection):
+        source_table = self.tables.source_table
+        cas = CreateTableAs(
+            select(source_table.c.id).select_from(source_table),
+            "dest_table",
+            if_not_exists=True,
+        )
+
+        insp = inspect(connection)
+        assert "dest_table" not in insp.get_table_names()
+
+        connection.execute(cas)
+
+        insp = inspect(connection)
+        assert "dest_table" in insp.get_table_names()
+
+        # succeeds even though table exists
+        connection.execute(cas)
+
+    def test_literal_inlining_inside_select(self, connection):
+        src = self.tables.source_table
+        sel = select(
+            (src.c.id + 1).label("id2"),
+            literal("x").label("tag"),
+        ).select_from(src)
+
+        stmt = CreateTableAs(sel, "dest_table")
+        connection.execute(stmt)
+
+        tbl = stmt.table
+        row = connection.execute(
+            select(func.count(), func.min(tbl.c.tag), func.max(tbl.c.tag))
+        ).first()
+        eq_(row, (3, "x", "x"))
+
+    def test_create_table_as_with_bind_param_executes(self, connection):
+        src = self.tables.source_table
+
+        sel = (
+            select(src.c.id, src.c.name)
+            .select_from(src)
+            .where(src.c.name == bindparam("p", value="alice"))
+        )
+
+        stmt = CreateTableAs(sel, "dest_table")
+        connection.execute(stmt)
+
+        tbl = stmt.table
+
+        row = connection.execute(
+            select(func.count(), func.min(tbl.c.name), func.max(tbl.c.name))
+        ).first()
+        eq_(row, (1, "alice", "alice"))
+
+    def test_compound_select_smoke(self, connection):
+
+        a, b = self.tables("a", "b")
+
+        sel = select(a.c.id).union_all(select(b.c.id)).order_by(a.c.id)
+        stmt = CreateTableAs(sel, "dest_table")
+        connection.execute(stmt)
+
+        vals = (
+            connection.execute(
+                select(stmt.table.c.id).order_by(stmt.table.c.id)
+            )
+            .scalars()
+            .all()
+        )
+        eq_(vals, [1, 2, 3, 4])
index 4ea9aa90f30c161cb62e5bfa8d47958340c02715..51616496a436122b4e3484b8009f2408eef699db 100644 (file)
@@ -44,6 +44,7 @@ if TYPE_CHECKING:
     from sqlalchemy.orm import strategies as _orm_strategies
     from sqlalchemy.orm import strategy_options as _orm_strategy_options
     from sqlalchemy.orm import util as _orm_util
+    from sqlalchemy.sql import ddl as _sql_ddl
     from sqlalchemy.sql import default_comparator as _sql_default_comparator
     from sqlalchemy.sql import dml as _sql_dml
     from sqlalchemy.sql import elements as _sql_elements
@@ -79,6 +80,7 @@ if TYPE_CHECKING:
     orm_strategy_options = _orm_strategy_options
     orm_state = _orm_state
     orm_util = _orm_util
+    sql_ddl = _sql_ddl
     sql_default_comparator = _sql_default_comparator
     sql_dml = _sql_dml
     sql_elements = _sql_elements
index 679fe0b0929c9d67d5d91357a9b3ba587d3ab0cb..caeaf47c6e7f0b7d8d0f395ff7dc43e7981ae600 100644 (file)
@@ -182,7 +182,7 @@ lint = [
     "flake8-rst-docstrings",
     "pydocstyle<4.0.0",
     "pygments",
-    "black==25.1.0",
+    "black==25.9.0",
     "slotscheck>=0.17.0",
     "zimports",  # required by generate_tuple_map_overloads
 ]
index adecb3115838841a014e0d99c2b3065a4a2cf1fa..c23e29042849e933ee9b3d497053ff04ea5ca6bd 100644 (file)
@@ -99,6 +99,20 @@ class DefaultRequirements(SuiteRequirements):
 
         return only_on(["postgresql", "mysql", "mariadb", "sqlite"])
 
+    @property
+    def create_table_as(self):
+        """target platform supports CREATE TABLE AS SELECT."""
+
+        return only_on(
+            ["postgresql", "mysql", "mariadb", "sqlite", "mssql", "oracle"]
+        )
+
+    @property
+    def create_temp_table_as(self):
+        """target platform supports CREATE TEMPORARY TABLE AS SELECT."""
+
+        return only_on(["postgresql", "mysql", "mariadb", "sqlite", "mssql"])
+
     @property
     def index_ddl_if_exists(self):
         """target platform supports IF NOT EXISTS / IF EXISTS for indexes."""
diff --git a/test/sql/test_create_table_as.py b/test/sql/test_create_table_as.py
new file mode 100644 (file)
index 0000000..bf05837
--- /dev/null
@@ -0,0 +1,357 @@
+import re
+
+from sqlalchemy import bindparam
+from sqlalchemy import Column
+from sqlalchemy import Integer
+from sqlalchemy import literal
+from sqlalchemy import MetaData
+from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy import testing
+from sqlalchemy.exc import ArgumentError
+from sqlalchemy.schema import CreateTable
+from sqlalchemy.sql import column
+from sqlalchemy.sql import select
+from sqlalchemy.sql import table
+from sqlalchemy.sql.ddl import CreateTableAs
+from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import is_
+from sqlalchemy.testing.assertions import AssertsCompiledSQL
+from sqlalchemy.testing.assertions import expect_raises_message
+from sqlalchemy.testing.assertions import expect_warnings
+
+
+class CreateTableAsDefaultDialectTest(fixtures.TestBase, AssertsCompiledSQL):
+    __dialect__ = "default"
+
+    @testing.fixture
+    def src_table(self):
+        return Table(
+            "src",
+            MetaData(),
+            Column("id", Integer),
+            Column("name", String(50)),
+        )
+
+    @testing.fixture
+    def src_two_tables(self):
+        a = table("a", column("id"), column("name"))
+        b = table("b", column("id"), column("status"))
+        return a, b
+
+    def test_basic_element(self, src_table):
+        src = src_table
+        stmt = CreateTableAs(
+            select(src.c.id, src.c.name).select_from(src),
+            "dst",
+        )
+        self.assert_compile(
+            stmt,
+            "CREATE TABLE dst AS SELECT src.id, src.name FROM src",
+        )
+
+    def test_schema_element_qualified(self, src_table):
+        src = src_table
+        stmt = CreateTableAs(
+            select(src.c.id).select_from(src),
+            "dst",
+            schema="analytics",
+        )
+        self.assert_compile(
+            stmt,
+            "CREATE TABLE analytics.dst AS SELECT src.id FROM src",
+        )
+
+    def test_blank_schema_treated_as_none(self, src_table):
+        src = src_table
+        stmt = CreateTableAs(
+            select(src.c.id).select_from(src), "dst", schema=""
+        )
+        self.assert_compile(stmt, "CREATE TABLE dst AS SELECT src.id FROM src")
+
+    def test_binds_rendered_inline(self, src_table):
+        src = src_table
+        stmt = CreateTableAs(
+            select(literal("x").label("tag")).select_from(src),
+            "dst",
+        )
+        self.assert_compile(
+            stmt,
+            "CREATE TABLE dst AS SELECT 'x' AS tag FROM src",
+        )
+
+    def test_temporary_no_schema(self, src_table):
+        src = src_table
+        stmt = CreateTableAs(
+            select(src.c.id, src.c.name).select_from(src),
+            "dst",
+            temporary=True,
+        )
+        self.assert_compile(
+            stmt,
+            "CREATE TEMPORARY TABLE dst AS "
+            "SELECT src.id, src.name FROM src",
+        )
+
+    def test_temporary_exists_flags(self, src_table):
+        src = src_table
+        stmt = CreateTableAs(
+            select(src.c.id).select_from(src),
+            "dst",
+            schema="sch",
+            temporary=True,
+            if_not_exists=True,
+        )
+        self.assert_compile(
+            stmt,
+            "CREATE TEMPORARY TABLE "
+            "IF NOT EXISTS sch.dst AS SELECT src.id FROM src",
+        )
+
+    def test_if_not_exists(self, src_table):
+        src = src_table
+        stmt = CreateTableAs(
+            select(src.c.id, src.c.name).select_from(src),
+            "dst",
+            if_not_exists=True,
+        )
+        self.assert_compile(
+            stmt,
+            "CREATE TABLE IF NOT EXISTS dst AS "
+            "SELECT src.id, src.name FROM src",
+        )
+
+    def test_join_with_binds_rendered_inline(self, src_two_tables):
+        a, b = src_two_tables
+
+        s = (
+            select(a.c.id, a.c.name)
+            .select_from(a.join(b, a.c.id == b.c.id))
+            .where(b.c.status == "active")
+        ).into("dst")
+
+        # Ensure WHERE survives into CTAS and binds are rendered inline
+        self.assert_compile(
+            s,
+            "CREATE TABLE dst AS "
+            "SELECT a.id, a.name FROM a JOIN b ON a.id = b.id "
+            "WHERE b.status = 'active'",
+        )
+
+    def test_into_equivalent_to_element(self, src_table):
+        src = src_table
+        s = select(src.c.id).select_from(src).where(src.c.id == 2)
+        via_into = s.into("dst")
+        via_element = CreateTableAs(s, "dst")
+
+        self.assert_compile(
+            via_into,
+            "CREATE TABLE dst AS SELECT src.id FROM src WHERE src.id = 2",
+        )
+        self.assert_compile(
+            via_element,
+            "CREATE TABLE dst AS SELECT src.id FROM src WHERE src.id = 2",
+        )
+
+    def test_into_does_not_mutate_original_select(self, src_table):
+        src = src_table
+        s = select(src.c.id).select_from(src).where(src.c.id == 5)
+
+        # compile original SELECT
+        self.assert_compile(
+            s,
+            "SELECT src.id FROM src WHERE src.id = :id_1",
+        )
+
+        # build CTAS
+        _ = s.into("dst")
+
+        # original is still a SELECT
+        self.assert_compile(
+            s,
+            "SELECT src.id FROM src WHERE src.id = :id_1",
+        )
+
+    def test_into_with_schema_argument(self, src_table):
+        src = src_table
+        s = select(src.c.id).select_from(src).into("t", schema="analytics")
+        self.assert_compile(
+            s,
+            "CREATE TABLE analytics.t AS SELECT src.id FROM src",
+        )
+
+    def test_target_string_must_be_unqualified(self, src_table):
+        src = src_table
+        with expect_raises_message(
+            ArgumentError,
+            re.escape("Target string must be unqualified (use schema=)."),
+        ):
+            CreateTableAs(select(src.c.id).select_from(src), "sch.dst")
+
+    def test_empty_name(self):
+        with expect_raises_message(
+            ArgumentError, "Table name must be non-empty"
+        ):
+            CreateTableAs(select(literal(1)), "")
+
+    @testing.variation("provide_metadata", [True, False])
+    def test_generated_metadata_table_property(
+        self, src_table, provide_metadata
+    ):
+        src = src_table
+
+        if provide_metadata:
+            metadata = MetaData()
+        else:
+            metadata = None
+
+        stmt = CreateTableAs(
+            select(src.c.name.label("thename"), src.c.id).select_from(src),
+            "dst",
+            schema="sch",
+            metadata=metadata,
+        )
+
+        if metadata is not None:
+            is_(stmt.metadata, metadata)
+
+        assert isinstance(stmt.table, Table)
+        is_(stmt.table.metadata, stmt.metadata)
+
+        self.assert_compile(
+            CreateTable(stmt.table),
+            "CREATE TABLE sch.dst (thename VARCHAR(50), id INTEGER)",
+        )
+
+    def test_labels_in_select_list_preserved(self, src_table):
+        src = src_table
+        stmt = CreateTableAs(
+            select(
+                src.c.id.label("user_id"), src.c.name.label("user_name")
+            ).select_from(src),
+            "dst",
+        )
+        self.assert_compile(
+            stmt,
+            "CREATE TABLE dst AS "
+            "SELECT src.id AS user_id, src.name AS user_name FROM src",
+        )
+
+    def test_distinct_and_group_by_survive(self, src_table):
+        src = src_table
+        sel = (
+            select(src.c.name).select_from(src).distinct().group_by(src.c.name)
+        )
+        stmt = CreateTableAs(sel, "dst")
+        self.assert_compile(
+            stmt,
+            "CREATE TABLE dst AS "
+            "SELECT DISTINCT src.name FROM src GROUP BY src.name",
+        )
+
+    def test_bindparam_no_value_raises(self, src_table):
+        src = src_table
+        sel = select(src.c.name).where(src.c.name == bindparam("x"))
+        stmt = CreateTableAs(sel, "dst")
+
+        with expect_warnings(
+            "Bound parameter 'x' rendering literal NULL in a SQL expression;"
+        ):
+            self.assert_compile(
+                stmt,
+                "CREATE TABLE dst AS SELECT src.name FROM src "
+                "WHERE src.name = NULL",
+            )
+
+    def test_union_all_with_binds_rendered_inline(self, src_two_tables):
+        a, b = src_two_tables
+
+        # Named binds so params are deterministic
+        s1 = (
+            select(a.c.id)
+            .select_from(a)
+            .where(a.c.id == bindparam("p_a", value=1))
+        )
+        s2 = (
+            select(b.c.id)
+            .select_from(b)
+            .where(b.c.id == bindparam("p_b", value=2))
+        )
+
+        u_all = s1.union_all(s2)
+        stmt = CreateTableAs(u_all, "dst")
+
+        self.assert_compile(
+            stmt,
+            "CREATE TABLE dst AS "
+            "SELECT a.id FROM a WHERE a.id = 1 "
+            "UNION ALL SELECT b.id FROM b WHERE b.id = 2",
+        )
+
+    def test_union_labels_follow_first_select(self, src_two_tables):
+        # Many engines take column names
+        # of a UNION from the first SELECT’s labels.
+        a = table("a", column("val"))
+        b = table("b", column("val"))
+
+        s1 = select(a.c.val.label("first_name")).select_from(a)
+        s2 = select(b.c.val).select_from(b)  # unlabeled second branch
+
+        u = s1.union(s2)
+        stmt = CreateTableAs(u, "dst")
+
+        # We only assert what’s stable across dialects:
+        #  - first SELECT has the label
+        #  - a UNION occurs
+        self.assert_compile(
+            stmt,
+            "CREATE TABLE dst AS "
+            "SELECT a.val AS first_name FROM a "
+            "UNION "
+            "SELECT b.val FROM b",
+        )
+
+    def test_union_all_with_inlined_literals_smoke(self, src_two_tables):
+        # Proves literal_binds=True behavior applies across branches.
+        a, b = src_two_tables
+        u = (
+            select(literal(1).label("x"))
+            .select_from(a)
+            .union_all(select(literal("b").label("x")).select_from(b))
+        )
+        stmt = CreateTableAs(u, "dst")
+        self.assert_compile(
+            stmt,
+            "CREATE TABLE dst AS "
+            "SELECT 1 AS x FROM a UNION ALL SELECT 'b' AS x FROM b",
+        )
+
+    def test_select_shape_where_order_limit(self, src_table):
+        src = src_table
+        sel = (
+            select(src.c.id, src.c.name)
+            .select_from(src)
+            .where(src.c.id > literal(10))
+            .order_by(src.c.name)
+            .limit(5)
+            .offset(0)
+        )
+        stmt = CreateTableAs(sel, "dst")
+        self.assert_compile(
+            stmt,
+            "CREATE TABLE dst AS "
+            "SELECT src.id, src.name FROM src "
+            "WHERE src.id > 10 ORDER BY src.name LIMIT 5 OFFSET 0",
+        )
+
+    def test_cte_smoke(self, src_two_tables):
+        # Proves CTAS works with a WITH-CTE wrapper and labeled column.
+        a, _ = src_two_tables
+        cte = select(a.c.id.label("aid")).select_from(a).cte("u")
+        stmt = CreateTableAs(select(cte.c.aid), "dst")
+        self.assert_compile(
+            stmt,
+            "CREATE TABLE dst AS "
+            "WITH u AS (SELECT a.id AS aid FROM a) "
+            "SELECT u.aid FROM u",
+        )
diff --git a/test/typing/plain_files/sql/create_table_as.py b/test/typing/plain_files/sql/create_table_as.py
new file mode 100644 (file)
index 0000000..a9530cc
--- /dev/null
@@ -0,0 +1,114 @@
+"""Typing tests for CREATE TABLE AS."""
+
+from sqlalchemy import Column
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy import select
+from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy.sql.ddl import CreateTableAs
+
+# Setup
+metadata = MetaData()
+users = Table(
+    "users",
+    metadata,
+    Column("id", Integer, primary_key=True),
+    Column("name", String(50)),
+    Column("email", String(100)),
+    Column("status", String(20)),
+)
+
+# Test 1: Basic CreateTableAs with string table name
+stmt1 = select(users.c.id, users.c.name).where(users.c.id > 10)
+ctas1 = CreateTableAs(stmt1, "active_users")
+
+# Test 2: CreateTableAs with MetaData (creates Table object)
+ctas2 = CreateTableAs(stmt1, "active_users_table", metadata=metadata)
+
+# Test 3: Using .into() method on Select
+ctas3 = stmt1.into("users_copy")
+
+# Test 4: With schema parameter
+ctas4 = CreateTableAs(stmt1, "users_backup", schema="backup")
+
+# Test 5: With temporary flag
+ctas5 = CreateTableAs(stmt1, "temp_users", temporary=True)
+
+# Test 6: With if_not_exists flag
+ctas6 = CreateTableAs(stmt1, "users_safe", if_not_exists=True)
+
+# Test 7: Combining flags
+ctas7 = CreateTableAs(
+    stmt1, "temp_backup", temporary=True, if_not_exists=True, schema="temp"
+)
+
+# Test 8: Access table property
+dest_table1 = ctas1.table
+dest_table2 = ctas2.table
+
+# Test 9: Access columns from generated table
+id_column = dest_table1.c.id
+name_column = dest_table1.c.name
+
+# Test 10: Use generated table in another select
+new_select = select(dest_table1.c.id, dest_table1.c.name).where(
+    dest_table1.c.id < 100
+)
+
+# Test 11: With column labels
+labeled_stmt = select(
+    users.c.id.label("user_id"),
+    users.c.name.label("user_name"),
+    users.c.email.label("user_email"),
+)
+ctas_labeled = CreateTableAs(labeled_stmt, "labeled_users")
+labeled_table = ctas_labeled.table
+user_id_col = labeled_table.c.user_id
+user_name_col = labeled_table.c.user_name
+
+# Test 12: With WHERE clause
+filtered_stmt = select(users.c.id, users.c.status).where(
+    users.c.status == "active"
+)
+ctas_filtered = CreateTableAs(filtered_stmt, "active_status")
+
+# Test 13: With JOIN
+orders = Table(
+    "orders",
+    metadata,
+    Column("id", Integer, primary_key=True),
+    Column("user_id", Integer),
+    Column("amount", Integer),
+)
+join_stmt = select(users.c.id, users.c.name, orders.c.amount).select_from(
+    users.join(orders, users.c.id == orders.c.user_id)
+)
+ctas_join = CreateTableAs(join_stmt, "user_orders")
+
+# Test 14: With UNION
+stmt_a = select(users.c.id, users.c.name).where(users.c.status == "active")
+stmt_b = select(users.c.id, users.c.name).where(users.c.status == "pending")
+union_stmt = stmt_a.union(stmt_b)
+ctas_union = CreateTableAs(union_stmt, "combined_users")
+
+# Test 15: .into() with metadata
+ctas_into_meta = stmt1.into("users_copy_meta", metadata=metadata)
+
+# Test 16: .into() with all options
+ctas_into_full = stmt1.into(
+    "full_copy", metadata=metadata, schema="backup", temporary=True
+)
+
+# Test 17: Verify generated table can be used in expressions
+generated = ctas1.table
+count_stmt = select(generated.c.id).where(generated.c.id > 5)
+
+# Test 18: Chained operations
+final_stmt = (
+    select(users.c.id, users.c.name)
+    .where(users.c.status == "active")
+    .into("final_result")
+)
+final_table = final_stmt.table
+final_id = final_table.c.id