return schema + "." + name
+# this should really be in sql/util.py but we'd have to
+# break an import cycle
+def _copy_expression(expression, source_table, target_table):
+ def replace(col):
+ if source_table.c.contains_column(col):
+ return target_table.c[col.key]
+ else:
+ return None
+ return visitors.replacement_traverse(expression, {}, replace)
+
+
@inspection._self_inspects
class SchemaItem(SchemaEventTarget, visitors.Visitable):
"""Base class for items that define a database schema."""
elif not c._type_bound:
# skip unique constraints that would be generated
# by the 'unique' flag on Column
- if isinstance(c, UniqueConstraint) and \
- len(c.columns) == 1 and \
- list(c.columns)[0].unique:
+ if c._column_flag:
continue
table.append_constraint(
for index in self.indexes:
# skip indexes that would be generated
# by the 'index' flag on Column
- if len(index.columns) == 1 and \
- list(index.columns)[0].index:
+ if index._column_flag:
continue
Index(index.name,
unique=index.unique,
- *[table.c[col] for col in index.columns.keys()],
+ *[_copy_expression(expr, self, table)
+ for expr in index.expressions],
+ _table=table,
**index.kwargs)
return self._schema_item_copy(table)
"The 'index' keyword argument on Column is boolean only. "
"To create indexes with a specific name, create an "
"explicit Index object external to the Table.")
- Index(None, self, unique=bool(self.unique))
+ Index(None, self, unique=bool(self.unique), _column_flag=True)
elif self.unique:
if isinstance(self.unique, util.string_types):
raise exc.ArgumentError(
"specific name, append an explicit UniqueConstraint to "
"the Table's list of elements, or create an explicit "
"Index object external to the Table.")
- table.append_constraint(UniqueConstraint(self.key))
+ table.append_constraint(
+ UniqueConstraint(self.key, _column_flag=True))
self._setup_on_memoized_fks(lambda fk: fk._set_remote_table(table))
def __init__(self, *columns, **kw):
_autoattach = kw.pop('_autoattach', True)
+ self._column_flag = kw.pop('_column_flag', False)
self.columns = ColumnCollection()
self._pending_colargs = [_to_schema_column_or_string(c)
for c in columns]
"""
_autoattach = kw.pop('_autoattach', True)
+ _column_flag = kw.pop('_column_flag', False)
Constraint.__init__(self, **kw)
- ColumnCollectionMixin.__init__(self, *columns, _autoattach=_autoattach)
+ ColumnCollectionMixin.__init__(
+ self, *columns, _autoattach=_autoattach, _column_flag=_column_flag)
columns = None
"""A :class:`.ColumnCollection` representing the set of columns
def copy(self, target_table=None, **kw):
if target_table is not None:
- def replace(col):
- if self.table.c.contains_column(col):
- return target_table.c[col.key]
- else:
- return None
- sqltext = visitors.replacement_traverse(self.sqltext, {}, replace)
+ sqltext = _copy_expression(
+ self.sqltext, self.table, target_table)
else:
sqltext = self.sqltext
c = CheckConstraint(sqltext,
self.expressions = processed_expressions
self.name = quoted_name(name, kw.pop("quote", None))
self.unique = kw.pop('unique', False)
+ _column_flag = kw.pop('_column_flag', False)
if 'info' in kw:
self.info = kw.pop('info')
# will call _set_parent() if table-bound column
# objects are present
- ColumnCollectionMixin.__init__(self, *columns)
+ ColumnCollectionMixin.__init__(
+ self, *columns, _column_flag=_column_flag)
if table is not None:
self._set_parent(table)
Column('id', Integer, primary_key=True),
Column('data1', Integer, index=True),
Column('data2', Integer),
+ Index('text', text('data1 + 1')),
)
- Index('multi', table.c.data1, table.c.data2),
+ Index('multi', table.c.data1, table.c.data2)
+ Index('func', func.abs(table.c.data1))
+ Index('multi-func', table.c.data1, func.abs(table.c.data2))
meta2 = MetaData()
table_c = table.tometadata(meta2)
def _get_key(i):
return [i.name, i.unique] + \
sorted(i.kwargs.items()) + \
- list(i.columns.keys())
+ [str(col) for col in i.expressions]
eq_(
sorted([_get_key(i) for i in table.indexes]),