unique=index.unique,
*[table.c[col] for col in index.columns.keys()],
**index.kwargs)
+ table.dispatch._update(self.dispatch)
return table
class Column(SchemaItem, expression.ColumnClause):
[c.copy(**kw) for c in self.constraints] + \
[c.copy(**kw) for c in self.foreign_keys if not c.constraint]
- return Column(
+ c = Column(
name=self.name,
type_=self.type,
key = self.key,
doc=self.doc,
*args
)
+ c.dispatch._update(self.dispatch)
+ return c
def _make_proxy(self, selectable, name=None):
"""Create a *proxy* for this column.
"""
- return ForeignKey(
+ fk = ForeignKey(
self._get_colspec(schema=schema),
use_alter=self.use_alter,
name=self.name,
initially=self.initially,
link_to_name=self.link_to_name
)
+ fk.dispatch._update(self.dispatch)
+ return fk
def _get_colspec(self, schema=None):
"""Return a string based 'column specification' for this :class:`ForeignKey`.
return x in self.columns
def copy(self, **kw):
- return self.__class__(name=self.name, deferrable=self.deferrable,
+ c = self.__class__(name=self.name, deferrable=self.deferrable,
initially=self.initially, *self.columns.keys())
+ c.dispatch._update(self.dispatch)
+ return c
def contains_column(self, col):
return self.columns.contains_column(col)
__visit_name__ = property(__visit_name__)
def copy(self, **kw):
- return CheckConstraint(self.sqltext,
+ c = CheckConstraint(self.sqltext,
name=self.name,
initially=self.initially,
deferrable=self.deferrable,
_create_rule=self._create_rule)
+ c.dispatch._update(self.dispatch)
+ return c
class ForeignKeyConstraint(Constraint):
"""A table-level FOREIGN KEY constraint.
def copy(self, **kw):
- return ForeignKeyConstraint(
+ fkc = ForeignKeyConstraint(
[x.parent.name for x in self._elements.values()],
[x._get_colspec(**kw) for x in self._elements.values()],
name=self.name,
initially=self.initially,
link_to_name=self.link_to_name
)
+ fkc.dispatch._update(self.dispatch)
+ return fkc
class PrimaryKeyConstraint(ColumnCollectionConstraint):
"""A table-level PRIMARY KEY constraint.
"""
from test.lib import testing
-from sqlalchemy import schema
+from sqlalchemy import schema, event
__all__ = 'Table', 'Column',
if 'test_needs_autoincrement' in test_opts and \
kw.get('primary_key', False) and \
testing.against('firebird', 'oracle'):
- def add_seq(tbl, c):
+ def add_seq(c, tbl):
c._init_items(
schema.Sequence(_truncate_name(testing.db.dialect, tbl.name + '_' + c.name + '_seq'), optional=True)
)
- col._on_table_attach(add_seq)
+ event.listen(col, 'after_parent_attach', add_seq, propagate=True)
return col
def _truncate_name(dialect, name):