--- /dev/null
+.. change::
+ :tags: usecase, schema
+ :tickets: 8394
+
+ Implemented the DDL event hooks :meth:`.DDLEvents.before_create`,
+ :meth:`.DDLEvents.after_create`, :meth:`.DDLEvents.before_drop`,
+ :meth:`.DDLEvents.after_drop` for all :class:`.SchemaItem` objects that
+ include a distinct CREATE or DROP step, when that step is invoked as a
+ distinct SQL statement, including for :class:`.ForeignKeyConstraint`,
+ :class:`.Sequence`, :class:`.Index`, and PostgreSQL's
+ :class:`_postgresql.ENUM`.
from ...sql import roles
from ...sql import sqltypes
from ...sql import type_api
-from ...sql.ddl import InvokeDDLBase
+from ...sql.ddl import InvokeCreateDDLBase
+from ...sql.ddl import InvokeDropDDLBase
if TYPE_CHECKING:
from ...sql._typing import _TypeEngineArgument
self.drop(bind=bind, checkfirst=checkfirst)
-class NamedTypeGenerator(InvokeDDLBase):
+class NamedTypeGenerator(InvokeCreateDDLBase):
def __init__(self, dialect, connection, checkfirst=False, **kwargs):
super().__init__(connection, **kwargs)
self.checkfirst = checkfirst
)
-class NamedTypeDropper(InvokeDDLBase):
+class NamedTypeDropper(InvokeDropDDLBase):
def __init__(self, dialect, connection, checkfirst=False, **kwargs):
super().__init__(connection, **kwargs)
self.checkfirst = checkfirst
if not self._can_create_type(enum):
return
- self.connection.execute(CreateEnumType(enum))
+ with self.with_ddl_events(enum):
+ self.connection.execute(CreateEnumType(enum))
class EnumDropper(NamedTypeDropper):
if not self._can_drop_type(enum):
return
- self.connection.execute(DropEnumType(enum))
+ with self.with_ddl_events(enum):
+ self.connection.execute(DropEnumType(enum))
class ENUM(NamedType, sqltypes.NativeForEmulated, sqltypes.Enum):
kw.setdefault("_create_events", False)
kw.setdefault("values_callable", impl.values_callable)
kw.setdefault("omit_aliases", impl._omit_aliases)
+ kw.setdefault("_adapted_from", impl)
return cls(**kw)
def create(self, bind=None, checkfirst=True):
def visit_DOMAIN(self, domain):
if not self._can_create_type(domain):
return
- self.connection.execute(CreateDomainType(domain))
+ with self.with_ddl_events(domain):
+ self.connection.execute(CreateDomainType(domain))
class DomainDropper(NamedTypeDropper):
if not self._can_drop_type(domain):
return
- self.connection.execute(DropDomainType(domain))
+ with self.with_ddl_events(domain):
+ self.connection.execute(DropDomainType(domain))
class DOMAIN(NamedType, sqltypes.SchemaType):
"""
from __future__ import annotations
+import contextlib
import typing
from typing import Any
from typing import Callable
.. versionadded:: 1.4.0b2
"""
- super(CreateTable, self).__init__(element, if_not_exists=if_not_exists)
+ super().__init__(element, if_not_exists=if_not_exists)
self.columns = [CreateColumn(column) for column in element.columns]
self.include_foreign_key_constraints = include_foreign_key_constraints
.. versionadded:: 1.4.0b2
"""
- super(DropTable, self).__init__(element, if_exists=if_exists)
+ super().__init__(element, if_exists=if_exists)
class CreateSequence(_CreateDropBase):
.. versionadded:: 1.4.0b2
"""
- super(CreateIndex, self).__init__(element, if_not_exists=if_not_exists)
+ super().__init__(element, if_not_exists=if_not_exists)
class DropIndex(_CreateDropBase):
.. versionadded:: 1.4.0b2
"""
- super(DropIndex, self).__init__(element, if_exists=if_exists)
+ super().__init__(element, if_exists=if_exists)
class AddConstraint(_CreateDropBase):
__visit_name__ = "add_constraint"
def __init__(self, element, *args, **kw):
- super(AddConstraint, self).__init__(element, *args, **kw)
+ super().__init__(element, *args, **kw)
element._create_rule = util.portable_instancemethod(
self._create_rule_disable
)
def __init__(self, element, cascade=False, **kw):
self.cascade = cascade
- super(DropConstraint, self).__init__(element, **kw)
+ super().__init__(element, **kw)
element._create_rule = util.portable_instancemethod(
self._create_rule_disable
)
def __init__(self, connection):
self.connection = connection
+ @contextlib.contextmanager
+ def with_ddl_events(self, target, **kw):
+ """helper context manager that will apply appropriate DDL events
+ to a CREATE or DROP operation."""
-class SchemaGenerator(InvokeDDLBase):
+ raise NotImplementedError()
+
+
+class InvokeCreateDDLBase(InvokeDDLBase):
+ @contextlib.contextmanager
+ def with_ddl_events(self, target, **kw):
+ """helper context manager that will apply appropriate DDL events
+ to a CREATE or DROP operation."""
+
+ target.dispatch.before_create(
+ target, self.connection, _ddl_runner=self, **kw
+ )
+ yield
+ target.dispatch.after_create(
+ target, self.connection, _ddl_runner=self, **kw
+ )
+
+
+class InvokeDropDDLBase(InvokeDDLBase):
+ @contextlib.contextmanager
+ def with_ddl_events(self, target, **kw):
+ """helper context manager that will apply appropriate DDL events
+ to a CREATE or DROP operation."""
+
+ target.dispatch.before_drop(
+ target, self.connection, _ddl_runner=self, **kw
+ )
+ yield
+ target.dispatch.after_drop(
+ target, self.connection, _ddl_runner=self, **kw
+ )
+
+
+class SchemaGenerator(InvokeCreateDDLBase):
def __init__(
self, dialect, connection, checkfirst=False, tables=None, **kwargs
):
- super(SchemaGenerator, self).__init__(connection, **kwargs)
+ super().__init__(connection, **kwargs)
self.checkfirst = checkfirst
self.tables = tables
self.preparer = dialect.identifier_preparer
]
event_collection = [t for (t, fks) in collection if t is not None]
- metadata.dispatch.before_create(
- metadata,
- self.connection,
- tables=event_collection,
- checkfirst=self.checkfirst,
- _ddl_runner=self,
- )
-
- for seq in seq_coll:
- self.traverse_single(seq, create_ok=True)
-
- for table, fkcs in collection:
- if table is not None:
- self.traverse_single(
- table,
- create_ok=True,
- include_foreign_key_constraints=fkcs,
- _is_metadata_operation=True,
- )
- else:
- for fkc in fkcs:
- self.traverse_single(fkc)
- metadata.dispatch.after_create(
+ with self.with_ddl_events(
metadata,
- self.connection,
tables=event_collection,
checkfirst=self.checkfirst,
- _ddl_runner=self,
- )
+ ):
+ for seq in seq_coll:
+ self.traverse_single(seq, create_ok=True)
+
+ for table, fkcs in collection:
+ if table is not None:
+ self.traverse_single(
+ table,
+ create_ok=True,
+ include_foreign_key_constraints=fkcs,
+ _is_metadata_operation=True,
+ )
+ else:
+ for fkc in fkcs:
+ self.traverse_single(fkc)
def visit_table(
self,
if not create_ok and not self._can_create_table(table):
return
- table.dispatch.before_create(
+ with self.with_ddl_events(
table,
- self.connection,
checkfirst=self.checkfirst,
- _ddl_runner=self,
_is_metadata_operation=_is_metadata_operation,
- )
-
- for column in table.columns:
- if column.default is not None:
- self.traverse_single(column.default)
+ ):
- if not self.dialect.supports_alter:
- # e.g., don't omit any foreign key constraints
- include_foreign_key_constraints = None
+ for column in table.columns:
+ if column.default is not None:
+ self.traverse_single(column.default)
- CreateTable(
- table,
- include_foreign_key_constraints=include_foreign_key_constraints,
- )._invoke_with(self.connection)
+ if not self.dialect.supports_alter:
+ # e.g., don't omit any foreign key constraints
+ include_foreign_key_constraints = None
- if hasattr(table, "indexes"):
- for index in table.indexes:
- self.traverse_single(index, create_ok=True)
+ CreateTable(
+ table,
+ include_foreign_key_constraints=(
+ include_foreign_key_constraints
+ ),
+ )._invoke_with(self.connection)
- if self.dialect.supports_comments and not self.dialect.inline_comments:
- if table.comment is not None:
- SetTableComment(table)._invoke_with(self.connection)
+ if hasattr(table, "indexes"):
+ for index in table.indexes:
+ self.traverse_single(index, create_ok=True)
- for column in table.columns:
- if column.comment is not None:
- SetColumnComment(column)._invoke_with(self.connection)
+ if (
+ self.dialect.supports_comments
+ and not self.dialect.inline_comments
+ ):
+ if table.comment is not None:
+ SetTableComment(table)._invoke_with(self.connection)
- if self.dialect.supports_constraint_comments:
- for constraint in table.constraints:
- if constraint.comment is not None:
- self.connection.execute(
- SetConstraintComment(constraint)
- )
+ for column in table.columns:
+ if column.comment is not None:
+ SetColumnComment(column)._invoke_with(self.connection)
- table.dispatch.after_create(
- table,
- self.connection,
- checkfirst=self.checkfirst,
- _ddl_runner=self,
- _is_metadata_operation=_is_metadata_operation,
- )
+ if self.dialect.supports_constraint_comments:
+ for constraint in table.constraints:
+ if constraint.comment is not None:
+ self.connection.execute(
+ SetConstraintComment(constraint)
+ )
def visit_foreign_key_constraint(self, constraint):
if not self.dialect.supports_alter:
return
- AddConstraint(constraint)._invoke_with(self.connection)
+
+ with self.with_ddl_events(constraint):
+ AddConstraint(constraint)._invoke_with(self.connection)
def visit_sequence(self, sequence, create_ok=False):
if not create_ok and not self._can_create_sequence(sequence):
return
- CreateSequence(sequence)._invoke_with(self.connection)
+ with self.with_ddl_events(sequence):
+ CreateSequence(sequence)._invoke_with(self.connection)
def visit_index(self, index, create_ok=False):
if not create_ok and not self._can_create_index(index):
return
- CreateIndex(index)._invoke_with(self.connection)
+ with self.with_ddl_events(index):
+ CreateIndex(index)._invoke_with(self.connection)
-class SchemaDropper(InvokeDDLBase):
+class SchemaDropper(InvokeDropDDLBase):
def __init__(
self, dialect, connection, checkfirst=False, tables=None, **kwargs
):
- super(SchemaDropper, self).__init__(connection, **kwargs)
+ super().__init__(connection, **kwargs)
self.checkfirst = checkfirst
self.tables = tables
self.preparer = dialect.identifier_preparer
event_collection = [t for (t, fks) in collection if t is not None]
- metadata.dispatch.before_drop(
+ with self.with_ddl_events(
metadata,
- self.connection,
tables=event_collection,
checkfirst=self.checkfirst,
- _ddl_runner=self,
- )
+ ):
- for table, fkcs in collection:
- if table is not None:
- self.traverse_single(
- table,
- drop_ok=True,
- _is_metadata_operation=True,
- _ignore_sequences=seq_coll,
- )
- else:
- for fkc in fkcs:
- self.traverse_single(fkc)
+ for table, fkcs in collection:
+ if table is not None:
+ self.traverse_single(
+ table,
+ drop_ok=True,
+ _is_metadata_operation=True,
+ _ignore_sequences=seq_coll,
+ )
+ else:
+ for fkc in fkcs:
+ self.traverse_single(fkc)
- for seq in seq_coll:
- self.traverse_single(seq, drop_ok=seq.column is None)
-
- metadata.dispatch.after_drop(
- metadata,
- self.connection,
- tables=event_collection,
- checkfirst=self.checkfirst,
- _ddl_runner=self,
- )
+ for seq in seq_coll:
+ self.traverse_single(seq, drop_ok=seq.column is None)
def _can_drop_table(self, table):
self.dialect.validate_identifier(table.name)
if not drop_ok and not self._can_drop_index(index):
return
- DropIndex(index)(index, self.connection)
+ with self.with_ddl_events(index):
+ DropIndex(index)(index, self.connection)
def visit_table(
self,
if not drop_ok and not self._can_drop_table(table):
return
- table.dispatch.before_drop(
+ with self.with_ddl_events(
table,
- self.connection,
checkfirst=self.checkfirst,
- _ddl_runner=self,
_is_metadata_operation=_is_metadata_operation,
- )
-
- DropTable(table)._invoke_with(self.connection)
+ ):
- # traverse client side defaults which may refer to server-side
- # sequences. noting that some of these client side defaults may also be
- # set up as server side defaults (see https://docs.sqlalchemy.org/en/
- # latest/core/defaults.html#associating-a-sequence-as-the-server-side-
- # default), so have to be dropped after the table is dropped.
- for column in table.columns:
- if (
- column.default is not None
- and column.default not in _ignore_sequences
- ):
- self.traverse_single(column.default)
+ DropTable(table)._invoke_with(self.connection)
- table.dispatch.after_drop(
- table,
- self.connection,
- checkfirst=self.checkfirst,
- _ddl_runner=self,
- _is_metadata_operation=_is_metadata_operation,
- )
+ # traverse client side defaults which may refer to server-side
+ # sequences. noting that some of these client side defaults may
+ # also be set up as server side defaults
+ # (see https://docs.sqlalchemy.org/en/
+ # latest/core/defaults.html
+ # #associating-a-sequence-as-the-server-side-
+ # default), so have to be dropped after the table is dropped.
+ for column in table.columns:
+ if (
+ column.default is not None
+ and column.default not in _ignore_sequences
+ ):
+ self.traverse_single(column.default)
def visit_foreign_key_constraint(self, constraint):
if not self.dialect.supports_alter:
return
- DropConstraint(constraint)._invoke_with(self.connection)
+ with self.with_ddl_events(constraint):
+ DropConstraint(constraint)._invoke_with(self.connection)
def visit_sequence(self, sequence, drop_ok=False):
if not drop_ok and not self._can_drop_sequence(sequence):
return
- DropSequence(sequence)._invoke_with(self.connection)
+ with self.with_ddl_events(sequence):
+ DropSequence(sequence)._invoke_with(self.connection)
def sort_tables(
Define event listeners for schema objects,
that is, :class:`.SchemaItem` and other :class:`.SchemaEventTarget`
subclasses, including :class:`_schema.MetaData`, :class:`_schema.Table`,
- :class:`_schema.Column`.
+ :class:`_schema.Column`, etc.
- :class:`_schema.MetaData` and :class:`_schema.Table` support events
- specifically regarding when CREATE and DROP
- DDL is emitted to the database.
+ **Create / Drop Events**
- Attachment events are also provided to customize
- behavior whenever a child schema element is associated
- with a parent, such as, when a :class:`_schema.Column` is associated
- with its :class:`_schema.Table`, when a
- :class:`_schema.ForeignKeyConstraint`
- is associated with a :class:`_schema.Table`, etc.
+ Events emitted when CREATE and DROP commands are emitted to the database.
+ The event hooks in this category include :meth:`.DDLEvents.before_create`,
+ :meth:`.DDLEvents.after_create`, :meth:`.DDLEvents.before_drop`, and
+ :meth:`.DDLEvents.after_drop`.
+
+ These events are emitted when using schema-level methods such as
+ :meth:`.MetaData.create_all` and :meth:`.MetaData.drop_all`. Per-object
+ create/drop methods such as :meth:`.Table.create`, :meth:`.Table.drop`,
+ :meth:`.Index.create` are also included, as well as dialect-specific
+ methods such as :meth:`_postgresql.ENUM.create`.
- Example using the ``after_create`` event::
+ .. versionadded:: 2.0 :class:`.DDLEvents` event hooks now take place
+ for non-table objects including constraints, indexes, and
+ dialect-specific schema types.
+ Event hooks may be attached directly to a :class:`_schema.Table` object or
+ to a :class:`_schema.MetaData` collection, as well as to any
+ :class:`.SchemaItem` class or object that can be individually created and
+ dropped using a distinct SQL command. Such classes include :class:`.Index`,
+ :class:`.Sequence`, and dialect-specific classes such as
+ :class:`_postgresql.ENUM`.
+
+ Example using the :meth:`.DDLEvents.after_create` event, where a custom
+ event hook will emit an ``ALTER TABLE`` command on the current connection,
+ after ``CREATE TABLE`` is emitted::
+
+ from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import Table, Column, Metadata, Integer
m = MetaData()
some_table = Table('some_table', m, Column('data', Integer))
+ @event.listens_for(some_table, "after_create")
def after_create(target, connection, **kw):
connection.execute(text(
"ALTER TABLE %s SET name=foo_%s" % (target.name, target.name)
))
- event.listen(some_table, "after_create", after_create)
+
+ some_engine = create_engine("postgresql://scott:tiger@host/test")
+
+ # will emit "CREATE TABLE some_table" as well as the above
+ # "ALTER TABLE" statement afterwards
+ m.create_all(some_engine)
+
+ Constraint objects such as :class:`.ForeignKeyConstraint`,
+ :class:`.UniqueConstraint`, :class:`.CheckConstraint` may also be
+ subscribed to these events, however they will **not** normally produce
+ events as these objects are usually rendered inline within an
+ enclosing ``CREATE TABLE`` statement and implicitly dropped from a
+ ``DROP TABLE`` statement.
+
+ For the :class:`.Index` construct, the event hook will be emitted
+ for ``CREATE INDEX``, however SQLAlchemy does not normally emit
+ ``DROP INDEX`` when dropping tables as this is again implicit within the
+ ``DROP TABLE`` statement.
+
+ .. versionadded:: 2.0 Support for :class:`.SchemaItem` objects
+ for create/drop events was expanded from its previous support for
+ :class:`.MetaData` and :class:`.Table` to also include
+ :class:`.Constraint` and all subclasses, :class:`.Index`,
+ :class:`.Sequence` and some type-related constructs such as
+ :class:`_postgresql.ENUM`.
+
+ .. note:: These event hooks are only emitted within the scope of
+ SQLAlchemy's create/drop methods; they are not necessarily supported
+ by tools such as `alembic <https://alembic.sqlalchemy.org>`_.
+
+
+ **Attachment Events**
+
+ Attachment events are provided to customize
+ behavior whenever a child schema element is associated
+ with a parent, such as when a :class:`_schema.Column` is associated
+ with its :class:`_schema.Table`, when a
+ :class:`_schema.ForeignKeyConstraint`
+ is associated with a :class:`_schema.Table`, etc. These events include
+ :meth:`.DDLEvents.before_parent_attach` and
+ :meth:`.DDLEvents.after_parent_attach`.
+
+ **Reflection Events**
+
+ The :meth:`.DDLEvents.column_reflect` event is used to intercept
+ and modify the in-Python definition of database columns when
+ :term:`reflection` of database tables proceeds.
+
+ **Use with Generic DDL**
DDL events integrate closely with the
:class:`.DDL` class and the :class:`.ExecutableDDLElement` hierarchy
DDL("ALTER TABLE %(table)s SET name=foo_%(table)s")
)
- The methods here define the name of an event as well
- as the names of members that are passed to listener
- functions.
+ **Event Propagation to MetaData Copies**
For all :class:`.DDLEvent` events, the ``propagate=True`` keyword argument
will ensure that a given event handler is propagated to copies of the
method::
from sqlalchemy import DDL
+
+ metadata = MetaData()
+ some_table = Table("some_table", metadata, Column("data", Integer))
+
event.listen(
some_table,
"after_create",
propagate=True
)
+ new_metadata = MetaData()
new_table = some_table.to_metadata(new_metadata)
- The above :class:`.DDL` object will also be associated with the
- :class:`_schema.Table` object represented by ``new_table``.
+ The above :class:`.DDL` object will be associated with the
+ :meth:`.DDLEvents.after_create` event for both the ``some_table`` and
+ the ``new_table`` :class:`.Table` objects.
.. seealso::
) -> None:
r"""Called before CREATE statements are emitted.
- :param target: the :class:`_schema.MetaData` or :class:`_schema.Table`
+ :param target: the :class:`.SchemaObject`, such as a
+ :class:`_schema.MetaData` or :class:`_schema.Table`
+ but also including all create/drop objects such as
+ :class:`.Index`, :class:`.Sequence`, etc.,
object which is the target of the event.
+
+ .. versionadded:: 2.0 Support for all :class:`.SchemaItem` objects
+ was added.
+
:param connection: the :class:`_engine.Connection` where the
CREATE statement or statements will be emitted.
:param \**kw: additional keyword arguments relevant
) -> None:
r"""Called after CREATE statements are emitted.
- :param target: the :class:`_schema.MetaData` or :class:`_schema.Table`
+ :param target: the :class:`.SchemaObject`, such as a
+ :class:`_schema.MetaData` or :class:`_schema.Table`
+ but also including all create/drop objects such as
+ :class:`.Index`, :class:`.Sequence`, etc.,
object which is the target of the event.
+
+ .. versionadded:: 2.0 Support for all :class:`.SchemaItem` objects
+ was added.
+
:param connection: the :class:`_engine.Connection` where the
CREATE statement or statements have been emitted.
:param \**kw: additional keyword arguments relevant
) -> None:
r"""Called before DROP statements are emitted.
- :param target: the :class:`_schema.MetaData` or :class:`_schema.Table`
+ :param target: the :class:`.SchemaObject`, such as a
+ :class:`_schema.MetaData` or :class:`_schema.Table`
+ but also including all create/drop objects such as
+ :class:`.Index`, :class:`.Sequence`, etc.,
object which is the target of the event.
+
+ .. versionadded:: 2.0 Support for all :class:`.SchemaItem` objects
+ was added.
+
:param connection: the :class:`_engine.Connection` where the
DROP statement or statements will be emitted.
:param \**kw: additional keyword arguments relevant
) -> None:
r"""Called after DROP statements are emitted.
- :param target: the :class:`_schema.MetaData` or :class:`_schema.Table`
+ :param target: the :class:`.SchemaObject`, such as a
+ :class:`_schema.MetaData` or :class:`_schema.Table`
+ but also including all create/drop objects such as
+ :class:`.Index`, :class:`.Sequence`, etc.,
object which is the target of the event.
+
+ .. versionadded:: 2.0 Support for all :class:`.SchemaItem` objects
+ was added.
+
:param connection: the :class:`_engine.Connection` where the
DROP statement or statements have been emitted.
:param \**kw: additional keyword arguments relevant
inherit_schema: bool = False,
quote: Optional[bool] = None,
_create_events: bool = True,
+ _adapted_from: Optional[SchemaType] = None,
):
if name is not None:
self.name = quoted_name(name, quote)
util.portable_instancemethod(self._on_metadata_drop),
)
+ if _adapted_from:
+ self.dispatch = self.dispatch._join(_adapted_from.dispatch)
+
def _set_parent(self, column, **kw):
# set parent hook is when this type is associated with a column.
# Column calls it for all SchemaEventTarget instances, either the
self, cls: Type[Union[TypeEngine[Any], TypeEngineMixin]], **kw: Any
) -> TypeEngine[Any]:
kw.setdefault("_create_events", False)
+ kw.setdefault("_adapted_from", self)
return super().adapt(cls, **kw)
def create(self, bind, checkfirst=False):
inherit_schema=kw.pop("inherit_schema", False),
quote=kw.pop("quote", None),
_create_events=kw.pop("_create_events", True),
+ _adapted_from=kw.pop("_adapted_from", None),
)
def _parse_into_values(self, enums, kw):
create_constraint: bool = False,
name: Optional[str] = None,
_create_events: bool = True,
+ _adapted_from: Optional[SchemaType] = None,
):
"""Construct a Boolean.
self.create_constraint = create_constraint
self.name = name
self._create_events = _create_events
+ if _adapted_from:
+ self.dispatch = self.dispatch._join(_adapted_from.dispatch)
def _should_create_constraint(self, compiler, **kw):
if not self._is_impl_for_variant(compiler.dialect, kw):
from sqlalchemy.testing.suite import test_types as suite
from sqlalchemy.testing.util import round_decimal
from sqlalchemy.types import UserDefinedType
+from ...engine.test_ddlevents import DDLEventWCreateHarness
class FloatCoercionTest(fixtures.TablesTest, AssertsExecutionResults):
]
+class DomainDDLEventTest(DDLEventWCreateHarness, fixtures.TestBase):
+ __backend__ = True
+
+ __only_on__ = "postgresql > 8.3"
+
+ creates_implicitly_with_table = False
+ drops_implicitly_with_table = False
+ requires_table_to_exist = False
+
+ @testing.fixture
+ def produce_subject(self):
+ return DOMAIN(
+ name="email",
+ data_type=Text,
+ check=r"VALUE ~ '[^@]+@[^@]+\.[^@]+'",
+ )
+
+ @testing.fixture
+ def produce_table_integrated_subject(self, metadata, produce_subject):
+ return Table(
+ "table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("email", produce_subject),
+ )
+
+
+class EnumDDLEventTest(DDLEventWCreateHarness, fixtures.TestBase):
+ __backend__ = True
+
+ __only_on__ = "postgresql > 8.3"
+
+ creates_implicitly_with_table = False
+ drops_implicitly_with_table = False
+ requires_table_to_exist = False
+
+ @testing.fixture
+ def produce_subject(self):
+ return Enum(
+ "x",
+ "y",
+ "z",
+ name="status",
+ )
+
+ @testing.fixture
+ def produce_event_target(self, produce_subject, connection):
+ return produce_subject.dialect_impl(connection.dialect)
+
+ @testing.fixture
+ def produce_table_integrated_subject(self, metadata, produce_subject):
+ return Table(
+ "table",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("status", produce_subject),
+ )
+
+
+class NativeEnumDDLEventTest(EnumDDLEventTest):
+ @testing.fixture
+ def produce_event_target(self, produce_subject, connection):
+ return produce_subject
+
+ @testing.fixture
+ def produce_subject(self):
+ return ENUM(
+ "x",
+ "y",
+ "z",
+ name="status",
+ )
+
+
class OIDTest(fixtures.TestBase):
__only_on__ = "postgresql"
__backend__ = True
from sqlalchemy.schema import CheckConstraint
from sqlalchemy.schema import DDL
from sqlalchemy.schema import DropConstraint
+from sqlalchemy.schema import ForeignKeyConstraint
+from sqlalchemy.schema import Sequence
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
eq_(metadata_canary.mock_calls, [])
+class DDLEventHarness:
+ creates_implicitly_with_table = True
+ drops_implicitly_with_table = True
+
+ @testing.fixture
+ def produce_subject(self):
+ raise NotImplementedError()
+
+ @testing.fixture
+ def produce_event_target(self, produce_subject, connection):
+ """subclasses may want to override this for cases where the target
+ sent to the event is not the same object as that which was
+ listened on.
+
+ the example here is for :class:`.SchemaType` objects like
+ :class:`.Enum` that produce a dialect-specific implementation
+ which is where the actual CREATE/DROP happens.
+
+ """
+ return produce_subject
+
+ @testing.fixture
+ def produce_table_integrated_subject(self, metadata, produce_subject):
+ raise NotImplementedError()
+
+ def test_table_integrated(
+ self,
+ metadata,
+ connection,
+ produce_subject,
+ produce_table_integrated_subject,
+ produce_event_target,
+ ):
+ subject = produce_subject
+ assert_subject = produce_event_target
+
+ canary = mock.Mock()
+ event.listen(subject, "before_create", canary.before_create)
+ event.listen(subject, "after_create", canary.after_create)
+ event.listen(subject, "before_drop", canary.before_drop)
+ event.listen(subject, "after_drop", canary.after_drop)
+
+ metadata.create_all(connection, checkfirst=False)
+
+ if self.creates_implicitly_with_table:
+ create_calls = []
+ else:
+ create_calls = [
+ mock.call.before_create(
+ assert_subject,
+ connection,
+ _ddl_runner=mock.ANY,
+ ),
+ mock.call.after_create(
+ assert_subject,
+ connection,
+ _ddl_runner=mock.ANY,
+ ),
+ ]
+ eq_(canary.mock_calls, create_calls)
+ metadata.drop_all(connection, checkfirst=False)
+
+ if self.drops_implicitly_with_table:
+ eq_(canary.mock_calls, create_calls + [])
+ else:
+ eq_(
+ canary.mock_calls,
+ create_calls
+ + [
+ mock.call.before_drop(
+ assert_subject,
+ connection,
+ _ddl_runner=mock.ANY,
+ ),
+ mock.call.after_drop(
+ assert_subject,
+ connection,
+ _ddl_runner=mock.ANY,
+ ),
+ ],
+ )
+
+
+class DDLEventWCreateHarness(DDLEventHarness):
+
+ requires_table_to_exist = True
+
+ def test_straight_create_drop(
+ self,
+ metadata,
+ connection,
+ produce_subject,
+ produce_table_integrated_subject,
+ produce_event_target,
+ ):
+ subject = produce_subject
+ assert_subject = produce_event_target
+
+ if self.requires_table_to_exist:
+ metadata.create_all(connection, checkfirst=False)
+ subject.drop(connection)
+
+ canary = mock.Mock()
+ event.listen(subject, "before_create", canary.before_create)
+ event.listen(subject, "after_create", canary.after_create)
+ event.listen(subject, "before_drop", canary.before_drop)
+ event.listen(subject, "after_drop", canary.after_drop)
+
+ subject.create(connection)
+
+ eq_(
+ canary.mock_calls,
+ [
+ mock.call.before_create(
+ assert_subject,
+ connection,
+ _ddl_runner=mock.ANY,
+ ),
+ mock.call.after_create(
+ assert_subject,
+ connection,
+ _ddl_runner=mock.ANY,
+ ),
+ ],
+ )
+
+ subject.drop(connection)
+
+ eq_(
+ canary.mock_calls,
+ [
+ mock.call.before_create(
+ assert_subject,
+ connection,
+ _ddl_runner=mock.ANY,
+ ),
+ mock.call.after_create(
+ assert_subject,
+ connection,
+ _ddl_runner=mock.ANY,
+ ),
+ mock.call.before_drop(
+ assert_subject,
+ connection,
+ _ddl_runner=mock.ANY,
+ ),
+ mock.call.after_drop(
+ assert_subject,
+ connection,
+ _ddl_runner=mock.ANY,
+ ),
+ ],
+ )
+
+
+class SequenceDDLEventTest(DDLEventWCreateHarness, fixtures.TestBase):
+ __requires__ = ("sequences",)
+
+ creates_implicitly_with_table = False
+ drops_implicitly_with_table = False
+ supports_standalone_create = True
+
+ @testing.fixture
+ def produce_subject(self):
+ return Sequence("my_seq")
+
+ @testing.fixture
+ def produce_table_integrated_subject(self, metadata, produce_subject):
+ return Table(
+ "t",
+ metadata,
+ Column("id", Integer, produce_subject, primary_key=True),
+ )
+
+
+class IndexDDLEventTest(DDLEventWCreateHarness, fixtures.TestBase):
+ creates_implicitly_with_table = False
+ drops_implicitly_with_table = True
+ supports_standalone_create = False
+
+ @testing.fixture
+ def produce_subject(self):
+ return Index("my_idx", "key")
+
+ @testing.fixture
+ def produce_table_integrated_subject(self, metadata, produce_subject):
+ return Table(
+ "t",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("key", String(50)),
+ produce_subject,
+ )
+
+
+class ForeignKeyConstraintDDLEventTest(DDLEventHarness, fixtures.TestBase):
+ creates_implicitly_with_table = True
+ drops_implicitly_with_table = True
+ supports_standalone_create = False
+
+ @testing.fixture
+ def produce_subject(self):
+ return ForeignKeyConstraint(["related_id"], ["related.id"], name="fkc")
+
+ @testing.fixture
+ def produce_table_integrated_subject(self, metadata, produce_subject):
+ Table(
+ "t",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("related_id", Integer),
+ produce_subject,
+ )
+ Table("related", metadata, Column("id", Integer, primary_key=True))
+
+
class DDLExecutionTest(AssertsCompiledSQL, fixtures.TestBase):
def setup_test(self):
self.engine = engines.mock_engine()