From 98361f19a8144cbc8d528c62a873786be65fc026 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sat, 25 Oct 2025 13:19:48 -0400 Subject: [PATCH] Add DDL association to Table, CreateView support Added support for the SQL ``CREATE VIEW`` statement via the new :class:`.CreateView` DDL class. The new class allows creating database views from SELECT statements, with support for options such as ``TEMPORARY``, ``IF NOT EXISTS``, and ``MATERIALIZED`` where supported by the target database. Views defined with :class:`.CreateView` integrate with :class:`.MetaData` for automated DDL generation and provide a :class:`.Table` object for querying. this alters the CreateTableAs commit: * adds the ability for Table to be associated with Create and Drop DDL constructs * Adds CreateView variant of CreateTableAs * Both associate themselves with Table so they take place in create_all/create/drop_all/drop Fixes: #181 Change-Id: If3e568d3d6a6ce19e3d15198c3fbbe06bd847c83 --- doc/build/changelog/migration_21.rst | 128 ++- doc/build/changelog/unreleased_21/181.rst | 17 + doc/build/changelog/unreleased_21/4950.rst | 6 +- doc/build/core/ddl.rst | 7 +- doc/build/core/metadata.rst | 312 +++++++- doc/build/tutorial/data_select.rst | 102 --- lib/sqlalchemy/__init__.py | 4 + lib/sqlalchemy/dialects/mssql/base.py | 7 + lib/sqlalchemy/dialects/sqlite/base.py | 13 + lib/sqlalchemy/schema.py | 3 +- lib/sqlalchemy/sql/compiler.py | 39 +- lib/sqlalchemy/sql/ddl.py | 435 +++++++--- lib/sqlalchemy/sql/schema.py | 67 ++ lib/sqlalchemy/sql/selectable.py | 4 +- lib/sqlalchemy/testing/config.py | 9 + lib/sqlalchemy/testing/provision.py | 4 +- lib/sqlalchemy/testing/requirements.py | 5 + lib/sqlalchemy/testing/suite/__init__.py | 2 +- .../testing/suite/test_create_table_as.py | 329 -------- lib/sqlalchemy/testing/suite/test_ddl.py | 31 + .../testing/suite/test_reflection.py | 29 +- .../testing/suite/test_table_via_select.py | 688 ++++++++++++++++ test/base/test_tutorials.py | 4 +- test/base/test_utils.py | 19 + test/dialect/mssql/test_compiler.py | 25 + test/dialect/sqlite/test_compiler.py | 34 + test/requirements.py | 5 + test/sql/test_create_table_as.py | 357 --------- test/sql/test_ddlemit.py | 137 +++- test/sql/test_metadata.py | 46 ++ test/sql/test_table_via_select.py | 753 ++++++++++++++++++ 31 files changed, 2674 insertions(+), 947 deletions(-) create mode 100644 doc/build/changelog/unreleased_21/181.rst delete mode 100644 lib/sqlalchemy/testing/suite/test_create_table_as.py create mode 100644 lib/sqlalchemy/testing/suite/test_table_via_select.py delete mode 100644 test/sql/test_create_table_as.py create mode 100644 test/sql/test_table_via_select.py diff --git a/doc/build/changelog/migration_21.rst b/doc/build/changelog/migration_21.rst index cbd26f476b..ba9011c37c 100644 --- a/doc/build/changelog/migration_21.rst +++ b/doc/build/changelog/migration_21.rst @@ -620,42 +620,120 @@ not the database portion:: .. _change_4950: -CREATE TABLE AS SELECT Support -------------------------------- +CREATE VIEW and CREATE TABLE AS SELECT Support +---------------------------------------------- + +SQLAlchemy 2.1 adds support for the SQL ``CREATE VIEW`` and +``CREATE TABLE ... AS SELECT`` constructs, as well as the ``SELECT ... INTO`` +variant for selected backends. Both DDL statements generate a table +or table-like construct based on the structure and rows represented by a +SELECT statement. The constructs are available via the :class:`.CreateView` +and :class:`_schema.CreateTableAs` DDL classes, as well as the +:meth:`_sql.SelectBase.into` convenience method. + +Both constructs work in exactly the same way, including that a :class:`.Table` +object is automatically generated from a given :class:`.Select`. DDL +can then be emitted by executing the construct directly or by allowing the +:meth:`.MetaData.create_all` or :meth:`.Table.create` sequences to emit the +correct DDL. + +E.g. using :class:`.CreateView`:: + + >>> from sqlalchemy import Table, Column, Integer, String, MetaData + >>> from sqlalchemy import CreateView, select + >>> + >>> metadata_obj = MetaData() + >>> user_table = Table( + ... "user_account", + ... metadata_obj, + ... Column("id", Integer, primary_key=True), + ... Column("name", String(30)), + ... Column("fullname", String), + ... ) + >>> view = CreateView( + ... select(user_table).where(user_table.c.name.like("%spongebob%")), + ... "spongebob_view", + ... metadata=metadata_obj, + ... ) + + +The above ``CreateView`` construct will emit CREATE VIEW when executed directly, +or when a DDL create operation is run. When using :meth:`.MetaData.create_all`, +the view is created after all dependent tables have been created: -SQLAlchemy 2.1 adds support for the SQL ``CREATE TABLE ... AS SELECT`` -construct as well as the ``SELECT ... INTO`` variant for selected backends, -which creates a new table directly from the results of a SELECT -statement. This is available via the new :class:`_schema.CreateTableAs` DDL -construct and the :meth:`_sql.SelectBase.into` convenience method. +.. sourcecode:: pycon+sql + + >>> from sqlalchemy import create_engine + >>> e = create_engine("sqlite://", echo=True) + >>> metadata_obj.create_all(e) + {opensql}BEGIN (implicit) + + CREATE TABLE user_account ( + id INTEGER NOT NULL, + name VARCHAR(30), + fullname VARCHAR, + PRIMARY KEY (id) + ) -The :class:`_schema.CreateTableAs` construct can be used to create a new table -from any SELECT statement:: + CREATE VIEW spongebob_view AS + SELECT user_account.id, user_account.name, user_account.fullname + FROM user_account + WHERE user_account.name LIKE '%spongebob%' - >>> from sqlalchemy import select, CreateTableAs - >>> select_stmt = select(users.c.id, users.c.name).where(users.c.status == "active") - >>> create_table_as = CreateTableAs(select_stmt, "active_users") + COMMIT -The above construct renders as a ``CREATE TABLE AS`` statement:: +The view is usable in SQL expressions via the :attr:`.CreateView.table` attribute: - >>> print(create_table_as) - CREATE TABLE active_users AS SELECT users.id, users.name - FROM users - WHERE users.status = 'active' +.. sourcecode:: pycon+sql + + >>> with e.connect() as conn: + ... conn.execute(select(view.table)) + {opensql}BEGIN (implicit) + SELECT spongebob_view.id, spongebob_view.name, spongebob_view.fullname + FROM spongebob_view + + ROLLBACK -The construct can be executed to emit the above DDL, and the table may then -be accessed using the :attr:`.CreateTableAs.table` attribute which -supplies a :class:`.Table`:: +:class:`_schema.CreateTableAs` works in the same way, emitting ``CREATE TABLE AS``:: - >>> print(select(create_table_as.table)) - SELECT users.id, users.name - FROM active_users + >>> from sqlalchemy import CreateTableAs + >>> select_stmt = select(users.c.id, users.c.name).where(users.c.name == "squidward") + >>> create_table_as = CreateTableAs(select_stmt, "squidward_users") -See :ref:`tutorial_create_table_as` for a tutorial. +In this case, :class:`.CreateTableAs` was not given a :class:`.MetaData` collection. +While a :class:`.MetaData` collection will be created automatically in this case, +the actual ``CREATE TABLE AS`` statement can also be generated by directly +executing the object: + +.. sourcecode:: pycon+sql + + >>> with e.begin() as conn: + ... conn.execute(create_table_as) + {opensql}BEGIN (implicit) + CREATE TABLE squidward_users AS SELECT user_account.id, user_account.name + FROM user_account + WHERE user_account.name = 'squidward' + COMMIT + +Like before, the :class:`.Table` is accessible from :attr:`.CreateTableAs.table`: + +.. sourcecode:: pycon+sql + + >>> with e.connect() as conn: + ... conn.execute(select(create_table_as.table)) + {opensql}BEGIN (implicit) + SELECT squidward_users.id, squidward_users.name + FROM squidward_users + + ROLLBACK .. seealso:: - :ref:`tutorial_create_table_as` - in the :ref:`unified_tutorial` + :ref:`metadata_create_view` - in :ref:`metadata_toplevel` + + :ref:`metadata_create_table_as` - in :ref:`metadata_toplevel` + + :class:`_schema.CreateView` - DDL construct for CREATE VIEW :class:`_schema.CreateTableAs` - DDL construct for CREATE TABLE AS diff --git a/doc/build/changelog/unreleased_21/181.rst b/doc/build/changelog/unreleased_21/181.rst new file mode 100644 index 0000000000..cca64a89bf --- /dev/null +++ b/doc/build/changelog/unreleased_21/181.rst @@ -0,0 +1,17 @@ +.. change:: + :tags: feature, schema + :tickets: 181 + + Added support for the SQL ``CREATE VIEW`` statement via the new + :class:`.CreateView` DDL class. The new class allows creating database + views from SELECT statements, with support for options such as + ``TEMPORARY``, ``IF NOT EXISTS``, and ``MATERIALIZED`` where supported by + the target database. Views defined with :class:`.CreateView` integrate with + :class:`.MetaData` for automated DDL generation and provide a + :class:`.Table` object for querying. + + .. seealso:: + + :ref:`change_4950` + + diff --git a/doc/build/changelog/unreleased_21/4950.rst b/doc/build/changelog/unreleased_21/4950.rst index 3e0c98601c..b5d0a24f54 100644 --- a/doc/build/changelog/unreleased_21/4950.rst +++ b/doc/build/changelog/unreleased_21/4950.rst @@ -1,5 +1,5 @@ .. change:: - :tags: feature, sql + :tags: feature, schema :tickets: 4950 Added support for the SQL ``CREATE TABLE ... AS SELECT`` construct via the @@ -7,7 +7,9 @@ :meth:`_sql.Select.into` method. The new construct allows creating a table directly from the results of a SELECT statement, with support for options such as ``TEMPORARY`` and ``IF NOT EXISTS`` where supported by the - target database. Pull request courtesy Greg Jarzab. + target database. Tables defined with :class:`_schema.CreateTableAs` + integrate with :class:`.MetaData` for automated DDL generation and provide + a :class:`.Table` object for querying. Pull request courtesy Greg Jarzab. .. seealso:: diff --git a/doc/build/core/ddl.rst b/doc/build/core/ddl.rst index 8b21d5e614..831ea7bc4b 100644 --- a/doc/build/core/ddl.rst +++ b/doc/build/core/ddl.rst @@ -329,21 +329,20 @@ DDL Expression Constructs API .. autoclass:: CreateTable :members: -.. autoclass:: CreateTableAs - :members: - .. autoclass:: DropTable :members: +.. autoclass:: DropView + :members: .. autoclass:: CreateColumn :members: +.. autofunction:: insert_sentinel .. autoclass:: CreateSequence :members: - .. autoclass:: DropSequence :members: diff --git a/doc/build/core/metadata.rst b/doc/build/core/metadata.rst index 318509bbda..93af90e42e 100644 --- a/doc/build/core/metadata.rst +++ b/doc/build/core/metadata.rst @@ -153,6 +153,7 @@ table include:: ``employees.c["some column"]``. See :class:`_sql.ColumnCollection` for further information. +.. _metadata_creating_and_dropping: Creating and Dropping Database Tables ------------------------------------- @@ -558,9 +559,309 @@ The schema feature of SQLAlchemy interacts with the table reflection feature introduced at :ref:`metadata_reflection_toplevel`. See the section :ref:`metadata_reflection_schemas` for additional details on how this works. +.. _metadata_alt_create_forms: + +Alternate CREATE TABLE forms: CREATE VIEW, CREATE TABLE AS +---------------------------------------------------------- + +.. versionadded:: 2.1 SQLAlchemy 2.1 introduces new table creation DDL + sequences CREATE VIEW and CREATE TABLE AS, both of which create a + table or table-like object derived from a SELECT statement + +The :meth:`.MetaData.create_all` sequence discussed at +:ref:`metadata_creating_and_dropping` makes use of a :class:`.DDL` construct +called :class:`.CreateTable` in order to emit the actual ``CREATE TABLE`` +statement. SQLAlchemy 2.1 features additional DDL constructs that can create +tables and views from SELECT statements: :class:`.CreateTableAs` and +:class:`.CreateView`. Both classes are constructed with a :func:`_sql.select` +object that serves as the source of data. Once constructed, they each provide +access to a dynamically-generated :class:`.Table` object that contains the +correct name and :class:`.Column` configuration; this :class:`.Table` can +then be used in subsequent :func:`_sql.select` statements to query the new +table or view. To emit the actual ``CREATE TABLE AS`` or ``CREATE VIEW`` +statement to a database, the :class:`.CreateTableAs` or :class:`.CreateView` +objects may be invoked directly via :meth:`.Connection.execute`, or they +will be invoked automatically via the :meth:`.Table.create` method or +:meth:`.MetaData.create_all` if a :class:`.MetaData` is provided to the +constructor. + + +.. _metadata_create_view: + +Using :class:`.CreateView` +^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The :class:`.CreateView` construct provides support for the ``CREATE VIEW`` +DDL construct, which allows the creation of database views that represent +the result of a SELECT statement. Unlike a table, a view does not store data +directly; instead, it dynamically evaluates the underlying SELECT query +whenever the view is accessed. A compatible SQL syntax is supported by all +included SQLAlchemy backends. + +A :class:`.CreateView` expression may be produced from a +:func:`_sql.select` created against any combinations of tables:: + + >>> from sqlalchemy.sql.ddl import CreateView + >>> select_stmt = select(user.c.user_id, user.c.user_name).where(user.c.status == "active") + >>> create_view = CreateView(select_stmt, "active_users") + +Stringifying this construct illustrates the ``CREATE VIEW`` syntax:: + + >>> print(create_view) + CREATE VIEW active_users AS SELECT "user".user_id, "user".user_name + FROM "user" + WHERE "user".status = 'active' + +A :class:`.Table` object corresponding to the structure of the view that would +be created can be accessed via the :attr:`.CreateView.table` attribute as soon +as the object is constructed. New Core :func:`_sql.select` +objects can use this :class:`.Table` like any other selectable:: + + >>> view_stmt = select(create_view.table).where(create_view.table.c.user_id > 5) + >>> print(view_stmt) + SELECT active_users.user_id, active_users.user_name + FROM active_users + WHERE active_users.user_id > :user_id_1 + +The DDL for :class:`.CreateView` may be executed in a database either +by calling standard :meth:`.Table.create` or :meth:`.MetaData.create_all` +methods, or by executing the construct directly: -Backend-Specific Options ------------------------- +.. sourcecode:: pycon+sql + + >>> with engine.begin() as connection: + ... connection.execute(create_view) + {opensql}BEGIN (implicit) + CREATE VIEW active_users AS SELECT user.user_id, user.user_name + FROM user + WHERE user.status = 'active' + COMMIT + +The database now has a new view ``active_users`` which will dynamically +evaluate the SELECT statement whenever the view is queried. + +:class:`.CreateView` interacts with a :class:`.MetaData` collection; an +explicit :class:`.MetaData` may be passed using the +:paramref:`.CreateView.metadata` parameter, where operations like +:meth:`.MetaData.create_all` and :meth:`.MetaData.drop_all` may be used to +emit a CREATE / DROP DDL within larger DDL sequences. :class:`.CreateView` +includes itself in the new :class:`.Table` via the :meth:`.Table.set_creator_ddl` +method and also applies :class:`.DropView` to the :meth:`.Table.set_dropper_ddl` +elements, so that ``CREATE VIEW`` and ``DROP VIEW`` will be emitted for the +:class:`.Table`: + +.. sourcecode:: pycon+sql + + >>> create_view = CreateView(select_stmt, "active_users", metadata=metadata_obj) + >>> metadata_obj.create_all(engine) + {opensql}BEGIN (implicit) + PRAGMA main.table_info("active_users") + ... + CREATE VIEW active_users AS SELECT user.user_id, user.user_name + FROM user + WHERE user.status = 'active' + COMMIT + +DROP may be emitted for this view alone using :meth:`.Table.drop` +against :attr:`.CreateView.table`, just like it would be used for +any other table; the :class:`.DropView` DDL construct will be invoked: + +.. sourcecode:: pycon+sql + + >>> create_view.table.drop(engine) + {opensql}DROP VIEW active_users + COMMIT + +:class:`.CreateView` supports optional flags such as ``TEMPORARY``, +``OR REPLACE``, and ``MATERIALIZED`` where supported by the target +database:: + + >>> # Create a view with OR REPLACE + >>> stmt = CreateView( + ... select(user.c.user_id, user.c.user_name), + ... "user_snapshot", + ... or_replace=True, + ... ) + >>> print(stmt) + CREATE OR REPLACE VIEW user_snapshot AS SELECT user.user_id, user.user_name + FROM user + +The ``OR REPLACE`` clause renders in all forms, including a simple use +of :meth:`.Table.create`, which does not use a "checkfirst" query by default:: + + >>> stmt.table.create(engine) + BEGIN (implicit) + CREATE OR REPLACE VIEW user_snapshot AS SELECT user.user_id, user.user_name + FROM user + COMMIT + +.. tip:: + + The exact phrase ``OR REPLACE`` is supported by PostgreSQL, Oracle + Database, MySQL and MariaDB. When :class:`.CreateView` with + :paramref:`.CreateView.or_replace` is used on Microsoft SQL Server, the + equivalent keywords ``OR ALTER`` is emitted instead. The remaining + SQLAlchemy-native dialect, SQLite, remains an outlier - for SQLite, the + dialect-specific parameter ``sqlite_if_not_exists`` may be used to create a + view with a check for already existing:: + + stmt = CreateView( + select(user.c.user_id, user.c.user_name), + "user_snapshot", + sqlite_if_not_exists=True, + ) + + ``sqlite_if_not_exists`` is separate from :paramref:`.CreateView.or_replace` + since it has a different meaning, leaving an existing view unmodified + whereas :paramref:`.CreateView.or_replace` will update the definition of + an existing view. + +The ``MATERIALIZED`` keyword may be emitted by specifying :paramref:`.CreateView.materialized`:: + + >>> stmt = CreateView( + ... select(user.c.user_id, user.c.user_name), + ... "user_snapshot", + ... materialized=True, + ... ) + >>> print(stmt) + CREATE MATERIALIZED VIEW user_snapshot AS SELECT user.user_id, user.user_name + FROM user + +Materialized views store the query results physically and can offer performance +benefits for complex queries, though they typically need to be refreshed periodically +using database-specific commands. The Oracle and PostgreSQL backends currently +support ``MATERIALIZED``; however it may be the case that ``MATERIALIZED`` cannot be +combined with ``OR REPLACE``. + + +.. _metadata_create_table_as: + +Using :class:`.CreateTableAs` or :meth:`.Select.into` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + + +The :class:`.CreateTableAs` construct, along with a complementing method +:meth:`.Select.into`, provides support for the "CREATE TABLE AS" / "SELECT INTO" +DDL constructs, which allows the creation of new tables in the database that +represent the contents of an arbitrary SELECT statement. A compatible SQL +syntax is supported by all included SQLAlchemy backends. + +A :class:`_schema.CreateTableAs` expression may be produced from a +:func:`_sql.select` created against any combinations of tables:: + + >>> from sqlalchemy import select, CreateTableAs + >>> select_stmt = select(user.c.user_id, user.c.user_name).where( + ... user.c.user_name.like("sponge%") + ... ) + >>> create_table_as = CreateTableAs(select_stmt, "spongebob_users") + +The equivalent :meth:`.Select.into` method may also be used; this creates +a :class:`.CreateTableAs` construct as well:: + + >>> create_table_as = select_stmt.into("spongebob_users") + +Stringifying this construct on most backends illustrates the ``CREATE TABLE AS`` syntax:: + + >>> print(create_table_as) + CREATE TABLE spongebob_users AS SELECT "user".user_id, "user".user_name + FROM "user" + WHERE "user".user_name LIKE 'sponge%' + +On Microsoft SQL Server, SELECT INTO is generated instead:: + + >>> from sqlalchemy.dialects import mssql + >>> print(create_table_as.compile(dialect=mssql.dialect())) + SELECT [user].user_id, [user].user_name INTO spongebob_users + FROM [user] + WHERE [user].user_name LIKE 'sponge%' + +A :class:`.Table` object corresponding to the structure of the view that would +be created can be accessed via the :attr:`.CreateTableAs.table` attribute as soon +as the object is constructed. New Core :func:`_sql.select` +objects can use this :class:`.Table` like any other selectable:: + + >>> ctas_stmt = select(create_table_as.table).where(create_table_as.table.c.user_id > 5) + >>> print(ctas_stmt) + SELECT spongebob_users.user_id, spongebob_users.user_name + FROM spongebob_users + WHERE spongebob_users.user_id > :user_id_1 + +The DDL for :class:`.CreateTableAs` may be executed in a database either +by calling standard :meth:`.Table.create` or :meth:`.MetaData.create_all` +methods, or by executing the construct directly: + +.. sourcecode:: pycon+sql + + >>> with engine.begin() as connection: + ... connection.execute(create_table_as) + {opensql}BEGIN (implicit) + CREATE TABLE spongebob_users AS SELECT user.user_id, user.user_name + FROM user + WHERE user.user_name LIKE 'sponge%' + COMMIT + +The database now has a new table ``spongebob_users`` which contains all the +columns and rows that would be returned by the SELECT statement. This is a +real table in the database that will remain until we drop it (unless it's a +temporary table that automatically drops, or if transactional DDL is rolled +back). + +Like :class:`.CreateView`, :class:`.CreateTableAs` interacts +with a :class:`.MetaData` collection; an explicit :class:`.MetaData` may be +passed using the :paramref:`.CreateTableAs.metadata` parameter, where +operations like :meth:`.MetaData.create_all` and :meth:`.MetaData.drop_all` may +be used to emit a CREATE / DROP DDL within larger DDL sequences. :class:`.CreateView` +includes itself in the new :class:`.Table` via the :meth:`.Table.set_creator_ddl` +method, so that ``CREATE TABLE AS `` will be emitted for the +:class:`.Table`: + +.. sourcecode:: pycon+sql + + >>> create_table_as = CreateTableAs(select_stmt, "spongebob_users", metadata=metadata_obj) + >>> metadata_obj.create_all(engine) + {opensql}BEGIN (implicit) + PRAGMA main.table_info("spongebob_users") + ... + CREATE TABLE spongebob_users AS SELECT user.user_id, user.user_name + FROM user + WHERE user.user_name LIKE 'sponge%' + COMMIT + + +DROP may be emitted for this table alone using :meth:`.Table.drop` +against :attr:`.CreateTableAs.table`, just like it would be used for +any other table: + +.. sourcecode:: pycon+sql + + >>> create_table_as.table.drop(engine) + {opensql}DROP TABLE spongebob_users + COMMIT + +:class:`.CreateTableAs` and :meth:`.Select.into` both support optional flags +such as ``TEMPORARY`` and ``IF NOT EXISTS`` where supported by the target +database:: + + >>> # Create a temporary table with IF NOT EXISTS + >>> stmt = select(user.c.user_id, user.c.user_name).into( + ... "temp_snapshot", temporary=True, if_not_exists=True + ... ) + >>> print(stmt) + CREATE TEMPORARY TABLE IF NOT EXISTS temp_snapshot AS SELECT user_account.id, user_account.name + FROM user_account + +The ``IF NOT EXISTS`` clause renders in all forms, including a simple use +of :meth:`.Table.create`, which does not use a "checkfirst" query by default:: + + >>> stmt.table.create(engine) + BEGIN (implicit) + CREATE TEMPORARY TABLE IF NOT EXISTS temp_snapshot AS SELECT user.user_id, user.user_name + FROM user + COMMIT + + +Backend-Specific Options for :class:`.Table` +-------------------------------------------- :class:`~sqlalchemy.schema.Table` supports database-specific options. For example, MySQL has different table backend types, including "MyISAM" and @@ -597,6 +898,11 @@ Column, Table, MetaData API :members: :inherited-members: +.. autoclass:: CreateTableAs + :members: + +.. autoclass:: CreateView + :members: .. autoclass:: MetaData :members: @@ -607,8 +913,6 @@ Column, Table, MetaData API .. autoclass:: SchemaItem :members: -.. autofunction:: insert_sentinel - .. autoclass:: Table :members: :inherited-members: diff --git a/doc/build/tutorial/data_select.rst b/doc/build/tutorial/data_select.rst index 706bb78800..0b55d06c56 100644 --- a/doc/build/tutorial/data_select.rst +++ b/doc/build/tutorial/data_select.rst @@ -1818,108 +1818,6 @@ where it is usable for custom SQL functions:: :ref:`postgresql_column_valued` - in the :ref:`postgresql_toplevel` documentation. -.. _tutorial_create_table_as: - -Using CREATE TABLE AS / SELECT INTO with :func:`.select` -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -.. versionadded:: 2.1 - -The :class:`.CreateTableAs` construct, along with a complementing method -:meth:`.Select.into`, provides support for the "CREATE TABLE AS" / "SELECT INTO" -DDL constructs, which allows the creation of new tables in the database that -represent the contents of an arbitrary SELECT statement. This SQL syntax -is supported by all included SQLAlchemy backends. - -We can produce a :class:`_schema.CreateTableAs` expression from a -:func:`_sql.select` created against any combinations of tables:: - - >>> from sqlalchemy import select, CreateTableAs - >>> select_stmt = select(User.id, User.name).where(User.name.like("sponge%")) - >>> create_table_as = CreateTableAs(select_stmt, "spongebob_users") - -We can also use the equivalent :meth:`.Select.into` method:: - - >>> create_table_as = select_stmt.into("spongebob_users") - -Stringifying this construct on most backends illustrates the ``CREATE TABLE AS`` syntax:: - - >>> print(create_table_as) - CREATE TABLE spongebob_users AS SELECT user_account.id, user_account.name - FROM user_account - WHERE user_account.name LIKE 'sponge%' - -On Microsoft SQL Server, we observe that SELECT INTO is generated instead:: - - >>> from sqlalchemy.dialects import mssql - >>> print(create_table_as.compile(dialect=mssql.dialect())) - SELECT user_account.id, user_account.name INTO spongebob_users - FROM user_account - WHERE user_account.name LIKE 'sponge%' - -We can invoke the :class:`.CreateTableAs` construct directly on a database -connection to create the new table in the database:: - - >>> session.execute(create_table_as) - {execsql}BEGIN (implicit) - CREATE TABLE spongebob_users AS SELECT user_account.id, user_account.name - FROM user_account - WHERE user_account.name LIKE 'sponge%' - [...] () - {stop} - -The database now has a new table ``spongebob_users`` which contains all the columns and rows -that would be returned by the SELECT statement. This is a real table -in the database that will remain until we drop it (unless it's a temporary -table that automatically drops, or if transactional DDL is rolled back). - -To use the new table with SQLAlchemy Core expressions, we can access a -new :class:`.Table` via the :attr:`.CreateTableAs.table` attribute; this -:class:`.Table` is by default associated with a newly created -:class:`.MetaData` object local to the :class:`.CreateTableAs` object: - -.. sourcecode:: pycon+sql - - >>> select_stmt = select(create_table_as.table) - >>> result = session.execute(select_stmt) - {execsql}SELECT spongebob_users.id, spongebob_users.name - FROM spongebob_users - [...] () - {stop}>>> result.all() - {execsql}[(1, 'spongebob')] - -To emit DROP for this table, we use :meth:`.Table.drop`:: - - >>> create_table_as.table.drop(session.connection()) - {execsql}DROP TABLE spongebob_users - [...] () - -Alternatively, we can associate the :class:`.CreateTableAs` with an existing -:class:`.MetaData` using the :paramref:`.CreateTableAs.metadata` parameter, in -which case operations like :meth:`.MetaData.drop_all` will include a DROP for -this table. - -.. note:: The :class:`.CreateTableAs` construct is not currently included in the - sequence initiated by :meth:`.MetaData.create_all`, meaning that this - operation would emit a simple ``CREATE TABLE`` for the table, rather than - using ``CREATE TABLE AS`` or ``SELECT INTO``, which would omit the - ``SELECT`` statement; so when associating - :class:`.CreateTableAs` with an existing :class:`.MetaData`, be sure to - ensure that :meth:`.MetaData.create_all` is not called on that :class:`.MetaData` - unless the :class:`.CreateTableAs` construct were already invoked for that - database, assuring the table already exists. - -:class:`.CreateTableAs` and :meth:`.Select.into` both support optional flags -such as ``TEMPORARY`` and ``IF NOT EXISTS`` where supported by the target -database:: - - >>> # Create a temporary table with IF NOT EXISTS - >>> stmt = select(User.id, User.name).into( - ... "temp_snapshot", temporary=True, if_not_exists=True - ... ) - >>> print(stmt) - CREATE TEMPORARY TABLE IF NOT EXISTS temp_snapshot AS SELECT user_account.id, user_account.name - FROM user_account .. _tutorial_casts: diff --git a/lib/sqlalchemy/__init__.py b/lib/sqlalchemy/__init__.py index 5cdeb2074f..150417fd0f 100644 --- a/lib/sqlalchemy/__init__.py +++ b/lib/sqlalchemy/__init__.py @@ -62,10 +62,14 @@ from .schema import Column as Column from .schema import ColumnDefault as ColumnDefault from .schema import Computed as Computed from .schema import Constraint as Constraint +from .schema import CreateTable as CreateTable from .schema import CreateTableAs as CreateTableAs +from .schema import CreateView as CreateView from .schema import DDL as DDL from .schema import DDLElement as DDLElement from .schema import DefaultClause as DefaultClause +from .schema import DropTable as DropTable +from .schema import DropView as DropView from .schema import ExecutableDDLElement as ExecutableDDLElement from .schema import FetchedValue as FetchedValue from .schema import ForeignKey as ForeignKey diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py index abc129a91d..94f265ebed 100644 --- a/lib/sqlalchemy/dialects/mssql/base.py +++ b/lib/sqlalchemy/dialects/mssql/base.py @@ -2728,6 +2728,13 @@ class MSDDLCompiler(compiler.DDLCompiler): return result + def visit_create_view(self, create, **kw): + # SQL Server uses CREATE OR ALTER instead of CREATE OR REPLACE + result = super().visit_create_view(create, **kw) + if create.or_replace: + result = result.replace("CREATE OR REPLACE", "CREATE OR ALTER") + return result + def visit_primary_key_constraint(self, constraint, **kw): if len(constraint) == 0: return "" diff --git a/lib/sqlalchemy/dialects/sqlite/base.py b/lib/sqlalchemy/dialects/sqlite/base.py index 3c7cc7d99f..77d0788287 100644 --- a/lib/sqlalchemy/dialects/sqlite/base.py +++ b/lib/sqlalchemy/dialects/sqlite/base.py @@ -1009,6 +1009,7 @@ from ...engine import reflection from ...engine.reflection import ReflectionDefaults from ...sql import coercions from ...sql import compiler +from ...sql import ddl as sa_ddl from ...sql import elements from ...sql import roles from ...sql import schema @@ -1888,6 +1889,17 @@ class SQLiteDDLCompiler(compiler.DDLCompiler): else: return "" + def visit_create_view(self, create, **kw): + """Handle SQLite if_not_exists dialect option for CREATE VIEW.""" + # Get the if_not_exists dialect option from the CreateView object + if_not_exists = create.dialect_options["sqlite"].get( + "if_not_exists", False + ) + + # Pass if_not_exists through kw to the parent's _generate_table_select + kw["if_not_exists"] = if_not_exists + return super().visit_create_view(create, **kw) + class SQLiteTypeCompiler(compiler.GenericTypeCompiler): def visit_large_binary(self, type_, **kw): @@ -2134,6 +2146,7 @@ class SQLiteDialect(default.DefaultDialect): }, ), (sa_schema.Constraint, {"on_conflict": None}), + (sa_ddl.CreateView, {"if_not_exists": False}), ] _broken_fk_pragma_quotes = False diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py index 0fba390e43..df215a672d 100644 --- a/lib/sqlalchemy/schema.py +++ b/lib/sqlalchemy/schema.py @@ -11,7 +11,6 @@ from __future__ import annotations from .sql.base import SchemaVisitor as SchemaVisitor from .sql.ddl import _CreateDropBase as _CreateDropBase -from .sql.ddl import _DropView as _DropView from .sql.ddl import AddConstraint as AddConstraint from .sql.ddl import BaseDDLElement as BaseDDLElement from .sql.ddl import CheckFirst as CheckFirst @@ -21,6 +20,7 @@ from .sql.ddl import CreateSchema as CreateSchema from .sql.ddl import CreateSequence as CreateSequence from .sql.ddl import CreateTable as CreateTable from .sql.ddl import CreateTableAs as CreateTableAs +from .sql.ddl import CreateView as CreateView from .sql.ddl import DDL as DDL from .sql.ddl import DDLElement as DDLElement from .sql.ddl import DropColumnComment as DropColumnComment @@ -31,6 +31,7 @@ from .sql.ddl import DropSchema as DropSchema from .sql.ddl import DropSequence as DropSequence from .sql.ddl import DropTable as DropTable from .sql.ddl import DropTableComment as DropTableComment +from .sql.ddl import DropView as DropView from .sql.ddl import ExecutableDDLElement as ExecutableDDLElement from .sql.ddl import InvokeDDLBase as InvokeDDLBase from .sql.ddl import SetColumnComment as SetColumnComment diff --git a/lib/sqlalchemy/sql/compiler.py b/lib/sqlalchemy/sql/compiler.py index 9d2a9e3bd1..8abe4da201 100644 --- a/lib/sqlalchemy/sql/compiler.py +++ b/lib/sqlalchemy/sql/compiler.py @@ -94,7 +94,9 @@ if typing.TYPE_CHECKING: from .base import CompileState from .base import Executable from .cache_key import CacheKey + from .ddl import _TableViaSelect from .ddl import CreateTableAs + from .ddl import CreateView from .ddl import ExecutableDDLElement from .dml import Delete from .dml import Insert @@ -6928,18 +6930,42 @@ class DDLCompiler(Compiled): text += "\n)%s\n\n" % self.post_create_table(table) return text + def visit_create_view(self, element: CreateView, **kw: Any) -> str: + return self._generate_table_select(element, "view", **kw) + def visit_create_table_as(self, element: CreateTableAs, **kw: Any) -> str: + return self._generate_table_select(element, "create_table_as", **kw) + + def _generate_table_select( + self, + element: _TableViaSelect, + type_: str, + if_not_exists: Optional[bool] = None, + **kw: Any, + ) -> str: prep = self.preparer inner_kw = dict(kw) inner_kw["literal_binds"] = True select_sql = self.sql_compiler.process(element.selectable, **inner_kw) + # Use if_not_exists parameter if provided, otherwise use element's + use_if_not_exists = ( + if_not_exists + if if_not_exists is not None + else element.if_not_exists + ) + parts = [ "CREATE", + "OR REPLACE" if getattr(element, "or_replace", False) else None, "TEMPORARY" if element.temporary else None, - "TABLE", - "IF NOT EXISTS" if element.if_not_exists else None, + ( + "MATERIALIZED VIEW" + if type_ == "view" and getattr(element, "materialized", False) + else "TABLE" if type_ == "create_table_as" else "VIEW" + ), + "IF NOT EXISTS" if use_if_not_exists else None, prep.format_table(element.table), "AS", select_sql, @@ -7005,7 +7031,14 @@ class DDLCompiler(Compiled): return text + self.preparer.format_table(drop.element) def visit_drop_view(self, drop, **kw): - return "\nDROP VIEW " + self.preparer.format_table(drop.element) + text = "\nDROP " + if drop.materialized: + text += "MATERIALIZED VIEW " + else: + text += "VIEW " + if drop.if_exists: + text += "IF EXISTS " + return text + self.preparer.format_table(drop.element) def _verify_index_table(self, index: Index) -> None: if index.table is None: diff --git a/lib/sqlalchemy/sql/ddl.py b/lib/sqlalchemy/sql/ddl.py index 58a8c3c8e8..9d55dc56d8 100644 --- a/lib/sqlalchemy/sql/ddl.py +++ b/lib/sqlalchemy/sql/ddl.py @@ -31,7 +31,9 @@ from typing import Union from . import coercions from . import roles +from . import util as sql_util from .base import _generative +from .base import DialectKWArgs from .base import Executable from .base import SchemaVisitor from .elements import ClauseElement @@ -42,6 +44,7 @@ from .. import util from ..util import topological from ..util.typing import Self + if typing.TYPE_CHECKING: from .compiler import Compiled from .compiler import DDLCompiler @@ -465,12 +468,24 @@ class _CreateBase(_CreateDropBase[_SI]): self.if_not_exists = if_not_exists +class TableCreateDDL(_CreateBase["Table"]): + + def to_metadata(self, metadata: MetaData, table: Table) -> Self: + raise NotImplementedError() + + class _DropBase(_CreateDropBase[_SI]): def __init__(self, element: _SI, if_exists: bool = False) -> None: super().__init__(element) self.if_exists = if_exists +class TableDropDDL(_DropBase["Table"]): + + def to_metadata(self, metadata: MetaData, table: Table) -> Self: + raise NotImplementedError() + + class CreateSchema(_CreateBase[str]): """Represent a CREATE SCHEMA statement. @@ -515,7 +530,7 @@ class DropSchema(_DropBase[str]): self.cascade = cascade -class CreateTable(_CreateBase["Table"]): +class CreateTable(TableCreateDDL): """Represent a CREATE TABLE statement.""" __visit_name__ = "create_table" @@ -548,61 +563,134 @@ class CreateTable(_CreateBase["Table"]): self.columns = [CreateColumn(column) for column in element.columns] self.include_foreign_key_constraints = include_foreign_key_constraints + def to_metadata(self, metadata: MetaData, table: Table) -> Self: + return self.__class__(table, if_not_exists=self.if_not_exists) -class CreateTableAs(ExecutableDDLElement): - """Represent a CREATE TABLE ... AS statement. - This creates a new table directly from the output of a SELECT. - The set of columns in the new table is derived from the - SELECT list; constraints, indexes, and defaults are not copied. +class _TableViaSelect(TableCreateDDL, ExecutableDDLElement): + """Common base class for DDL constructs that generate and render for a + :class:`.Table` given a :class:`.Select` - E.g.:: + .. versionadded:: 2.1 - from sqlalchemy import select - from sqlalchemy.sql.ddl import CreateTableAs + """ - # Create a new table from a SELECT - stmt = CreateTableAs( - select(users.c.id, users.c.name).where(users.c.status == "active"), - "active_users", + table: Table + """:class:`.Table` object representing the table that this + :class:`.CreateTableAs` would generate when executed.""" + + def __init__( + self, + selectable: SelectBase, + name: str, + *, + metadata: Optional["MetaData"] = None, + schema: Optional[str] = None, + temporary: bool = False, + if_not_exists: bool = False, + ): + # Coerce selectable to a Select statement + selectable = coercions.expect(roles.DMLSelectRole, selectable) + + self.schema = schema + self.selectable = selectable + self.temporary = bool(temporary) + self.if_not_exists = bool(if_not_exists) + self.metadata = metadata + self.table_name = name + self._gen_table() + + @property + def element(self): # type: ignore + return self.table + + def to_metadata(self, metadata: MetaData, table: Table) -> Self: + new = self.__class__.__new__(self.__class__) + new.__dict__.update(self.__dict__) + new.metadata = metadata + new.table = table + return new + + @util.preload_module("sqlalchemy.sql.schema") + def _gen_table(self) -> None: + MetaData = util.preloaded.sql_schema.MetaData + Column = util.preloaded.sql_schema.Column + Table = util.preloaded.sql_schema.Table + MetaData = util.preloaded.sql_schema.MetaData + + column_name_type_pairs = ( + (name, col_element.type) + for _, name, _, col_element, _ in ( + self.selectable._generate_columns_plus_names( + anon_for_dupe_key=False + ) + ) ) - with engine.begin() as conn: - conn.execute(stmt) - - # With optional flags - stmt = CreateTableAs( - select(users.c.id, users.c.name), - "temp_snapshot", - temporary=True, - if_not_exists=True, + if self.metadata is None: + self.metadata = metadata = MetaData() + else: + metadata = self.metadata + + self.table = Table( + self.table_name, + metadata, + *(Column(name, typ) for name, typ in column_name_type_pairs), + schema=self.schema, + _creator_ddl=self, ) - The generated table object can be accessed via the :attr:`.table` property, - which will be an instance of :class:`.Table`; by default this is associated - with a local :class:`.MetaData` construct:: - stmt = CreateTableAs(select(users.c.id, users.c.name), "active_users") - active_users_table = stmt.table +class CreateTableAs(DialectKWArgs, _TableViaSelect): + """Represent a CREATE TABLE ... AS statement. + + This creates a new table directly from the output of a SELECT, including + its schema and its initial set of data. Unlike a view, the + new table is fixed and does not synchronize further with the originating + SELECT statement. + + The example below illustrates basic use of :class:`.CreateTableAs`; given a + :class:`.Select` and optional :class:`.MetaData`, the + :class:`.CreateTableAs` may be invoked directly via + :meth:`.Connection.execute` or indirectly via :meth:`.MetaData.create_all`; + the :attr:`.CreateTableAs.table` attribute provides a :class:`.Table` + object with which to generate new queries:: - To associate the :class:`.Table` with an existing :class:`.MetaData`, - use the :paramref:`_schema.CreateTableAs.metadata` parameter:: + from sqlalchemy import CreateTableAs + from sqlalchemy import select - stmt = CreateTableAs( - select(users.c.id, users.c.name), + # instantiate CreateTableAs given a select() and optional MetaData + cas = CreateTableAs( + select(users.c.id, users.c.name).where(users.c.status == "active"), "active_users", metadata=some_metadata, ) - active_users_table = stmt.table + + # a Table object is available immediately via the .table attribute + new_statement = select(cas.table) + + # to emit CREATE TABLE AS, either invoke CreateTableAs directly... + with engine.begin() as conn: + conn.execute(cas) + + # or alternatively, invoke metadata.create_all() + some_metdata.create_all(engine) + + # drop is performed in the usual way, via drop_all + # or table.drop() + some_metdata.drop_all(engine) + + For detailed background on :class:`.CreateTableAs` see + :ref:`metadata_create_table_as`. .. versionadded:: 2.1 :param selectable: :class:`_sql.Select` The SELECT statement providing the columns and rows. - :param table_name: str - Table name as a string. Must be unqualified; use the ``schema`` - argument for qualification. + :param table_name: table name as a string. Combine with the optional + :paramref:`.CreateTableAs.schema` parameter to indicate a + schema-qualified table name. :param metadata: :class:`_schema.MetaData`, optional If provided, the :class:`_schema.Table` object available via the @@ -620,11 +708,12 @@ class CreateTableAs(ExecutableDDLElement): .. seealso:: - :ref:`tutorial_create_table_as` - in the :ref:`unified_tutorial` + :ref:`metadata_create_table_as` - in :ref:`metadata_toplevel` :meth:`_sql.SelectBase.into` - convenience method to create a :class:`_schema.CreateTableAs` from a SELECT statement + :class:`.CreateView` """ @@ -645,66 +734,188 @@ class CreateTableAs(ExecutableDDLElement): schema: Optional[str] = None, temporary: bool = False, if_not_exists: bool = False, + **dialect_kwargs: Any, ): - # Coerce selectable to a Select statement - selectable = coercions.expect(roles.DMLSelectRole, selectable) + self._validate_dialect_kwargs(dialect_kwargs) + super().__init__( + selectable=selectable, + name=table_name, + metadata=metadata, + schema=schema, + temporary=temporary, + if_not_exists=if_not_exists, + ) - if isinstance(table_name, str): - if not table_name: - raise exc.ArgumentError("Table name must be non-empty") - if "." in table_name: - raise exc.ArgumentError( - "Target string must be unqualified (use schema=)." - ) +class CreateView(DialectKWArgs, _TableViaSelect): + """Represent a CREATE VIEW statement. - self.schema = schema - self.selectable = selectable - self.temporary = bool(temporary) - self.if_not_exists = bool(if_not_exists) - self.metadata = metadata - self.table_name = table_name - self._gen_table() + This creates a new view based on a particular SELECT statement. The schema + of the view is based on the columns of the SELECT statement, and the data + present in the view is derived from the rows represented by the + SELECT. A non-materialized view will evaluate the SELECT statement + dynamicaly as it is queried, whereas a materialized view represents a + snapshot of the SELECT statement at a particular point in time and + typically needs to be refreshed manually using database-specific commands. - @util.preload_module("sqlalchemy.sql.schema") - def _gen_table(self): - MetaData = util.preloaded.sql_schema.MetaData - Column = util.preloaded.sql_schema.Column - Table = util.preloaded.sql_schema.Table - MetaData = util.preloaded.sql_schema.MetaData + The example below illustrates basic use of :class:`.CreateView`; given a + :class:`.Select` and optional :class:`.MetaData`, the + :class:`.CreateView` may be invoked directly via + :meth:`.Connection.execute` or indirectly via :meth:`.MetaData.create_all`; + the :attr:`.CreateView.table` attribute provides a :class:`.Table` + object with which to generate new queries:: - column_name_type_pairs = ( - (name, col_element.type) - for _, name, _, col_element, _ in ( - self.selectable._generate_columns_plus_names( - anon_for_dupe_key=False - ) - ) + + from sqlalchemy import select + from sqlalchemy.sql.ddl import CreateView + + # instantiate CreateView given a select() and optional MetaData + create_view = CreateView( + select(users.c.id, users.c.name).where(users.c.status == "active"), + "active_users_view", + metadata=some_metadata, ) - if self.metadata is None: - self.metadata = metadata = MetaData() - else: - metadata = self.metadata + # a Table object is available immediately via the .table attribute + new_statement = select(create_view.table) - self.table = Table( - self.table_name, - metadata, - *(Column(name, typ) for name, typ in column_name_type_pairs), - schema=self.schema, + # to emit CREATE VIEW, either invoke CreateView directly... + with engine.begin() as conn: + conn.execute(create_view) + + # or alternatively, invoke metadata.create_all() + some_metdata.create_all(engine) + + # drop is performed in the usual way, via drop_all + # or table.drop() (will emit DROP VIEW) + some_metdata.drop_all(engine) + + For detailed background on :class:`.CreateView` see + :ref:`metadata_create_view`. + + .. versionadded:: 2.1 + + :param selectable: :class:`_sql.Select` + The SELECT statement defining the view. + + :param view_name: table name as a string. Combine with the optional + :paramref:`.CreateView.schema` parameter to indicate a + schema-qualified table name. + + :param metadata: :class:`_schema.MetaData`, optional + If provided, the :class:`_schema.Table` object available via the + :attr:`.table` attribute will be associated with this + :class:`.MetaData`. Otherwise, a new, empty :class:`.MetaData` + is created. + + :param schema: str, optional schema or owner name. + + :param temporary: bool, default False. + If True, render ``TEMPORARY`` + + :param or_replace: bool, default False. + If True, render ``OR REPLACE`` to replace an existing view if it + exists. Supported by PostgreSQL, MySQL, MariaDB, and Oracle. + Not supported by SQLite or SQL Server. + + .. versionadded:: 2.1 + + :param materialized: bool, default False. + If True, render ``MATERIALIZED`` to create a materialized view. + Materialized views store the query results physically and can be + refreshed periodically. Not supported by all database backends. + + .. versionadded:: 2.1 + + :param dialect_kw: Additional keyword arguments are dialect-specific and + are passed as keyword arguments to the dialect's compiler. + + .. note:: + + For SQLite, the ``sqlite_if_not_exists`` boolean parameter + is supported to render ``CREATE VIEW IF NOT EXISTS``. + + .. versionadded:: 2.1 + + .. seealso:: + + :ref:`metadata_create_view` - in :ref:`metadata_toplevel` + + :class:`.CreateTableAs` - for creating a table from a SELECT statement + + """ + + __visit_name__ = "create_view" + + inherit_cache = False + + table: Table + """:class:`.Table` object representing the view that this + :class:`.CreateView` would generate when executed.""" + + materialized: bool + """Boolean flag indicating if this is a materialized view.""" + + or_replace: bool + """Boolean flag indicating if OR REPLACE should be used.""" + + def __init__( + self, + selectable: SelectBase, + view_name: str, + *, + metadata: Optional["MetaData"] = None, + schema: Optional[str] = None, + temporary: bool = False, + or_replace: bool = False, + materialized: bool = False, + **dialect_kwargs: Any, + ): + self._validate_dialect_kwargs(dialect_kwargs) + super().__init__( + selectable=selectable, + name=view_name, + metadata=metadata, + schema=schema, + temporary=temporary, + if_not_exists=False, + ) + self.materialized = materialized + self.or_replace = or_replace + self.table._dropper_ddl = DropView( + self.table, materialized=materialized ) -class _DropView(_DropBase["Table"]): - """Semi-public 'DROP VIEW' construct. +class DropView(TableDropDDL): + """'DROP VIEW' construct. - Used by the test suite for dialect-agnostic drops of views. - This object will eventually be part of a public "view" API. + .. versionadded:: 2.1 the :class:`.DropView` construct became public + and was renamed from ``_DropView``. """ __visit_name__ = "drop_view" + materialized: bool + """Boolean flag indicating if this is a materialized view.""" + + def __init__( + self, + element: Table, + *, + if_exists: bool = False, + materialized: bool = False, + ) -> None: + super().__init__(element, if_exists=if_exists) + self.materialized = materialized + + def to_metadata(self, metadata: MetaData, table: Table) -> Self: + new = self.__class__.__new__(self.__class__) + new.__dict__.update(self.__dict__) + new.element = table + return new + class CreateConstraint(BaseDDLElement): element: Constraint @@ -832,7 +1043,7 @@ class CreateColumn(BaseDDLElement): self.element = element -class DropTable(_DropBase["Table"]): +class DropTable(TableDropDDL): """Represent a DROP TABLE statement.""" __visit_name__ = "drop_table" @@ -851,6 +1062,9 @@ class DropTable(_DropBase["Table"]): """ super().__init__(element, if_exists=if_exists) + def to_metadata(self, metadata: MetaData, table: Table) -> Self: + return self.__class__(table, if_exists=self.if_exists) + class CreateSequence(_CreateBase["Sequence"]): """Represent a CREATE SEQUENCE statement.""" @@ -1078,6 +1292,9 @@ class CheckFirst(Flag): TABLES = 2 """Check for tables""" + VIEWS = auto() + """Check for views""" + INDEXES = auto() """Check for indexes""" @@ -1091,7 +1308,7 @@ class CheckFirst(Flag): """ - ALL = TABLES | INDEXES | SEQUENCES | TYPES # equivalent to True + ALL = TABLES | VIEWS | INDEXES | SEQUENCES | TYPES # equivalent to True @classmethod def _missing_(cls, value: object) -> Any: @@ -1121,8 +1338,12 @@ class SchemaGenerator(InvokeCreateDDLBase): effective_schema = self.connection.schema_for_object(table) if effective_schema: self.dialect.validate_identifier(effective_schema) + + bool_to_check = ( + CheckFirst.TABLES if not table.is_view else CheckFirst.VIEWS + ) return ( - not self.checkfirst & CheckFirst.TABLES + not self.checkfirst & bool_to_check or not self.dialect.has_table( self.connection, table.name, schema=effective_schema ) @@ -1216,12 +1437,17 @@ class SchemaGenerator(InvokeCreateDDLBase): # e.g., don't omit any foreign key constraints include_foreign_key_constraints = None - CreateTable( - table, - include_foreign_key_constraints=( - include_foreign_key_constraints - ), - )._invoke_with(self.connection) + if table._creator_ddl is not None: + table_create_ddl = table._creator_ddl + else: + table_create_ddl = CreateTable( + table, + include_foreign_key_constraints=( + include_foreign_key_constraints + ), + ) + + table_create_ddl._invoke_with(self.connection) if hasattr(table, "indexes"): for index in table.indexes: @@ -1364,11 +1590,12 @@ class SchemaDropper(InvokeDropDDLBase): effective_schema = self.connection.schema_for_object(table) if effective_schema: self.dialect.validate_identifier(effective_schema) - return ( - not self.checkfirst & CheckFirst.TABLES - or self.dialect.has_table( - self.connection, table.name, schema=effective_schema - ) + bool_to_check = ( + CheckFirst.TABLES if not table.is_view else CheckFirst.VIEWS + ) + + return not self.checkfirst & bool_to_check or self.dialect.has_table( + self.connection, table.name, schema=effective_schema ) def _can_drop_index(self, index): @@ -1419,7 +1646,11 @@ class SchemaDropper(InvokeDropDDLBase): checkfirst=self.checkfirst, _is_metadata_operation=_is_metadata_operation, ): - DropTable(table)._invoke_with(self.connection) + if table._dropper_ddl is not None: + table_dropper_ddl = table._dropper_ddl + else: + table_dropper_ddl = DropTable(table) + table_dropper_ddl._invoke_with(self.connection) # traverse client side defaults which may refer to server-side # sequences. noting that some of these client side defaults may @@ -1531,6 +1762,7 @@ def sort_tables( ] +@util.preload_module("sqlalchemy.sql.schema") def sort_tables_and_constraints( tables, filter_fn=None, extra_dependencies=None, _warn_for_cycles=False ): @@ -1576,6 +1808,7 @@ def sort_tables_and_constraints( """ + Table = util.preloaded.sql_schema.Table fixed_dependencies = set() mutable_dependencies = set() @@ -1601,6 +1834,22 @@ def sort_tables_and_constraints( if dependent_on is not table: mutable_dependencies.add((dependent_on, table)) + if isinstance(table._creator_ddl, _TableViaSelect): + selectable = table._creator_ddl.selectable + for selected_table in sql_util.find_tables( + selectable, + check_columns=True, + include_aliases=True, + include_joins=True, + include_selects=True, + include_crud=True, + ): + if ( + isinstance(selected_table, Table) + and selected_table.metadata is table.metadata + ): + fixed_dependencies.add((selected_table, table)) + fixed_dependencies.update( (parent, table) for parent in table._extra_dependencies ) diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py index 95c9eb7376..a63b21d670 100644 --- a/lib/sqlalchemy/sql/schema.py +++ b/lib/sqlalchemy/sql/schema.py @@ -103,6 +103,8 @@ if typing.TYPE_CHECKING: from .base import ColumnSet from .base import ReadOnlyColumnCollection from .compiler import DDLCompiler + from .ddl import TableCreateDDL + from .ddl import TableDropDDL from .elements import BindParameter from .elements import KeyedColumnElement from .functions import Function @@ -508,6 +510,8 @@ class Table( _typing_Sequence[Tuple[str, Callable[..., Any]]] ] = None, prefixes: Optional[_typing_Sequence[str]] = None, + _creator_ddl: TableCreateDDL | None = None, + _dropper_ddl: TableDropDDL | None = None, # used internally in the metadata.reflect() process _extend_on: Optional[Set[Table]] = None, # used by __new__ to bypass __init__ @@ -823,6 +827,8 @@ class Table( self.schema = quoted_name(schema, quote_schema) self._sentinel_column = None + self._creator_ddl = _creator_ddl + self._dropper_ddl = _dropper_ddl self.indexes = set() self.constraints = set() @@ -876,6 +882,60 @@ class Table( all_names={}, ) + def set_creator_ddl(self, ddl: TableCreateDDL) -> None: + """Set the table create DDL for this :class:`.Table`. + + This allows the CREATE TABLE statement to be controlled or replaced + entirely when :meth:`.Table.create` or :meth:`.MetaData.create_all` is + used. + + E.g.:: + + from sqlalchemy.schema import CreateTable + + table.set_creator_ddl(CreateTable(table, if_not_exists=True)) + + .. versionadded:: 2.1 + + .. seealso:: + + :meth:`.Table.set_dropper_ddl` + + """ + self._creator_ddl = ddl + + def set_dropper_ddl(self, ddl: TableDropDDL) -> None: + """Set the table drop DDL for this :class:`.Table`. + + This allows the DROP TABLE statement to be controlled or replaced + entirely when :meth:`.Table.drop` or :meth:`.MetaData.drop_all` is + used. + + E.g.:: + + from sqlalchemy.schema import DropTable + + table.set_dropper_ddl(DropTable(table, if_exists=True)) + + .. versionadded:: 2.1 + + .. seealso:: + + :meth:`.Table.set_creator_ddl` + + """ + self._dropper_ddl = ddl + + @property + def is_view(self) -> bool: + """True if this table, when DDL for CREATE is emitted, will emit + CREATE VIEW rather than CREATE TABLE. + + .. versionadded:: 2.1 + + """ + return isinstance(self._creator_ddl, ddl.CreateView) + def _autoload( self, metadata: MetaData, @@ -1487,6 +1547,7 @@ class Table( args = [] for col in self.columns: args.append(col._copy(schema=actual_schema, _to_metadata=metadata)) + table = Table( name, metadata, @@ -1495,6 +1556,12 @@ class Table( *args, **self.kwargs, ) + + if self._creator_ddl is not None: + table._creator_ddl = self._creator_ddl.to_metadata(metadata, table) + if self._dropper_ddl is not None: + table._dropper_ddl = self._dropper_ddl.to_metadata(metadata, table) + for const in self.constraints: if isinstance(const, ForeignKeyConstraint): referred_schema = const._referred_schema diff --git a/lib/sqlalchemy/sql/selectable.py b/lib/sqlalchemy/sql/selectable.py index 6e62d30bc4..89023e609c 100644 --- a/lib/sqlalchemy/sql/selectable.py +++ b/lib/sqlalchemy/sql/selectable.py @@ -3846,7 +3846,7 @@ class SelectBase( :param metadata: :class:`_schema.MetaData`, optional If provided, the :class:`_schema.Table` object available via the - :attr:`.table` attribute will be associated with this + :attr:`.CreateTableAs.table` attribute will be associated with this :class:`.MetaData`. Otherwise, a new, empty :class:`.MetaData` is created. @@ -3860,7 +3860,7 @@ class SelectBase( .. seealso:: - :ref:`tutorial_create_table_as` - in the :ref:`unified_tutorial` + :ref:`metadata_create_table_as` - in :ref:`metadata_toplevel` :class:`_schema.CreateTableAs` diff --git a/lib/sqlalchemy/testing/config.py b/lib/sqlalchemy/testing/config.py index f3598f0910..5d27fd0533 100644 --- a/lib/sqlalchemy/testing/config.py +++ b/lib/sqlalchemy/testing/config.py @@ -168,6 +168,7 @@ def combinations_list(arg_iterable: Iterable[Tuple[Any, ...]], **kw): class Variation: + __match_args__ = ("_name",) __slots__ = ("_name", "_argname") def __init__(self, case, argname, case_names): @@ -196,6 +197,14 @@ class Variation: def __repr__(self): return str(self) + def __eq__(self, value: object) -> bool: + if isinstance(value, str): + return self._name == value + elif isinstance(value, Variation): + return self.name == value.name and self._argname == self._argname + else: + return NotImplemented + def fail(self) -> NoReturn: fail(f"Unknown {self}") diff --git a/lib/sqlalchemy/testing/provision.py b/lib/sqlalchemy/testing/provision.py index 7a8bcc5f3c..8453aef47e 100644 --- a/lib/sqlalchemy/testing/provision.py +++ b/lib/sqlalchemy/testing/provision.py @@ -290,7 +290,7 @@ def drop_views(cfg, eng): with eng.begin() as conn: for vname in view_names: conn.execute( - ddl._DropView(schema.Table(vname, schema.MetaData())) + ddl.DropView(schema.Table(vname, schema.MetaData())) ) if config.requirements.schemas.enabled_for_config(cfg): @@ -302,7 +302,7 @@ def drop_views(cfg, eng): with eng.begin() as conn: for vname in view_names: conn.execute( - ddl._DropView( + ddl.DropView( schema.Table( vname, schema.MetaData(), diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py index 7aaa65010f..02ff70736d 100644 --- a/lib/sqlalchemy/testing/requirements.py +++ b/lib/sqlalchemy/testing/requirements.py @@ -792,6 +792,11 @@ class SuiteRequirements(Requirements): """target database supports temporary views""" return exclusions.closed() + @property + def create_or_replace_view(self): + """target database supports CREATE OR REPLACE VIEW""" + return exclusions.closed() + @property def index_reflection(self): return exclusions.open() diff --git a/lib/sqlalchemy/testing/suite/__init__.py b/lib/sqlalchemy/testing/suite/__init__.py index 8d79b36d0a..52acb868cc 100644 --- a/lib/sqlalchemy/testing/suite/__init__.py +++ b/lib/sqlalchemy/testing/suite/__init__.py @@ -4,7 +4,6 @@ # # This module is part of SQLAlchemy and is released under # the MIT License: https://www.opensource.org/licenses/mit-license.php -from .test_create_table_as import * # noqa from .test_cte import * # noqa from .test_ddl import * # noqa from .test_dialect import * # noqa @@ -14,6 +13,7 @@ from .test_results import * # noqa from .test_rowcount import * # noqa from .test_select import * # noqa from .test_sequence import * # noqa +from .test_table_via_select import * # noqa from .test_types import * # noqa from .test_unicode_ddl import * # noqa from .test_update_delete import * # noqa diff --git a/lib/sqlalchemy/testing/suite/test_create_table_as.py b/lib/sqlalchemy/testing/suite/test_create_table_as.py deleted file mode 100644 index 5e48dd5844..0000000000 --- a/lib/sqlalchemy/testing/suite/test_create_table_as.py +++ /dev/null @@ -1,329 +0,0 @@ -# testing/suite/test_create_table_as.py -# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors -# -# -# This module is part of SQLAlchemy and is released under -# the MIT License: https://www.opensource.org/licenses/mit-license.php -# mypy: ignore-errors - -from .. import fixtures -from ..assertions import eq_ -from ..provision import get_temp_table_name -from ... import bindparam -from ... import Column -from ... import func -from ... import inspect -from ... import Integer -from ... import literal -from ... import MetaData -from ... import select -from ... import String -from ... import testing -from ...schema import DropTable -from ...schema import Table -from ...sql.ddl import CreateTableAs -from ...testing import config - - -class CreateTableAsTest(fixtures.TablesTest): - __backend__ = True - __requires__ = ("create_table_as",) - - @classmethod - def temp_table_name(cls): - return get_temp_table_name( - config, config.db, f"user_tmp_{config.ident}" - ) - - @classmethod - def define_tables(cls, metadata): - Table( - "source_table", - metadata, - Column("id", Integer, primary_key=True, autoincrement=False), - Column("name", String(50)), - Column("value", Integer), - ) - Table("a", metadata, Column("id", Integer)) - Table("b", metadata, Column("id", Integer)) - - @classmethod - def insert_data(cls, connection): - table = cls.tables.source_table - connection.execute( - table.insert(), - [ - {"id": 1, "name": "alice", "value": 100}, - {"id": 2, "name": "bob", "value": 200}, - {"id": 3, "name": "charlie", "value": 300}, - ], - ) - - a = cls.tables.a - b = cls.tables.b - - connection.execute(a.insert(), [{"id": v} for v in [1, 3]]) - connection.execute(b.insert(), [{"id": v} for v in [2, 4]]) - - @testing.fixture(scope="function", autouse=True) - def drop_dest_table(self, connection): - for schema in None, config.test_schema: - for name in ("dest_table", self.temp_table_name()): - if inspect(connection).has_table(name, schema=schema): - connection.execute( - DropTable(Table(name, MetaData(), schema=schema)) - ) - connection.commit() - - @testing.combinations( - ("plain", False, False), - ("use_temp", False, True, testing.requires.create_temp_table_as), - ("use_schema", True, False, testing.requires.schemas), - argnames="use_schemas,use_temp", - id_="iaa", - ) - def test_create_table_as_tableclause( - self, connection, use_temp, use_schemas - ): - source_table = self.tables.source_table - stmt = CreateTableAs( - select(source_table.c.id, source_table.c.name).where( - source_table.c.value > 100 - ), - self.temp_table_name() if use_temp else "dest_table", - temporary=bool(use_temp), - schema=config.test_schema if use_schemas else None, - ) - - # Execute the CTAS - connection.execute(stmt) - - # Verify we can SELECT from the generated table - dest = stmt.table - result = connection.execute( - select(dest.c.id, dest.c.name).order_by(dest.c.id) - ).fetchall() - - eq_(result, [(2, "bob"), (3, "charlie")]) - - # Verify reflection works - insp = inspect(connection) - cols = insp.get_columns( - self.temp_table_name() if use_temp else "dest_table", - schema=config.test_schema if use_schemas else None, - ) - eq_(len(cols), 2) - eq_(cols[0]["name"], "id") - eq_(cols[1]["name"], "name") - - # Verify type affinity - eq_(cols[0]["type"]._type_affinity, Integer) - eq_(cols[1]["type"]._type_affinity, String) - - @testing.variation( - "use_temp", [False, (True, testing.requires.create_temp_table_as)] - ) - def test_create_table_as_with_metadata( - self, connection, metadata, use_temp - ): - source_table = self.tables.source_table - stmt = CreateTableAs( - select( - source_table.c.id, source_table.c.name, source_table.c.value - ), - self.temp_table_name() if use_temp else "dest_table", - metadata=metadata, - temporary=bool(use_temp), - ) - - # Execute the CTAS - connection.execute(stmt) - - # Verify the generated table is a proper Table object - dest = stmt.table - assert isinstance(dest, Table) - assert dest.metadata is metadata - - # SELECT from the generated table - result = connection.execute( - select(dest.c.id, dest.c.name, dest.c.value).where(dest.c.id == 2) - ).fetchall() - - eq_(result, [(2, "bob", 200)]) - - # Drop the table using the Table object - dest.drop(connection) - - # Verify it's gone - if not use_temp: - insp = inspect(connection) - assert "dest_table" not in insp.get_table_names() - elif testing.requires.temp_table_names.enabled: - insp = inspect(connection) - assert self.temp_table_name() not in insp.get_temp_table_names() - - def test_create_table_as_with_labels(self, connection): - source_table = self.tables.source_table - - stmt = CreateTableAs( - select( - source_table.c.id.label("user_id"), - source_table.c.name.label("user_name"), - ), - "dest_table", - ) - - connection.execute(stmt) - - # Verify column names from labels - insp = inspect(connection) - cols = insp.get_columns("dest_table") - eq_(len(cols), 2) - eq_(cols[0]["name"], "user_id") - eq_(cols[1]["name"], "user_name") - - # Verify we can query using the labels - dest = stmt.table - result = connection.execute( - select(dest.c.user_id, dest.c.user_name).where(dest.c.user_id == 1) - ).fetchall() - - eq_(result, [(1, "alice")]) - - def test_create_table_as_into_method(self, connection): - source_table = self.tables.source_table - stmt = select(source_table.c.id, source_table.c.value).into( - "dest_table" - ) - - connection.execute(stmt) - - # Verify the table was created and can be queried - dest = stmt.table - result = connection.execute( - select(dest.c.id, dest.c.value).order_by(dest.c.id) - ).fetchall() - - eq_(result, [(1, 100), (2, 200), (3, 300)]) - - @testing.variation( - "use_temp", [False, (True, testing.requires.create_temp_table_as)] - ) - @testing.variation("use_into", [True, False]) - def test_metadata_use_cases( - self, use_temp, use_into, metadata, connection - ): - table_name = self.temp_table_name() if use_temp else "dest_table" - source_table = self.tables.source_table - select_stmt = select( - source_table.c.id, source_table.c.name, source_table.c.value - ).where(source_table.c.value > 100) - - if use_into: - cas = select_stmt.into( - table_name, temporary=use_temp, metadata=metadata - ) - else: - cas = CreateTableAs( - select_stmt, - table_name, - temporary=use_temp, - metadata=metadata, - ) - - connection.execute(cas) - dest = cas.table - eq_(dest.name, table_name) - result = connection.execute( - select(dest.c.id, dest.c.name).order_by(dest.c.id) - ).fetchall() - - eq_(result, [(2, "bob"), (3, "charlie")]) - - if use_temp: - if testing.requires.temp_table_names.enabled: - insp = inspect(connection) - assert table_name in insp.get_temp_table_names() - - metadata.drop_all(connection) - insp = inspect(connection) - assert table_name not in insp.get_temp_table_names() - else: - insp = inspect(connection) - assert table_name in insp.get_table_names() - - metadata.drop_all(connection) - insp = inspect(connection) - assert table_name not in insp.get_table_names() - - @testing.requires.table_ddl_if_exists - def test_if_not_exists(self, connection): - source_table = self.tables.source_table - cas = CreateTableAs( - select(source_table.c.id).select_from(source_table), - "dest_table", - if_not_exists=True, - ) - - insp = inspect(connection) - assert "dest_table" not in insp.get_table_names() - - connection.execute(cas) - - insp = inspect(connection) - assert "dest_table" in insp.get_table_names() - - # succeeds even though table exists - connection.execute(cas) - - def test_literal_inlining_inside_select(self, connection): - src = self.tables.source_table - sel = select( - (src.c.id + 1).label("id2"), - literal("x").label("tag"), - ).select_from(src) - - stmt = CreateTableAs(sel, "dest_table") - connection.execute(stmt) - - tbl = stmt.table - row = connection.execute( - select(func.count(), func.min(tbl.c.tag), func.max(tbl.c.tag)) - ).first() - eq_(row, (3, "x", "x")) - - def test_create_table_as_with_bind_param_executes(self, connection): - src = self.tables.source_table - - sel = ( - select(src.c.id, src.c.name) - .select_from(src) - .where(src.c.name == bindparam("p", value="alice")) - ) - - stmt = CreateTableAs(sel, "dest_table") - connection.execute(stmt) - - tbl = stmt.table - - row = connection.execute( - select(func.count(), func.min(tbl.c.name), func.max(tbl.c.name)) - ).first() - eq_(row, (1, "alice", "alice")) - - def test_compound_select_smoke(self, connection): - - a, b = self.tables("a", "b") - - sel = select(a.c.id).union_all(select(b.c.id)).order_by(a.c.id) - stmt = CreateTableAs(sel, "dest_table") - connection.execute(stmt) - - vals = ( - connection.execute( - select(stmt.table.c.id).order_by(stmt.table.c.id) - ) - .scalars() - .all() - ) - eq_(vals, [1, 2, 3, 4]) diff --git a/lib/sqlalchemy/testing/suite/test_ddl.py b/lib/sqlalchemy/testing/suite/test_ddl.py index c7e7d817d8..e729c338f7 100644 --- a/lib/sqlalchemy/testing/suite/test_ddl.py +++ b/lib/sqlalchemy/testing/suite/test_ddl.py @@ -122,6 +122,20 @@ class TableDDLTest(fixtures.TestBase): is_true(inspect(connection).has_table("test_table")) connection.execute(schema.CreateTable(table, if_not_exists=True)) + @requirements.table_ddl_if_exists + @util.provide_metadata + def test_create_table_if_not_exists_via_create(self, connection): + table = self._simple_fixture() + + table.set_creator_ddl(schema.CreateTable(table, if_not_exists=True)) + + table.create(connection, checkfirst=False) + + is_true(inspect(connection).has_table("test_table")) + + # works! + table.create(connection, checkfirst=False) + @requirements.index_ddl_if_exists @util.provide_metadata def test_create_index_if_not_exists(self, connection): @@ -164,6 +178,23 @@ class TableDDLTest(fixtures.TestBase): connection.execute(schema.DropTable(table, if_exists=True)) + @requirements.table_ddl_if_exists + @util.provide_metadata + def test_drop_table_if_exists_via_drop(self, connection): + table = self._simple_fixture() + + table.create(connection) + + is_true(inspect(connection).has_table("test_table")) + + table.set_dropper_ddl(schema.DropTable(table, if_exists=True)) + table.drop(connection, checkfirst=False) + + is_false(inspect(connection).has_table("test_table")) + + # works!! + table.drop(connection, checkfirst=False) + @requirements.index_ddl_if_exists @util.provide_metadata def test_drop_index_if_exists(self, connection): diff --git a/lib/sqlalchemy/testing/suite/test_reflection.py b/lib/sqlalchemy/testing/suite/test_reflection.py index 86427a6a68..bbbaa20246 100644 --- a/lib/sqlalchemy/testing/suite/test_reflection.py +++ b/lib/sqlalchemy/testing/suite/test_reflection.py @@ -99,24 +99,23 @@ class HasTableTest(OneConnectionTablesTest): @classmethod def define_views(cls, metadata): - query = "CREATE VIEW vv AS SELECT id, data FROM test_table" - event.listen(metadata, "after_create", DDL(query)) - event.listen(metadata, "before_drop", DDL("DROP VIEW vv")) + test_table = metadata.tables["test_table"] + sa.CreateView( + sa.select(test_table.c.id, test_table.c.data), + "vv", + metadata=metadata, + ) if testing.requires.schemas.enabled: - query = ( - "CREATE VIEW %s.vv AS SELECT id, data FROM %s.test_table_s" - % ( - config.test_schema, - config.test_schema, - ) - ) - event.listen(metadata, "after_create", DDL(query)) - event.listen( - metadata, - "before_drop", - DDL("DROP VIEW %s.vv" % (config.test_schema)), + test_table_s = metadata.tables[ + f"{config.test_schema}.test_table_s" + ] + sa.CreateView( + sa.select(test_table_s.c.id, test_table_s.c.data), + "vv", + metadata=metadata, + schema=config.test_schema, ) @classmethod diff --git a/lib/sqlalchemy/testing/suite/test_table_via_select.py b/lib/sqlalchemy/testing/suite/test_table_via_select.py new file mode 100644 index 0000000000..7d0fa1008b --- /dev/null +++ b/lib/sqlalchemy/testing/suite/test_table_via_select.py @@ -0,0 +1,688 @@ +# testing/suite/test_table_via_select.py +# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors +# +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +from .. import fixtures +from ..assertions import eq_ +from ..provision import get_temp_table_name +from ... import bindparam +from ... import Column +from ... import func +from ... import inspect +from ... import Integer +from ... import literal +from ... import MetaData +from ... import select +from ... import String +from ... import testing +from ...schema import CreateTableAs +from ...schema import CreateView +from ...schema import DropTable +from ...schema import DropView +from ...schema import Table +from ...testing import config + + +class TableViaSelectTest(fixtures.TablesTest): + __backend__ = True + + @classmethod + def temp_table_name(cls): + return get_temp_table_name( + config, config.db, f"user_tmp_{config.ident}" + ) + + @classmethod + def temp_view_name(cls): + return get_temp_table_name( + config, config.db, f"user_tmp_view_{config.ident}" + ) + + @classmethod + def define_tables(cls, metadata): + Table( + "source_table", + metadata, + Column("id", Integer, primary_key=True, autoincrement=False), + Column("name", String(50)), + Column("value", Integer), + ) + Table("a", metadata, Column("id", Integer)) + Table("b", metadata, Column("id", Integer)) + + if testing.requires.schemas.enabled: + Table( + "source_table_s", + metadata, + Column("id", Integer, primary_key=True, autoincrement=False), + Column("name", String(50)), + Column("value", Integer), + schema=config.test_schema, + ) + + @classmethod + def insert_data(cls, connection): + table = cls.tables.source_table + connection.execute( + table.insert(), + [ + {"id": 1, "name": "alice", "value": 100}, + {"id": 2, "name": "bob", "value": 200}, + {"id": 3, "name": "charlie", "value": 300}, + ], + ) + + a = cls.tables.a + b = cls.tables.b + + connection.execute(a.insert(), [{"id": v} for v in [1, 3]]) + connection.execute(b.insert(), [{"id": v} for v in [2, 4]]) + + if testing.requires.schemas.enabled: + table = cls.tables[f"{config.test_schema}.source_table_s"] + connection.execute( + table.insert(), + [ + {"id": 1, "name": "alice", "value": 100}, + {"id": 2, "name": "bob", "value": 200}, + {"id": 3, "name": "charlie", "value": 300}, + ], + ) + + @testing.fixture(scope="function", autouse=True) + def drop_dest_table(self, connection): + for schema in None, config.test_schema: + for name in ("dest_table", self.temp_table_name()): + if inspect(connection).has_table(name, schema=schema): + connection.execute( + DropTable(Table(name, MetaData(), schema=schema)) + ) + for name in ("dest_view", self.temp_view_name()): + if inspect(connection).has_table(name, schema=schema): + connection.execute( + DropView(Table(name, MetaData(), schema=schema)) + ) + connection.commit() + + @testing.combinations( + ("plain", False, False, False), + ( + "use_temp", + False, + True, + False, + testing.requires.create_temp_table_as, + ), + ("use_schema", True, False, False, testing.requires.schemas), + ("plain", False, False, False), + ("use_temp", False, True, True, testing.requires.temporary_views), + ("use_schema", True, False, True, testing.requires.schemas), + argnames="use_schemas,use_temp,use_view", + id_="iaaa", + ) + def test_without_metadata( + self, connection, use_temp, use_schemas, use_view + ): + source_table = self.tables.source_table + + if not use_view: + tablename = self.temp_table_name() if use_temp else "dest_table" + stmt = CreateTableAs( + select(source_table.c.id, source_table.c.name).where( + source_table.c.value > 100 + ), + tablename, + temporary=bool(use_temp), + schema=config.test_schema if use_schemas else None, + ) + else: + if use_schemas: + source_table = self.tables[ + f"{config.test_schema}.source_table_s" + ] + + tablename = self.temp_view_name() if use_temp else "dest_view" + stmt = CreateView( + select(source_table.c.id, source_table.c.name).where( + source_table.c.value > 100 + ), + tablename, + temporary=bool(use_temp), + schema=config.test_schema if use_schemas else None, + ) + + connection.execute(stmt) + + # Verify we can SELECT from the generated table + dest = stmt.table + result = connection.execute( + select(dest.c.id, dest.c.name).order_by(dest.c.id) + ).fetchall() + + eq_(result, [(2, "bob"), (3, "charlie")]) + + # Verify reflection works + insp = inspect(connection) + cols = insp.get_columns( + tablename, + schema=config.test_schema if use_schemas else None, + ) + + eq_(len(cols), 2) + eq_(cols[0]["name"], "id") + eq_(cols[1]["name"], "name") + + # Verify type affinity + eq_(cols[0]["type"]._type_affinity, Integer) + eq_(cols[1]["type"]._type_affinity, String) + + @testing.variation( + "table_type", + [ + ("create_table_as", testing.requires.create_table_as), + ("select_into", testing.requires.create_table_as), + ("create_view", testing.requires.views), + ], + ) + @testing.variation( + "use_temp", + [ + False, + ( + True, + testing.requires.create_temp_table_as + + testing.requires.temporary_views, + ), + ], + ) + @testing.variation("use_drop_all", [True, False]) + def test_with_metadata( + self, + connection, + metadata, + use_temp, + table_type, + use_drop_all, + ): + source_table = self.tables.source_table + + select_stmt = select( + source_table.c.id, + source_table.c.name, + source_table.c.value, + ).where(source_table.c.value > 100) + + match table_type: + case "create_table_as": + tablename = ( + self.temp_table_name() if use_temp else "dest_table" + ) + stmt = CreateTableAs( + select_stmt, + tablename, + metadata=metadata, + temporary=bool(use_temp), + ) + case "select_into": + tablename = ( + self.temp_table_name() if use_temp else "dest_table" + ) + stmt = select_stmt.into( + tablename, + temporary=use_temp, + metadata=metadata, + ) + case "create_view": + tablename = self.temp_view_name() if use_temp else "dest_view" + stmt = CreateView( + select_stmt, + tablename, + metadata=metadata, + temporary=bool(use_temp), + ) + case _: + table_type.fail() + + # these are metadata attached, create all + metadata.create_all(connection) + + # Verify the generated table is a proper Table object + dest = stmt.table + assert isinstance(dest, Table) + assert dest.metadata is metadata + eq_(dest.name, tablename) + + # SELECT from the generated table - should only have rows with + # value > 100 (bob and charlie) + result = connection.execute( + select(dest.c.id, dest.c.name).order_by(dest.c.id) + ).fetchall() + + eq_(result, [(2, "bob"), (3, "charlie")]) + + # Drop the table using either metadata.drop_all() or dest.drop() + if use_drop_all: + metadata.drop_all(connection) + else: + dest.drop(connection) + + # Verify it's gone + if use_temp: + if testing.requires.temp_table_names.enabled: + insp = inspect(connection) + assert tablename not in insp.get_temp_table_names() + else: + insp = inspect(connection) + if table_type.create_view: + assert tablename not in insp.get_view_names() + else: + assert tablename not in insp.get_table_names() + + @testing.variation( + "table_type", + [ + ("create_table_as", testing.requires.create_table_as), + ("create_view", testing.requires.views), + ], + ) + def test_with_labels(self, connection, table_type): + source_table = self.tables.source_table + + match table_type: + case "create_table_as": + tablename = "dest_table" + stmt = CreateTableAs( + select( + source_table.c.id.label("user_id"), + source_table.c.name.label("user_name"), + ), + tablename, + ) + case "create_view": + tablename = "dest_view" + stmt = CreateView( + select( + source_table.c.id.label("user_id"), + source_table.c.name.label("user_name"), + ), + tablename, + ) + case _: + table_type.fail() + + connection.execute(stmt) + + # Verify column names from labels + insp = inspect(connection) + cols = insp.get_columns(tablename) + eq_(len(cols), 2) + eq_(cols[0]["name"], "user_id") + eq_(cols[1]["name"], "user_name") + + # Verify we can query using the labels + dest = stmt.table + result = connection.execute( + select(dest.c.user_id, dest.c.user_name).where(dest.c.user_id == 1) + ).fetchall() + + eq_(result, [(1, "alice")]) + + @testing.requires.table_ddl_if_exists + @testing.requires.create_table_as + def test_create_table_as_if_not_exists(self, connection): + source_table = self.tables.source_table + tablename = "dest_table" + + stmt = CreateTableAs( + select(source_table.c.id).select_from(source_table), + tablename, + if_not_exists=True, + ) + + insp = inspect(connection) + assert tablename not in insp.get_table_names() + + connection.execute(stmt) + + insp = inspect(connection) + assert tablename in insp.get_table_names() + + # succeeds even though table exists + connection.execute(stmt) + + @testing.requires.create_or_replace_view + def test_create_or_replace_view(self, connection): + source_table = self.tables.source_table + viewname = "dest_view" + + # Create initial view that selects all rows + stmt = CreateView( + select(source_table.c.id).select_from(source_table), + viewname, + or_replace=True, + ) + + insp = inspect(connection) + assert viewname not in insp.get_view_names() + + connection.execute(stmt) + + insp = inspect(connection) + assert viewname in insp.get_view_names() + + # Verify initial view returns all 3 rows + dst_view = Table(viewname, MetaData(), autoload_with=connection) + result = connection.execute(select(dst_view)).fetchall() + eq_(len(result), 3) + + # Replace view with filtered query (only id > 1) + stmt = CreateView( + select(source_table.c.id) + .select_from(source_table) + .where(source_table.c.id > 1), + viewname, + or_replace=True, + ) + connection.execute(stmt) + + # Verify view was replaced - should now return only 2 rows + insp = inspect(connection) + assert viewname in insp.get_view_names() + + dst_view = Table(viewname, MetaData(), autoload_with=connection) + result = connection.execute(select(dst_view)).fetchall() + eq_(len(result), 2) + + @testing.requires.materialized_views + @testing.variation("use_metadata", [True, False]) + def test_create_drop_materialized_view(self, connection, use_metadata): + source_table = self.tables.source_table + viewname = "dest_mat_view" + + if use_metadata: + # Create with metadata + metadata = MetaData() + stmt = CreateView( + select(source_table.c.id, source_table.c.name).select_from( + source_table + ), + viewname, + materialized=True, + metadata=metadata, + ) + else: + # Create without metadata + stmt = CreateView( + select(source_table.c.id, source_table.c.name).select_from( + source_table + ), + viewname, + materialized=True, + ) + + insp = inspect(connection) + assert viewname not in insp.get_materialized_view_names() + + if use_metadata: + metadata.create_all(connection) + else: + connection.execute(stmt) + + insp = inspect(connection) + assert viewname in insp.get_materialized_view_names() + + # Verify materialized view returns data + dst_view = stmt.table + result = connection.execute(select(dst_view)).fetchall() + eq_(len(result), 3) + eq_(set(r[0] for r in result), {1, 2, 3}) + + # Drop materialized view + if use_metadata: + metadata.drop_all(connection) + else: + drop_stmt = DropView(dst_view, materialized=True) + connection.execute(drop_stmt) + + insp = inspect(connection) + assert viewname not in insp.get_materialized_view_names() + + @testing.variation( + "table_type", + [ + ("create_table_as", testing.requires.create_table_as), + ("create_view", testing.requires.views), + ], + ) + def test_literal_inlining_inside_select(self, connection, table_type): + src = self.tables.source_table + sel = select( + (src.c.id + 1).label("id2"), + literal("x").label("tag"), + ).select_from(src) + + match table_type: + case "create_table_as": + tablename = "dest_table" + stmt = CreateTableAs(sel, tablename) + case "create_view": + tablename = "dest_view" + stmt = CreateView(sel, tablename) + case _: + table_type.fail() + + connection.execute(stmt) + + tbl = stmt.table + row = connection.execute( + select(func.count(), func.min(tbl.c.tag), func.max(tbl.c.tag)) + ).first() + eq_(row, (3, "x", "x")) + + @testing.variation( + "table_type", + [ + ("create_table_as", testing.requires.create_table_as), + ("create_view", testing.requires.views), + ], + ) + def test_with_bind_param_executes(self, connection, table_type): + src = self.tables.source_table + + sel = ( + select(src.c.id, src.c.name) + .select_from(src) + .where(src.c.name == bindparam("p", value="alice")) + ) + + match table_type: + case "create_table_as": + tablename = "dest_table" + stmt = CreateTableAs(sel, tablename) + case "create_view": + tablename = "dest_view" + stmt = CreateView(sel, tablename) + case _: + table_type.fail() + + connection.execute(stmt) + + tbl = stmt.table + + row = connection.execute( + select(func.count(), func.min(tbl.c.name), func.max(tbl.c.name)) + ).first() + eq_(row, (1, "alice", "alice")) + + @testing.variation( + "table_type", + [ + ("create_table_as", testing.requires.create_table_as), + ("create_view", testing.requires.views), + ], + ) + def test_compound_select_smoke(self, connection, table_type): + a, b = self.tables("a", "b") + + sel = select(a.c.id).union_all(select(b.c.id)) + + match table_type: + case "create_table_as": + tablename = "dest_table" + stmt = CreateTableAs(sel, tablename) + case "create_view": + tablename = "dest_view" + stmt = CreateView(sel, tablename) + case _: + table_type.fail() + + connection.execute(stmt) + + vals = ( + connection.execute( + select(stmt.table.c.id).order_by(stmt.table.c.id) + ) + .scalars() + .all() + ) + eq_(vals, [1, 2, 3, 4]) + + @testing.requires.views + def test_view_dependencies_with_metadata(self, connection, metadata): + """Test that views with dependencies are created/dropped in correct + order. + + This validates that when views are attached to metadata: - create_all() + creates base tables first, then dependent views in order - drop_all() + drops dependent views first, then base tables in reverse order + """ + # Create three base tables + table1 = Table( + "base_table1", + metadata, + Column("id", Integer, primary_key=True, autoincrement=False), + Column("value", Integer), + ) + table2 = Table( + "base_table2", + metadata, + Column("id", Integer, primary_key=True, autoincrement=False), + Column("amount", Integer), + ) + table3 = Table( + "base_table3", + metadata, + Column("id", Integer, primary_key=True, autoincrement=False), + Column("total", Integer), + ) + + # First view depends on table1 and table2 + view1_stmt = CreateView( + select( + table1.c.id, + table1.c.value, + table2.c.amount, + ) + .select_from(table1.join(table2, table1.c.id == table2.c.id)) + .where(table1.c.value > 0), + "view1", + metadata=metadata, + ) + + # Second view depends on table3 and view1 + view2_stmt = CreateView( + select( + view1_stmt.table.c.id, + view1_stmt.table.c.value, + table3.c.total, + ) + .select_from( + view1_stmt.table.join( + table3, view1_stmt.table.c.id == table3.c.id + ) + ) + .where(table3.c.total > 100), + "view2", + metadata=metadata, + ) + + # Verify metadata knows about all objects + eq_( + {"base_table1", "base_table2", "base_table3", "view1", "view2"}, + set(metadata.tables), + ) + + # Create all in correct dependency order + metadata.create_all(connection) + + # Verify all tables and views were created + insp = inspect(connection) + assert {"base_table1", "base_table2", "base_table3"}.issubset( + insp.get_table_names() + ) + assert {"view1", "view2"}.issubset(insp.get_view_names()) + + # Insert test data + connection.execute( + table1.insert(), + [ + {"id": 1, "value": 10}, + {"id": 2, "value": 20}, + {"id": 3, "value": 30}, + ], + ) + connection.execute( + table2.insert(), + [ + {"id": 1, "amount": 100}, + {"id": 2, "amount": 200}, + {"id": 3, "amount": 300}, + ], + ) + connection.execute( + table3.insert(), + [ + {"id": 1, "total": 50}, + {"id": 2, "total": 150}, + {"id": 3, "total": 250}, + ], + ) + + # Query view1 to verify it works + view1_results = connection.execute( + select(view1_stmt.table).order_by(view1_stmt.table.c.id) + ).fetchall() + eq_( + view1_results, + [ + (1, 10, 100), + (2, 20, 200), + (3, 30, 300), + ], + ) + + # Query view2 to verify it works (should filter total > 100) + view2_results = connection.execute( + select(view2_stmt.table).order_by(view2_stmt.table.c.id) + ).fetchall() + eq_( + view2_results, + [ + (2, 20, 150), + (3, 30, 250), + ], + ) + + # Drop all in correct reverse dependency order + metadata.drop_all(connection) + + # Verify all tables and views were dropped + insp = inspect(connection) + assert {"base_table1", "base_table2", "base_table3"}.isdisjoint( + insp.get_table_names() + ) + assert {"view1", "view2"}.isdisjoint(insp.get_view_names()) diff --git a/test/base/test_tutorials.py b/test/base/test_tutorials.py index 616da4d581..f47e97777c 100644 --- a/test/base/test_tutorials.py +++ b/test/base/test_tutorials.py @@ -14,7 +14,9 @@ from sqlalchemy.testing import skip_test class DocTest(fixtures.TestBase): - __requires__ = ("insert_returning", "insertmanyvalues") + # getting engine cleanup issues with free-threaded python. can't + # isolate what the cause would be except for GC not being very good + __requires__ = ("insert_returning", "insertmanyvalues", "gil_enabled") __only_on__ = "sqlite+pysqlite" def _setup_logger(self): diff --git a/test/base/test_utils.py b/test/base/test_utils.py index 77ab9ff222..b228bcc580 100644 --- a/test/base/test_utils.py +++ b/test/base/test_utils.py @@ -3657,3 +3657,22 @@ class CyExtensionTest(fixtures.TestBase): print(expected) print(setup_modules) eq_(setup_modules, expected) + + +class TestTest(fixtures.TestBase): + """Test of test things""" + + @testing.variation("foo", ["foo", "bar", "baz"]) + def test_variations(self, foo): + match foo: + case "foo": + is_true(foo.foo) + is_false(foo.bar) + case "bar": + is_true(foo.bar) + is_false(foo.foo) + case "baz": + is_true(foo.baz) + is_false(foo.foo) + case _: + foo.fail() diff --git a/test/dialect/mssql/test_compiler.py b/test/dialect/mssql/test_compiler.py index 627738f713..f8d9d54886 100644 --- a/test/dialect/mssql/test_compiler.py +++ b/test/dialect/mssql/test_compiler.py @@ -29,6 +29,7 @@ from sqlalchemy.dialects.mssql import base as mssql_base from sqlalchemy.sql import column from sqlalchemy.sql import quoted_name from sqlalchemy.sql import table +from sqlalchemy.sql.ddl import CreateView from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import AssertsCompiledSQL from sqlalchemy.testing import eq_ @@ -1870,6 +1871,30 @@ class CompileTest(fixtures.TestBase, AssertsCompiledSQL): expr = resolve_lambda(fn, t=t) self.assert_compile(expr, string, params) + def test_create_view_or_replace(self): + t = Table("t", MetaData(), Column("a", Integer), Column("b", String)) + stmt = CreateView( + select(t.c.a, t.c.b).where(t.c.a > 5), + "my_view", + or_replace=True, + ) + self.assert_compile( + stmt, + "CREATE OR ALTER VIEW my_view AS " + "SELECT t.a, t.b FROM t WHERE t.a > 5", + ) + + def test_create_view_basic(self): + t = Table("t", MetaData(), Column("a", Integer), Column("b", String)) + stmt = CreateView( + select(t.c.a, t.c.b).where(t.c.a > 5), + "my_view", + ) + self.assert_compile( + stmt, + "CREATE VIEW my_view AS SELECT t.a, t.b FROM t WHERE t.a > 5", + ) + class CompileIdentityTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = mssql.dialect() diff --git a/test/dialect/sqlite/test_compiler.py b/test/dialect/sqlite/test_compiler.py index 651265570b..de01542503 100644 --- a/test/dialect/sqlite/test_compiler.py +++ b/test/dialect/sqlite/test_compiler.py @@ -7,6 +7,7 @@ from sqlalchemy import CheckConstraint from sqlalchemy import Column from sqlalchemy import column from sqlalchemy import create_engine +from sqlalchemy import CreateView from sqlalchemy import event from sqlalchemy import exc from sqlalchemy import extract @@ -858,3 +859,36 @@ class OnConflictCompileTest( Column("login_email", String(50)), Column("lets_index_this", String(50)), ) + + def test_create_view_if_not_exists(self): + """Test SQLite if_not_exists dialect option for CREATE VIEW.""" + src = table("src", column("id"), column("name")) + stmt = CreateView( + select(src.c.id, src.c.name), + "my_view", + sqlite_if_not_exists=True, + ) + + self.assert_compile( + stmt, + "CREATE VIEW IF NOT EXISTS my_view AS " + "SELECT src.id, src.name FROM src", + dialect=sqlite.dialect(), + ) + + def test_create_view_temporary_if_not_exists(self): + """Test SQLite TEMPORARY VIEW with if_not_exists.""" + src = table("src", column("id"), column("name")) + stmt = CreateView( + select(src.c.id, src.c.name), + "temp_view", + temporary=True, + sqlite_if_not_exists=True, + ) + + self.assert_compile( + stmt, + "CREATE TEMPORARY VIEW IF NOT EXISTS temp_view AS " + "SELECT src.id, src.name FROM src", + dialect=sqlite.dialect(), + ) diff --git a/test/requirements.py b/test/requirements.py index c23e290428..d5e56d66ce 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -735,6 +735,11 @@ class DefaultRequirements(SuiteRequirements): self._sqlite_file_db ) + @property + def create_or_replace_view(self): + """target database supports CREATE OR REPLACE VIEW""" + return only_on(["postgresql", "mysql", "mariadb", "oracle", "mssql"]) + @property def table_value_constructor(self): return only_on(["postgresql", "mssql"]) diff --git a/test/sql/test_create_table_as.py b/test/sql/test_create_table_as.py deleted file mode 100644 index bf05837d4d..0000000000 --- a/test/sql/test_create_table_as.py +++ /dev/null @@ -1,357 +0,0 @@ -import re - -from sqlalchemy import bindparam -from sqlalchemy import Column -from sqlalchemy import Integer -from sqlalchemy import literal -from sqlalchemy import MetaData -from sqlalchemy import String -from sqlalchemy import Table -from sqlalchemy import testing -from sqlalchemy.exc import ArgumentError -from sqlalchemy.schema import CreateTable -from sqlalchemy.sql import column -from sqlalchemy.sql import select -from sqlalchemy.sql import table -from sqlalchemy.sql.ddl import CreateTableAs -from sqlalchemy.testing import fixtures -from sqlalchemy.testing import is_ -from sqlalchemy.testing.assertions import AssertsCompiledSQL -from sqlalchemy.testing.assertions import expect_raises_message -from sqlalchemy.testing.assertions import expect_warnings - - -class CreateTableAsDefaultDialectTest(fixtures.TestBase, AssertsCompiledSQL): - __dialect__ = "default" - - @testing.fixture - def src_table(self): - return Table( - "src", - MetaData(), - Column("id", Integer), - Column("name", String(50)), - ) - - @testing.fixture - def src_two_tables(self): - a = table("a", column("id"), column("name")) - b = table("b", column("id"), column("status")) - return a, b - - def test_basic_element(self, src_table): - src = src_table - stmt = CreateTableAs( - select(src.c.id, src.c.name).select_from(src), - "dst", - ) - self.assert_compile( - stmt, - "CREATE TABLE dst AS SELECT src.id, src.name FROM src", - ) - - def test_schema_element_qualified(self, src_table): - src = src_table - stmt = CreateTableAs( - select(src.c.id).select_from(src), - "dst", - schema="analytics", - ) - self.assert_compile( - stmt, - "CREATE TABLE analytics.dst AS SELECT src.id FROM src", - ) - - def test_blank_schema_treated_as_none(self, src_table): - src = src_table - stmt = CreateTableAs( - select(src.c.id).select_from(src), "dst", schema="" - ) - self.assert_compile(stmt, "CREATE TABLE dst AS SELECT src.id FROM src") - - def test_binds_rendered_inline(self, src_table): - src = src_table - stmt = CreateTableAs( - select(literal("x").label("tag")).select_from(src), - "dst", - ) - self.assert_compile( - stmt, - "CREATE TABLE dst AS SELECT 'x' AS tag FROM src", - ) - - def test_temporary_no_schema(self, src_table): - src = src_table - stmt = CreateTableAs( - select(src.c.id, src.c.name).select_from(src), - "dst", - temporary=True, - ) - self.assert_compile( - stmt, - "CREATE TEMPORARY TABLE dst AS " - "SELECT src.id, src.name FROM src", - ) - - def test_temporary_exists_flags(self, src_table): - src = src_table - stmt = CreateTableAs( - select(src.c.id).select_from(src), - "dst", - schema="sch", - temporary=True, - if_not_exists=True, - ) - self.assert_compile( - stmt, - "CREATE TEMPORARY TABLE " - "IF NOT EXISTS sch.dst AS SELECT src.id FROM src", - ) - - def test_if_not_exists(self, src_table): - src = src_table - stmt = CreateTableAs( - select(src.c.id, src.c.name).select_from(src), - "dst", - if_not_exists=True, - ) - self.assert_compile( - stmt, - "CREATE TABLE IF NOT EXISTS dst AS " - "SELECT src.id, src.name FROM src", - ) - - def test_join_with_binds_rendered_inline(self, src_two_tables): - a, b = src_two_tables - - s = ( - select(a.c.id, a.c.name) - .select_from(a.join(b, a.c.id == b.c.id)) - .where(b.c.status == "active") - ).into("dst") - - # Ensure WHERE survives into CTAS and binds are rendered inline - self.assert_compile( - s, - "CREATE TABLE dst AS " - "SELECT a.id, a.name FROM a JOIN b ON a.id = b.id " - "WHERE b.status = 'active'", - ) - - def test_into_equivalent_to_element(self, src_table): - src = src_table - s = select(src.c.id).select_from(src).where(src.c.id == 2) - via_into = s.into("dst") - via_element = CreateTableAs(s, "dst") - - self.assert_compile( - via_into, - "CREATE TABLE dst AS SELECT src.id FROM src WHERE src.id = 2", - ) - self.assert_compile( - via_element, - "CREATE TABLE dst AS SELECT src.id FROM src WHERE src.id = 2", - ) - - def test_into_does_not_mutate_original_select(self, src_table): - src = src_table - s = select(src.c.id).select_from(src).where(src.c.id == 5) - - # compile original SELECT - self.assert_compile( - s, - "SELECT src.id FROM src WHERE src.id = :id_1", - ) - - # build CTAS - _ = s.into("dst") - - # original is still a SELECT - self.assert_compile( - s, - "SELECT src.id FROM src WHERE src.id = :id_1", - ) - - def test_into_with_schema_argument(self, src_table): - src = src_table - s = select(src.c.id).select_from(src).into("t", schema="analytics") - self.assert_compile( - s, - "CREATE TABLE analytics.t AS SELECT src.id FROM src", - ) - - def test_target_string_must_be_unqualified(self, src_table): - src = src_table - with expect_raises_message( - ArgumentError, - re.escape("Target string must be unqualified (use schema=)."), - ): - CreateTableAs(select(src.c.id).select_from(src), "sch.dst") - - def test_empty_name(self): - with expect_raises_message( - ArgumentError, "Table name must be non-empty" - ): - CreateTableAs(select(literal(1)), "") - - @testing.variation("provide_metadata", [True, False]) - def test_generated_metadata_table_property( - self, src_table, provide_metadata - ): - src = src_table - - if provide_metadata: - metadata = MetaData() - else: - metadata = None - - stmt = CreateTableAs( - select(src.c.name.label("thename"), src.c.id).select_from(src), - "dst", - schema="sch", - metadata=metadata, - ) - - if metadata is not None: - is_(stmt.metadata, metadata) - - assert isinstance(stmt.table, Table) - is_(stmt.table.metadata, stmt.metadata) - - self.assert_compile( - CreateTable(stmt.table), - "CREATE TABLE sch.dst (thename VARCHAR(50), id INTEGER)", - ) - - def test_labels_in_select_list_preserved(self, src_table): - src = src_table - stmt = CreateTableAs( - select( - src.c.id.label("user_id"), src.c.name.label("user_name") - ).select_from(src), - "dst", - ) - self.assert_compile( - stmt, - "CREATE TABLE dst AS " - "SELECT src.id AS user_id, src.name AS user_name FROM src", - ) - - def test_distinct_and_group_by_survive(self, src_table): - src = src_table - sel = ( - select(src.c.name).select_from(src).distinct().group_by(src.c.name) - ) - stmt = CreateTableAs(sel, "dst") - self.assert_compile( - stmt, - "CREATE TABLE dst AS " - "SELECT DISTINCT src.name FROM src GROUP BY src.name", - ) - - def test_bindparam_no_value_raises(self, src_table): - src = src_table - sel = select(src.c.name).where(src.c.name == bindparam("x")) - stmt = CreateTableAs(sel, "dst") - - with expect_warnings( - "Bound parameter 'x' rendering literal NULL in a SQL expression;" - ): - self.assert_compile( - stmt, - "CREATE TABLE dst AS SELECT src.name FROM src " - "WHERE src.name = NULL", - ) - - def test_union_all_with_binds_rendered_inline(self, src_two_tables): - a, b = src_two_tables - - # Named binds so params are deterministic - s1 = ( - select(a.c.id) - .select_from(a) - .where(a.c.id == bindparam("p_a", value=1)) - ) - s2 = ( - select(b.c.id) - .select_from(b) - .where(b.c.id == bindparam("p_b", value=2)) - ) - - u_all = s1.union_all(s2) - stmt = CreateTableAs(u_all, "dst") - - self.assert_compile( - stmt, - "CREATE TABLE dst AS " - "SELECT a.id FROM a WHERE a.id = 1 " - "UNION ALL SELECT b.id FROM b WHERE b.id = 2", - ) - - def test_union_labels_follow_first_select(self, src_two_tables): - # Many engines take column names - # of a UNION from the first SELECT’s labels. - a = table("a", column("val")) - b = table("b", column("val")) - - s1 = select(a.c.val.label("first_name")).select_from(a) - s2 = select(b.c.val).select_from(b) # unlabeled second branch - - u = s1.union(s2) - stmt = CreateTableAs(u, "dst") - - # We only assert what’s stable across dialects: - # - first SELECT has the label - # - a UNION occurs - self.assert_compile( - stmt, - "CREATE TABLE dst AS " - "SELECT a.val AS first_name FROM a " - "UNION " - "SELECT b.val FROM b", - ) - - def test_union_all_with_inlined_literals_smoke(self, src_two_tables): - # Proves literal_binds=True behavior applies across branches. - a, b = src_two_tables - u = ( - select(literal(1).label("x")) - .select_from(a) - .union_all(select(literal("b").label("x")).select_from(b)) - ) - stmt = CreateTableAs(u, "dst") - self.assert_compile( - stmt, - "CREATE TABLE dst AS " - "SELECT 1 AS x FROM a UNION ALL SELECT 'b' AS x FROM b", - ) - - def test_select_shape_where_order_limit(self, src_table): - src = src_table - sel = ( - select(src.c.id, src.c.name) - .select_from(src) - .where(src.c.id > literal(10)) - .order_by(src.c.name) - .limit(5) - .offset(0) - ) - stmt = CreateTableAs(sel, "dst") - self.assert_compile( - stmt, - "CREATE TABLE dst AS " - "SELECT src.id, src.name FROM src " - "WHERE src.id > 10 ORDER BY src.name LIMIT 5 OFFSET 0", - ) - - def test_cte_smoke(self, src_two_tables): - # Proves CTAS works with a WITH-CTE wrapper and labeled column. - a, _ = src_two_tables - cte = select(a.c.id.label("aid")).select_from(a).cte("u") - stmt = CreateTableAs(select(cte.c.aid), "dst") - self.assert_compile( - stmt, - "CREATE TABLE dst AS " - "WITH u AS (SELECT a.id AS aid FROM a) " - "SELECT u.aid FROM u", - ) diff --git a/test/sql/test_ddlemit.py b/test/sql/test_ddlemit.py index a3c1567bbe..703d866320 100644 --- a/test/sql/test_ddlemit.py +++ b/test/sql/test_ddlemit.py @@ -2,11 +2,13 @@ from unittest import mock from unittest.mock import Mock from sqlalchemy import Column +from sqlalchemy import CreateView from sqlalchemy import ForeignKey from sqlalchemy import Index from sqlalchemy import Integer from sqlalchemy import MetaData from sqlalchemy import schema +from sqlalchemy import select from sqlalchemy import Sequence from sqlalchemy import Table from sqlalchemy import testing @@ -69,6 +71,20 @@ class EmitDDLTest(fixtures.TestBase): Table("t%d" % i, m, Column("x", Integer)) for i in range(1, 6) ) + def _table_and_view_fixture(self): + m = MetaData() + + tables = [ + Table("t%d" % i, m, Column("x", Integer)) for i in range(1, 4) + ] + + t1, t2, t3 = tables + views = [ + CreateView(select(t1), "v1", metadata=m).table, + CreateView(select(t3), "v2", metadata=m).table, + ] + return (m,) + tuple(tables) + tuple(views) + def _use_alter_fixture_one(self): m = MetaData() @@ -368,6 +384,78 @@ class EmitDDLTest(fixtures.TestBase): self._assert_drop_tables([t1, t2, t3, t4, t5], generator, m, False) + def test_create_metadata_wviews_checkfirst(self): + m, t1, t2, t3, v1, v2 = self._table_and_view_fixture() + generator = self._mock_create_fixture( + True, None, item_exists=lambda t: t not in ("t2", "v2") + ) + + self._assert_create_tables([t2, v2], generator, m, True) + + def test_drop_metadata_wviews_checkfirst(self): + m, t1, t2, t3, v1, v2 = self._table_and_view_fixture() + generator = self._mock_drop_fixture( + True, None, item_exists=lambda t: t in ("t2", "v2") + ) + + self._assert_drop_tables([t2, v2], generator, m, True) + + def test_create_metadata_wviews_check_tables_only(self): + m, t1, t2, t3, v1, v2 = self._table_and_view_fixture() + generator = self._mock_create_fixture( + CheckFirst.TABLES, + None, + item_exists=lambda t: t not in ("t2", "v2"), + ) + + self._assert_create_tables( + [t2, v1, v2], generator, m, CheckFirst.TABLES + ) + + def test_drop_metadata_wviews_check_tables_only(self): + m, t1, t2, t3, v1, v2 = self._table_and_view_fixture() + generator = self._mock_drop_fixture( + CheckFirst.TABLES, None, item_exists=lambda t: t in ("t2", "v2") + ) + + self._assert_drop_tables([t2, v1, v2], generator, m, CheckFirst.TABLES) + + def test_create_metadata_wviews_check_views_only(self): + m, t1, t2, t3, v1, v2 = self._table_and_view_fixture() + generator = self._mock_create_fixture( + CheckFirst.VIEWS, None, item_exists=lambda t: t not in ("t2", "v2") + ) + + self._assert_create_tables( + [t1, t2, t3, v2], generator, m, CheckFirst.VIEWS + ) + + def test_drop_metadata_wviews_check_views_only(self): + m, t1, t2, t3, v1, v2 = self._table_and_view_fixture() + generator = self._mock_drop_fixture( + CheckFirst.VIEWS, None, item_exists=lambda t: t in ("t2", "v2") + ) + + self._assert_drop_tables( + [t1, t2, t3, v2], generator, m, CheckFirst.VIEWS + ) + + def test_create_metadata_wviews_nocheck(self): + m, t1, t2, t3, v1, v2 = self._table_and_view_fixture() + generator = self._mock_create_fixture( + False, None, item_exists=lambda t: t not in ("t2", "v2") + ) + + self._assert_create_tables([t1, t2, t3, v1, v2], generator, m, False) + + def test_drop_metadata_wviews_nocheck(self): + m, t1, t2, t3, v1, v2 = self._table_and_view_fixture() + generator = self._mock_drop_fixture( + False, None, item_exists=lambda t: t in ("t2", "v2") + ) + + self._assert_drop_tables([t1, t2, t3, v1, v2], generator, m, False) + def test_create_metadata_auto_alter_fk(self): m, t1, t2 = self._use_alter_fixture_one() generator = self._mock_create_fixture(False, [t1, t2]) @@ -391,15 +479,35 @@ class EmitDDLTest(fixtures.TestBase): ) def _assert_create_tables(self, elements, generator, argument, checkfirst): - self._assert_ddl(schema.CreateTable, elements, generator, argument) + self._assert_ddl( + (schema.CreateTable, schema.CreateView), + elements, + generator, + argument, + ) + tables = [] if CheckFirst(checkfirst) & CheckFirst.TABLES: if generator.tables is not None: - tables = generator.tables + tables.extend([t for t in generator.tables if not t.is_view]) elif isinstance(argument, MetaData): - tables = argument.tables.values() + tables.extend( + [t for t in argument.tables.values() if not t.is_view] + ) else: assert False, "don't know what tables we are checking" + + if CheckFirst(checkfirst) & CheckFirst.VIEWS: + if generator.tables is not None: + tables.extend([t for t in generator.tables if t.is_view]) + elif isinstance(argument, MetaData): + tables.extend( + [t for t in argument.tables.values() if t.is_view] + ) + else: + assert False, "don't know what views we are checking" + + if tables: eq_( generator.dialect.has_table.mock_calls, [ @@ -414,15 +522,32 @@ class EmitDDLTest(fixtures.TestBase): ) def _assert_drop_tables(self, elements, generator, argument, checkfirst): - self._assert_ddl(schema.DropTable, elements, generator, argument) + self._assert_ddl( + (schema.DropTable, schema.DropView), elements, generator, argument + ) + tables = [] if CheckFirst(checkfirst) & CheckFirst.TABLES: if generator.tables is not None: - tables = generator.tables + tables.extend([t for t in generator.tables if not t.is_view]) elif isinstance(argument, MetaData): - tables = argument.tables.values() + tables.extend( + [t for t in argument.tables.values() if not t.is_view] + ) else: assert False, "don't know what tables we are checking" + + if CheckFirst(checkfirst) & CheckFirst.VIEWS: + if generator.tables is not None: + tables.extend([t for t in generator.tables if t.is_view]) + elif isinstance(argument, MetaData): + tables.extend( + [t for t in argument.tables.values() if t.is_view] + ) + else: + assert False, "don't know what views we are checking" + + if tables: eq_( generator.dialect.has_table.mock_calls, [ diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index 76d5d33b0f..f9cc8662a9 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -13,7 +13,10 @@ from sqlalchemy import Column from sqlalchemy import column from sqlalchemy import ColumnDefault from sqlalchemy import Computed +from sqlalchemy import CreateTable +from sqlalchemy import CreateView from sqlalchemy import desc +from sqlalchemy import DropTable from sqlalchemy import Enum from sqlalchemy import event from sqlalchemy import exc @@ -1736,6 +1739,49 @@ class ToMetaDataTest(fixtures.TestBase, AssertsCompiledSQL, ComparesTables): eq_(len(t2.indexes), 1) + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_table_via_select(self, type_: testing.Variation): + meta = MetaData() + + table = Table("mytable", meta, Column("x", Integer)) + + m2 = MetaData() + + if type_.create_table_as: + target = select(table).into("ctas", metadata=meta) + elif type_.create_view: + target = CreateView(select(table), "tview", metadata=meta) + else: + type_.fail() + + is_(target.table.metadata, meta) + + tt2 = target.table.to_metadata(m2) + if type_.create_view: + is_true(tt2.is_view) + + ttarget = tt2._creator_ddl + is_(ttarget.metadata, m2) + is_(ttarget.table, tt2) + + if tt2.is_view: + is_(tt2._dropper_ddl.element, tt2) + + def test_alternate_create_drop(self): + meta = MetaData() + + table = Table("mytable", meta, Column("x", Integer)) + + table.set_creator_ddl(CreateTable(table, if_not_exists=True)) + table.set_dropper_ddl(DropTable(table, if_exists=True)) + + m2 = MetaData() + + ttarget = table.to_metadata(m2) + + is_(ttarget._creator_ddl.element, ttarget) + is_(ttarget._dropper_ddl.element, ttarget) + class InfoTest(fixtures.TestBase): def test_metadata_info(self): diff --git a/test/sql/test_table_via_select.py b/test/sql/test_table_via_select.py new file mode 100644 index 0000000000..ef975fa0e2 --- /dev/null +++ b/test/sql/test_table_via_select.py @@ -0,0 +1,753 @@ +from sqlalchemy import bindparam +from sqlalchemy import Column +from sqlalchemy import CreateTableAs +from sqlalchemy import CreateView +from sqlalchemy import Integer +from sqlalchemy import literal +from sqlalchemy import MetaData +from sqlalchemy import String +from sqlalchemy import Table +from sqlalchemy import testing +from sqlalchemy import text +from sqlalchemy.schema import CreateTable +from sqlalchemy.schema import DropView +from sqlalchemy.sql import column +from sqlalchemy.sql import select +from sqlalchemy.sql import table +from sqlalchemy.testing import fixtures +from sqlalchemy.testing import is_ +from sqlalchemy.testing.assertions import AssertsCompiledSQL +from sqlalchemy.testing.assertions import expect_warnings + + +class TableViaSelectTest(fixtures.TestBase, AssertsCompiledSQL): + __dialect__ = "default" + + @testing.fixture + def src_table(self): + return Table( + "src", + MetaData(), + Column("id", Integer), + Column("name", String(50)), + ) + + @testing.fixture + def src_two_tables(self): + a = table("a", column("id"), column("name")) + b = table("b", column("id"), column("status")) + return a, b + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_basic_element(self, src_table, type_: testing.Variation): + src = src_table + if type_.create_table_as: + stmt = CreateTableAs( + select(src.c.id, src.c.name), + "dst", + ) + elif type_.create_view: + stmt = CreateView( + select(src.c.id, src.c.name), + "dst", + ) + else: + type_.fail() + + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} ' + f"dst AS SELECT src.id, src.name FROM src", + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_schema_element_qualified( + self, src_table, type_: testing.Variation + ): + src = src_table + if type_.create_table_as: + stmt = CreateTableAs( + select(src.c.id), + "dst", + schema="analytics", + ) + elif type_.create_view: + stmt = CreateView( + select(src.c.id), + "dst", + schema="analytics", + ) + else: + type_.fail() + + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} ' + f"analytics.dst AS SELECT src.id FROM src", + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_quoting(self, type_: testing.Variation): + + src = Table( + "SourceTable", + MetaData(), + Column("Some Name", Integer), + Column("Other Col", String), + ) + if type_.create_table_as: + stmt = CreateTableAs( + select(src), + "My Analytic Table", + schema="Analysis", + ) + elif type_.create_view: + stmt = CreateView( + select(src), + "My Analytic View", + schema="Analysis", + ) + else: + type_.fail() + + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} ' + f'"Analysis"."My Analytic ' + f'{"Table" if type_.create_table_as else "View"}" AS SELECT ' + f'"SourceTable"."Some Name", "SourceTable"."Other Col" ' + f'FROM "SourceTable"', + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_blank_schema_treated_as_none( + self, src_table, type_: testing.Variation + ): + src = src_table + if type_.create_table_as: + stmt = CreateTableAs(select(src.c.id), "dst", schema="") + elif type_.create_view: + stmt = CreateView(select(src.c.id), "dst", schema="") + else: + type_.fail() + + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} ' + f"dst AS SELECT src.id FROM src", + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_binds_rendered_inline(self, src_table, type_: testing.Variation): + src = src_table + if type_.create_table_as: + stmt = CreateTableAs( + select(literal("x").label("tag")).select_from(src), + "dst", + ) + elif type_.create_view: + stmt = CreateView( + select(literal("x").label("tag")).select_from(src), + "dst", + ) + else: + type_.fail() + + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} ' + f"dst AS SELECT 'x' AS tag FROM src", + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_temporary_no_schema(self, src_table, type_: testing.Variation): + src = src_table + if type_.create_table_as: + stmt = CreateTableAs( + select(src.c.id, src.c.name), + "dst", + temporary=True, + ) + elif type_.create_view: + stmt = CreateView( + select(src.c.id, src.c.name), + "dst", + temporary=True, + ) + else: + type_.fail() + + self.assert_compile( + stmt, + f'CREATE TEMPORARY {"TABLE" if type_.create_table_as else "VIEW"} ' + f"dst AS SELECT src.id, src.name FROM src", + ) + + @testing.variation("temporary", [True, False]) + @testing.variation("if_not_exists", [True, False]) + def test_create_table_as_flags( + self, + src_table, + temporary: testing.Variation, + if_not_exists: testing.Variation, + ): + src = src_table + stmt = CreateTableAs( + select(src.c.id), + "dst", + schema="sch", + temporary=bool(temporary), + if_not_exists=bool(if_not_exists), + ) + + self.assert_compile( + stmt, + f"""CREATE { + 'TEMPORARY ' if temporary else '' + }TABLE { + 'IF NOT EXISTS ' if if_not_exists else '' + }sch.dst AS SELECT src.id FROM src""", + ) + + def test_temporary_or_replace_create_view(self, src_table): + src = src_table + stmt = CreateView( + select(src.c.id), + "dst", + schema="sch", + temporary=True, + or_replace=True, + ) + + self.assert_compile( + stmt, + "CREATE OR REPLACE TEMPORARY VIEW sch.dst AS " + "SELECT src.id FROM src", + ) + + def test_or_replace(self, src_table): + src = src_table + stmt = CreateView( + select(src.c.id, src.c.name), + "dst", + or_replace=True, + ) + + self.assert_compile( + stmt, + "CREATE OR REPLACE VIEW dst AS SELECT src.id, src.name FROM src", + ) + + def test_join_with_binds_rendered_inline(self, src_two_tables): + a, b = src_two_tables + + s = ( + select(a.c.id, a.c.name) + .select_from(a.join(b, a.c.id == b.c.id)) + .where(b.c.status == "active") + ).into("dst") + + # Ensure WHERE survives into CTAS and binds are rendered inline + self.assert_compile( + s, + "CREATE TABLE dst AS " + "SELECT a.id, a.name FROM a JOIN b ON a.id = b.id " + "WHERE b.status = 'active'", + ) + + def test_into_equivalent_to_element(self, src_table): + src = src_table + s = select(src.c.id).where(src.c.id == 2) + via_into = s.into("dst") + via_element = CreateTableAs(s, "dst") + + self.assert_compile( + via_into, + "CREATE TABLE dst AS SELECT src.id FROM src WHERE src.id = 2", + ) + self.assert_compile( + via_element, + "CREATE TABLE dst AS SELECT src.id FROM src WHERE src.id = 2", + ) + + def test_into_does_not_mutate_original_select(self, src_table): + src = src_table + s = select(src.c.id).where(src.c.id == 5) + + # compile original SELECT + self.assert_compile( + s, + "SELECT src.id FROM src WHERE src.id = :id_1", + ) + + # build CTAS + _ = s.into("dst") + + # original is still a SELECT + self.assert_compile( + s, + "SELECT src.id FROM src WHERE src.id = :id_1", + ) + + def test_into_with_schema_argument(self, src_table): + src = src_table + s = select(src.c.id).into("t", schema="analytics") + self.assert_compile( + s, + "CREATE TABLE analytics.t AS SELECT src.id FROM src", + ) + + @testing.variation("provide_metadata", [True, False]) + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_generated_metadata_table_property( + self, src_table, provide_metadata, type_: testing.Variation + ): + src = src_table + + if provide_metadata: + metadata = MetaData() + else: + metadata = None + + if type_.create_table_as: + stmt = CreateTableAs( + select(src.c.name.label("thename"), src.c.id), + "dst", + schema="sch", + metadata=metadata, + ) + elif type_.create_view: + stmt = CreateView( + select(src.c.name.label("thename"), src.c.id), + "dst", + schema="sch", + metadata=metadata, + ) + else: + type_.fail() + + if metadata is not None: + is_(stmt.metadata, metadata) + + assert isinstance(stmt.table, Table) + is_(stmt.table.metadata, stmt.metadata) + + # this is validating the structure of the table but is not + # looking at CreateTable being the appropriate construct + self.assert_compile( + CreateTable(stmt.table), + "CREATE TABLE sch.dst (thename VARCHAR(50), id INTEGER)", + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_labels_in_select_list_preserved( + self, src_table, type_: testing.Variation + ): + src = src_table + if type_.create_table_as: + stmt = CreateTableAs( + select( + src.c.id.label("user_id"), + src.c.name.label("user_name"), + ), + "dst", + ) + elif type_.create_view: + stmt = CreateView( + select( + src.c.id.label("user_id"), + src.c.name.label("user_name"), + ), + "dst", + ) + else: + type_.fail() + + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} ' + f"dst AS SELECT src.id AS user_id, src.name AS user_name FROM src", + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_distinct_and_group_by_survive( + self, src_table, type_: testing.Variation + ): + src = src_table + sel = select(src.c.name).distinct().group_by(src.c.name) + if type_.create_table_as: + stmt = CreateTableAs(sel, "dst") + elif type_.create_view: + stmt = CreateView(sel, "dst") + else: + type_.fail() + + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} ' + f"dst AS SELECT DISTINCT src.name FROM src GROUP BY src.name", + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_bindparam_no_value_raises( + self, src_table, type_: testing.Variation + ): + src = src_table + sel = select(src.c.name).where(src.c.name == bindparam("x")) + if type_.create_table_as: + stmt = CreateTableAs(sel, "dst") + elif type_.create_view: + stmt = CreateView(sel, "dst") + else: + type_.fail() + + with expect_warnings( + "Bound parameter 'x' rendering literal NULL in a SQL expression;" + ): + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} ' + f"dst AS SELECT src.name FROM src WHERE src.name = NULL", + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_union_all_with_binds_rendered_inline( + self, src_two_tables, type_: testing.Variation + ): + a, b = src_two_tables + + # Named binds so params are deterministic + s1 = select(a.c.id).where(a.c.id == bindparam("p_a", value=1)) + s2 = select(b.c.id).where(b.c.id == bindparam("p_b", value=2)) + + u_all = s1.union_all(s2) + if type_.create_table_as: + stmt = CreateTableAs(u_all, "dst") + elif type_.create_view: + stmt = CreateView(u_all, "dst") + else: + type_.fail() + + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} dst AS ' + f"SELECT a.id FROM a WHERE a.id = 1 " + f"UNION ALL SELECT b.id FROM b WHERE b.id = 2", + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_union_labels_follow_first_select( + self, src_two_tables, type_: testing.Variation + ): + # Many engines take column names + # of a UNION from the first SELECT's labels. + a = table("a", column("val")) + b = table("b", column("val")) + + s1 = select(a.c.val.label("first_name")) + s2 = select(b.c.val) # unlabeled second branch + + u = s1.union(s2) + if type_.create_table_as: + stmt = CreateTableAs(u, "dst") + elif type_.create_view: + stmt = CreateView(u, "dst") + else: + type_.fail() + + # We only assert what's stable across dialects: + # - first SELECT has the label + # - a UNION occurs + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} dst AS ' + f"SELECT a.val AS first_name FROM a UNION SELECT b.val FROM b", + ) + + self.assert_compile( + select(stmt.table), "SELECT dst.first_name FROM dst" + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_union_all_with_inlined_literals_smoke( + self, src_two_tables, type_: testing.Variation + ): + # Proves literal_binds=True behavior applies across branches. + a, b = src_two_tables + u = ( + select(literal(1).label("x")) + .select_from(a) + .union_all(select(literal("b").label("x")).select_from(b)) + ) + if type_.create_table_as: + stmt = CreateTableAs(u, "dst") + elif type_.create_view: + stmt = CreateView(u, "dst") + else: + type_.fail() + + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} dst AS ' + f"SELECT 1 AS x FROM a UNION ALL SELECT 'b' AS x FROM b", + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_select_shape_where_order_limit( + self, src_table, type_: testing.Variation + ): + src = src_table + sel = ( + select(src.c.id, src.c.name) + .where(src.c.id > literal(10)) + .order_by(src.c.name) + .limit(5) + .offset(0) + ) + if type_.create_table_as: + stmt = CreateTableAs(sel, "dst") + elif type_.create_view: + stmt = CreateView(sel, "dst") + else: + type_.fail() + + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} dst AS ' + f"SELECT src.id, src.name FROM src " + f"WHERE src.id > 10 ORDER BY src.name LIMIT 5 OFFSET 0", + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + def test_cte_smoke(self, src_two_tables, type_: testing.Variation): + # Proves CTAS works with a WITH-CTE wrapper and labeled column. + a, _ = src_two_tables + cte = select(a.c.id.label("aid")).cte("u") + if type_.create_table_as: + stmt = CreateTableAs(select(cte.c.aid), "dst") + elif type_.create_view: + stmt = CreateView(select(cte.c.aid), "dst") + else: + type_.fail() + + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} dst AS ' + f"WITH u AS (SELECT a.id AS aid FROM a) SELECT u.aid FROM u", + ) + + # Verify the created table uses the label name for its column + from sqlalchemy.testing import eq_ + + eq_(list(stmt.table.c.keys()), ["aid"]) + + def test_materialized_view_basic(self, src_table): + src = src_table + stmt = CreateView( + select(src.c.id, src.c.name), + "dst", + materialized=True, + ) + self.assert_compile( + stmt, + "CREATE MATERIALIZED VIEW dst AS SELECT src.id, src.name FROM src", + ) + + def test_materialized_view_with_schema(self, src_table): + src = src_table + stmt = CreateView( + select(src.c.id), + "dst", + schema="analytics", + materialized=True, + ) + self.assert_compile( + stmt, + "CREATE MATERIALIZED VIEW analytics.dst AS SELECT src.id FROM src", + ) + + def test_materialized_view_or_replace(self, src_table): + src = src_table + stmt = CreateView( + select(src.c.id, src.c.name), + "dst", + materialized=True, + or_replace=True, + ) + self.assert_compile( + stmt, + "CREATE OR REPLACE MATERIALIZED VIEW dst AS " + "SELECT src.id, src.name FROM src", + ) + + def test_materialized_view_temporary(self, src_table): + src = src_table + stmt = CreateView( + select(src.c.id, src.c.name), + "dst", + materialized=True, + temporary=True, + ) + self.assert_compile( + stmt, + "CREATE TEMPORARY MATERIALIZED VIEW dst AS " + "SELECT src.id, src.name FROM src", + ) + + def test_materialized_view_all_flags(self, src_table): + src = src_table + stmt = CreateView( + select(src.c.id), + "dst", + schema="sch", + materialized=True, + temporary=True, + or_replace=True, + ) + self.assert_compile( + stmt, + "CREATE OR REPLACE TEMPORARY MATERIALIZED VIEW " + "sch.dst AS SELECT src.id FROM src", + ) + + @testing.variation("type_", ["create_table_as", "create_view"]) + @testing.variation("use_schema", [True, False]) + def test_textual_select( + self, type_: testing.Variation, use_schema: testing.Variation + ): + """Test using text().columns() with CreateView and CreateTableAs. + + This is likely how we will get alembic to autogenerate a CreateView() + construct since we dont want to rewrite a whole select() construct + in a migration file. + + """ + textual = text( + "SELECT a, b, c FROM source_table WHERE x > 10" + ).columns( + column("a", Integer), + column("b", String), + column("c", Integer), + ) + + schema = "analytics" if use_schema else None + + if type_.create_table_as: + stmt = CreateTableAs(textual, "dst", schema=schema) + elif type_.create_view: + stmt = CreateView(textual, "dst", schema=schema) + else: + type_.fail() + + self.assert_compile( + stmt, + f'CREATE {"TABLE" if type_.create_table_as else "VIEW"} ' + f'{"analytics." if use_schema else ""}dst AS ' + f"SELECT a, b, c FROM source_table WHERE x > 10", + ) + + # Verify the generated table has the correct columns + assert "a" in stmt.table.c + assert "b" in stmt.table.c + assert "c" in stmt.table.c + assert isinstance(stmt.table.c.a.type, Integer) + assert isinstance(stmt.table.c.b.type, String) + assert isinstance(stmt.table.c.c.type, Integer) + + @testing.variation("temporary", [True, False]) + @testing.variation("if_not_exists", [True, False]) + def test_textual_select_with_flags_create_table_as( + self, temporary: testing.Variation, if_not_exists: testing.Variation + ): + """Test TextualSelect with flags for CREATE TABLE AS.""" + textual = text("SELECT * FROM temp_data").columns( + column("x", Integer), + column("y", String), + ) + + stmt = CreateTableAs( + textual, + "snapshot", + temporary=bool(temporary), + if_not_exists=bool(if_not_exists), + ) + + self.assert_compile( + stmt, + f"""CREATE { + 'TEMPORARY ' if temporary else '' + }TABLE { + 'IF NOT EXISTS ' if if_not_exists else '' + }snapshot AS SELECT * FROM temp_data""", + ) + + @testing.variation("temporary", [True, False]) + @testing.variation("or_replace", [True, False]) + def test_textual_select_with_flags_create_view( + self, temporary: testing.Variation, or_replace: testing.Variation + ): + """Test TextualSelect with flags for CREATE VIEW.""" + textual = text("SELECT * FROM temp_data").columns( + column("x", Integer), + column("y", String), + ) + + stmt = CreateView( + textual, + "snapshot_view", + temporary=bool(temporary), + or_replace=bool(or_replace), + ) + + self.assert_compile( + stmt, + f"""CREATE { + 'OR REPLACE ' if or_replace else '' + }{ + 'TEMPORARY ' if temporary else '' + }VIEW snapshot_view AS SELECT * FROM temp_data""", + ) + + def test_drop_view_basic(self): + """Test basic DROP VIEW compilation.""" + src = table("src", column("id"), column("name")) + create_view = CreateView(select(src), "my_view") + view_table = create_view.table + + drop_stmt = DropView(view_table) + + self.assert_compile(drop_stmt, "DROP VIEW my_view") + + def test_drop_view_materialized(self): + """Test DROP MATERIALIZED VIEW compilation.""" + src = table("src", column("id"), column("name")) + create_view = CreateView(select(src), "my_mat_view", materialized=True) + view_table = create_view.table + + drop_stmt = DropView(view_table, materialized=True) + + self.assert_compile(drop_stmt, "DROP MATERIALIZED VIEW my_mat_view") + + def test_drop_view_from_create_view(self): + """Test that CreateView automatically creates proper DropView.""" + src = table("src", column("id"), column("name")) + + # Regular view + create_view = CreateView(select(src), "regular_view") + drop_stmt = create_view.table._dropper_ddl + + self.assert_compile(drop_stmt, "DROP VIEW regular_view") + + def test_drop_materialized_view_from_create_view(self): + """Test CreateView with materialized=True creates proper DropView.""" + src = table("src", column("id"), column("name")) + + # Materialized view + create_mat_view = CreateView( + select(src), "materialized_view", materialized=True + ) + drop_stmt = create_mat_view.table._dropper_ddl + + self.assert_compile( + drop_stmt, "DROP MATERIALIZED VIEW materialized_view" + ) -- 2.47.3