args = []
for c in self.columns:
args.append(c.copy(schema=schema))
- for c in self.constraints:
- args.append(c.copy(schema=schema))
table = Table(
self.name, metadata, schema=schema,
*args, **self.kwargs
)
+ for c in self.constraints:
+ table.append_constraint(c.copy(schema=schema, target_table=table))
+
for index in self.indexes:
# skip indexes that would be generated
# by the 'index' flag on Column
"""
def __init__(self, sqltext, name=None, deferrable=None,
- initially=None, table=None, _create_rule=None):
+ initially=None, table=None, _create_rule=None,
+ _autoattach=True):
"""Construct a CHECK constraint.
:param sqltext:
self.sqltext = expression._literal_as_text(sqltext)
if table is not None:
self._set_parent_with_dispatch(table)
- else:
+ elif _autoattach:
cols = sqlutil.find_columns(self.sqltext)
tables = set([c.table for c in cols
if c.table is not None])
return "column_check_constraint"
__visit_name__ = property(__visit_name__)
- def copy(self, **kw):
- c = CheckConstraint(self.sqltext,
+ def copy(self, target_table=None, **kw):
+ if target_table is not None:
+ def replace(col):
+ if self.table.c.contains_column(col):
+ return target_table.c[col.key]
+ else:
+ return None
+ sqltext = visitors.replacement_traverse(self.sqltext, {}, replace)
+ else:
+ sqltext = self.sqltext
+ c = CheckConstraint(sqltext,
name=self.name,
initially=self.initially,
deferrable=self.deferrable,
- _create_rule=self._create_rule)
+ _create_rule=self._create_rule,
+ table=target_table,
+ _autoattach=False)
c.dispatch._update(self.dispatch)
return c
event.listen(table.metadata, "before_drop",
DropConstraint(self, on=supports_alter))
- def copy(self, **kw):
+ def copy(self, schema=None, **kw):
fkc = ForeignKeyConstraint(
[x.parent.name for x in self._elements.values()],
- [x._get_colspec(**kw) for x in self._elements.values()],
+ [x._get_colspec(schema=schema) for x in self._elements.values()],
name=self.name,
onupdate=self.onupdate,
ondelete=self.ondelete,
if to_inspect is None:
to_inspect = obj
+ missing = object()
+
def genargs():
try:
(args, vargs, vkw, defaults) = \
yield repr(getattr(obj, arg, None))
for (arg, defval) in zip(args[-default_len:], defaults):
try:
- val = getattr(obj, arg, None)
- if val != defval:
+ val = getattr(obj, arg, missing)
+ if val is not missing and val != defval:
yield '%s=%r' % (arg, val)
except:
pass
if additional_kw:
for arg, defval in additional_kw:
try:
- val = getattr(obj, arg, None)
- if val != defval:
+ val = getattr(obj, arg, missing)
+ if val is not missing and val != defval:
yield '%s=%r' % (arg, val)
except:
pass
c = Index('foo', t.c.a)
assert c in t.indexes
+ def test_tometadata_ok(self):
+ m = MetaData()
+
+ t = Table('tbl', m,
+ Column('a', Integer),
+ Column('b', Integer)
+ )
+
+ t2 = Table('t2', m,
+ Column('a', Integer),
+ Column('b', Integer)
+ )
+
+ uq = UniqueConstraint(t.c.a)
+ ck = CheckConstraint(t.c.a > 5)
+ fk = ForeignKeyConstraint([t.c.a], [t2.c.a])
+ pk = PrimaryKeyConstraint(t.c.a)
+
+ m2 = MetaData()
+
+ t3 = t.tometadata(m2)
+
+ eq_(len(t3.constraints), 4)
+
+ for c in t3.constraints:
+ assert c.table is t3
+
+ def test_check_constraint_copy(self):
+ m = MetaData()
+ t = Table('tbl', m,
+ Column('a', Integer),
+ Column('b', Integer)
+ )
+ ck = CheckConstraint(t.c.a > 5)
+ ck2 = ck.copy()
+ assert ck in t.constraints
+ assert ck2 not in t.constraints
+
+
def test_ambig_check_constraint_auto_append(self):
m = MetaData()