]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
The resolution of :class:`.ForeignKey` objects to their
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 23 Jun 2013 18:03:47 +0000 (14:03 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 23 Jun 2013 19:58:07 +0000 (15:58 -0400)
target :class:`.Column` has been reworked to be as
immediate as possible, based on the moment that the
target :class:`.Column` is associated with the same
:class:`.MetaData` as this :class:`.ForeignKey`, rather
than waiting for the first time a join is constructed,
or similar. This along with other improvements allows
earlier detection of some foreign key configuration
issues.  Also included here is a rework of the
type-propagation system, so that
it should be reliable now to set the type as ``None``
on any :class:`.Column` that refers to another via
:class:`.ForeignKey` - the type will be copied from the
target column as soon as that other column is associated,
and now works for composite foreign keys as well.
[ticket:1765]

doc/build/changelog/changelog_09.rst
lib/sqlalchemy/schema.py
lib/sqlalchemy/sql/compiler.py
test/orm/inheritance/test_basic.py
test/sql/test_metadata.py
test/sql/test_selectable.py

index d3aa7815f8e5ef0ce20d0138c5494b40ccedb871..a00b63aec21ad0233090dd380d95956df44a5aa2 100644 (file)
@@ -6,6 +6,26 @@
 .. changelog::
     :version: 0.9.0
 
+    .. change::
+        :tags: bug, sql
+        :tickets: 1765
+
+        The resolution of :class:`.ForeignKey` objects to their
+        target :class:`.Column` has been reworked to be as
+        immediate as possible, based on the moment that the
+        target :class:`.Column` is associated with the same
+        :class:`.MetaData` as this :class:`.ForeignKey`, rather
+        than waiting for the first time a join is constructed,
+        or similar. This along with other improvements allows
+        earlier detection of some foreign key configuration
+        issues.  Also included here is a rework of the
+        type-propagation system, so that
+        it should be reliable now to set the type as ``None``
+        on any :class:`.Column` that refers to another via
+        :class:`.ForeignKey` - the type will be copied from the
+        target column as soon as that other column is associated,
+        and now works for composite foreign keys as well.
+
     .. change::
         :tags: feature, sql
         :tickets: 2744, 2734
index 94df6751c747d702e7f61d85c461281c22fa65d4..d2df3de1d724f076c410436d2a15257b488340f0 100644 (file)
@@ -32,6 +32,7 @@ import re
 import inspect
 from . import exc, util, dialects, event, events, inspection
 from .sql import expression, visitors
+import collections
 
 ddl = util.importlater("sqlalchemy.engine", "ddl")
 sqlutil = util.importlater("sqlalchemy.sql", "util")
@@ -728,11 +729,19 @@ class Column(SchemaItem, expression.ColumnClause):
           The ``type`` argument may be the second positional argument
           or specified by keyword.
 
-          There is partial support for automatic detection of the
-          type based on that of a :class:`.ForeignKey` associated
-          with this column, if the type is specified as ``None``.
-          However, this feature is not fully implemented and
-          may not function in all cases.
+          If the ``type`` is ``None``, it will first default to the special
+          type :class:`.NullType`.  If and when this :class:`.Column` is
+          made to refer to another column using :class:`.ForeignKey`
+          and/or :class:`.ForeignKeyConstraint`, the type of the remote-referenced
+          column will be copied to this column as well, at the moment that
+          the foreign key is resolved against that remote :class:`.Column`
+          object.
+
+          .. versionchanged:: 0.9.0
+
+            Support for propagation of type to a :class:`.Column` from its
+            :class:`.ForeignKey` object has been improved and should be
+            more reliable and timely.
 
         :param \*args: Additional positional arguments include various
           :class:`.SchemaItem` derived constructs which will be applied
@@ -914,8 +923,6 @@ class Column(SchemaItem, expression.ColumnClause):
                         "May not pass type_ positionally and as a keyword.")
                 type_ = args.pop(0)
 
-        no_type = type_ is None
-
         super(Column, self).__init__(name, None, type_)
         self.key = kwargs.pop('key', name)
         self.primary_key = kwargs.pop('primary_key', False)
@@ -969,9 +976,6 @@ class Column(SchemaItem, expression.ColumnClause):
                                             for_update=True))
         self._init_items(*args)
 
-        if not self.foreign_keys and no_type:
-            raise exc.ArgumentError("'type' is required on Column objects "
-                                        "which have no foreign keys.")
         util.set_creation_order(self)
 
         if 'info' in kwargs:
@@ -1082,6 +1086,11 @@ class Column(SchemaItem, expression.ColumnClause):
                     "Index object external to the Table.")
             table.append_constraint(UniqueConstraint(self.key))
 
+        fk_key = (table.key, self.key)
+        if fk_key in self.table.metadata._fk_memos:
+            for fk in self.table.metadata._fk_memos[fk_key]:
+                fk._set_remote_table(table)
+
     def _on_table_attach(self, fn):
         if self.table is not None:
             fn(self, self.table)
@@ -1280,7 +1289,7 @@ class ForeignKey(SchemaItem):
         # object passes itself in when creating ForeignKey
         # markers.
         self.constraint = _constraint
-
+        self.parent = None
         self.use_alter = use_alter
         self.name = name
         self.onupdate = onupdate
@@ -1343,6 +1352,7 @@ class ForeignKey(SchemaItem):
 
         return "%s.%s" % (_column.table.fullname, _column.key)
 
+
     target_fullname = property(_get_colspec)
 
     def references(self, table):
@@ -1362,132 +1372,199 @@ class ForeignKey(SchemaItem):
 
         return table.corresponding_column(self.column)
 
+    @util.memoized_property
+    def _column_tokens(self):
+        """parse a string-based _colspec into its component parts."""
+
+        m = self._colspec.split('.')
+        if m is None:
+            raise exc.ArgumentError(
+                "Invalid foreign key column specification: %s" %
+                self._colspec)
+        if (len(m) == 1):
+            tname = m.pop()
+            colname = None
+        else:
+            colname = m.pop()
+            tname = m.pop()
+
+        # A FK between column 'bar' and table 'foo' can be
+        # specified as 'foo', 'foo.bar', 'dbo.foo.bar',
+        # 'otherdb.dbo.foo.bar'. Once we have the column name and
+        # the table name, treat everything else as the schema
+        # name. Some databases (e.g. Sybase) support
+        # inter-database foreign keys. See tickets#1341 and --
+        # indirectly related -- Ticket #594. This assumes that '.'
+        # will never appear *within* any component of the FK.
+
+        if (len(m) > 0):
+            schema = '.'.join(m)
+        else:
+            schema = None
+        return schema, tname, colname
+
+    def _table_key(self):
+        if isinstance(self._colspec, util.string_types):
+            schema, tname, colname = self._column_tokens
+            return _get_table_key(tname, schema)
+        elif hasattr(self._colspec, '__clause_element__'):
+            _column = self._colspec.__clause_element__()
+        else:
+            _column = self._colspec
+
+        if _column.table is None:
+            return None
+        else:
+            return _column.table.key
+
+    def _resolve_col_tokens(self):
+        if self.parent is None:
+            raise exc.InvalidRequestError(
+                    "this ForeignKey object does not yet have a "
+                    "parent Column associated with it.")
+
+        elif self.parent.table is None:
+            raise exc.InvalidRequestError(
+                    "this ForeignKey's parent column is not yet associated "
+                    "with a Table.")
+
+        parenttable = self.parent.table
+
+        # assertion, can be commented out.
+        # basically Column._make_proxy() sends the actual
+        # target Column to the ForeignKey object, so the
+        # string resolution here is never called.
+        for c in self.parent.base_columns:
+            if isinstance(c, Column):
+                assert c.table is parenttable
+                break
+        else:
+            assert False
+        ######################
+
+        schema, tname, colname = self._column_tokens
+
+        if schema is None and parenttable.metadata.schema is not None:
+            schema = parenttable.metadata.schema
+
+        tablekey = _get_table_key(tname, schema)
+        return parenttable, tablekey, colname
+
+
+    def _link_to_col_by_colstring(self, parenttable, table, colname):
+        if not hasattr(self.constraint, '_referred_table'):
+            self.constraint._referred_table = table
+        else:
+            assert self.constraint._referred_table is table
+
+        _column = None
+        if colname is None:
+            # colname is None in the case that ForeignKey argument
+            # was specified as table name only, in which case we
+            # match the column name to the same column on the
+            # parent.
+            key = self.parent
+            _column = table.c.get(self.parent.key, None)
+        elif self.link_to_name:
+            key = colname
+            for c in table.c:
+                if c.name == colname:
+                    _column = c
+        else:
+            key = colname
+            _column = table.c.get(colname, None)
+
+        if _column is None:
+            raise exc.NoReferencedColumnError(
+                "Could not initialize target column for ForeignKey '%s' on table '%s': "
+                "table '%s' has no column named '%s'" % (
+                self._colspec, parenttable.name, table.name, key),
+                table.name, key)
+
+        self._set_target_column(_column)
+
+    def _set_target_column(self, column):
+        # propagate TypeEngine to parent if it didn't have one
+        if isinstance(self.parent.type, sqltypes.NullType):
+            self.parent.type = column.type
+
+        # super-edgy case, if other FKs point to our column,
+        # they'd get the type propagated out also.
+        if isinstance(self.parent.table, Table):
+            fk_key = (self.parent.table.key, self.parent.key)
+            if fk_key in self.parent.table.metadata._fk_memos:
+                for fk in self.parent.table.metadata._fk_memos[fk_key]:
+                    if isinstance(fk.parent.type, sqltypes.NullType):
+                        fk.parent.type = column.type
+
+        self.column = column
+
     @util.memoized_property
     def column(self):
         """Return the target :class:`.Column` referenced by this
         :class:`.ForeignKey`.
 
-        If this :class:`.ForeignKey` was created using a
-        string-based target column specification, this
-        attribute will on first access initiate a resolution
-        process to locate the referenced remote
-        :class:`.Column`.  The resolution process traverses
-        to the parent :class:`.Column`, :class:`.Table`, and
-        :class:`.MetaData` to proceed - if any of these aren't
-        yet present, an error is raised.
+        If no target column has been established, an exception
+        is raised.
 
-        """
-        # ForeignKey inits its remote column as late as possible, so tables
-        # can be defined without dependencies
-        if isinstance(self._colspec, util.string_types):
-            # locate the parent table this foreign key is attached to.  we
-            # use the "original" column which our parent column represents
-            # (its a list of columns/other ColumnElements if the parent
-            # table is a UNION)
-            for c in self.parent.base_columns:
-                if isinstance(c, Column):
-                    parenttable = c.table
-                    break
-            else:
-                raise exc.ArgumentError(
-                    "Parent column '%s' does not descend from a "
-                    "table-attached Column" % str(self.parent))
+        .. versionchanged:: 0.9.0
 
-            m = self._colspec.split('.')
+            Foreign key target column resolution now occurs as soon as both
+            the ForeignKey object and the remote Column to which it refers
+            are both associated with the same MetaData object.
 
-            if m is None:
-                raise exc.ArgumentError(
-                    "Invalid foreign key column specification: %s" %
-                    self._colspec)
-
-            # A FK between column 'bar' and table 'foo' can be
-            # specified as 'foo', 'foo.bar', 'dbo.foo.bar',
-            # 'otherdb.dbo.foo.bar'. Once we have the column name and
-            # the table name, treat everything else as the schema
-            # name. Some databases (e.g. Sybase) support
-            # inter-database foreign keys. See tickets#1341 and --
-            # indirectly related -- Ticket #594. This assumes that '.'
-            # will never appear *within* any component of the FK.
-
-            (schema, tname, colname) = (None, None, None)
-            if schema is None and parenttable.metadata.schema is not None:
-                schema = parenttable.metadata.schema
-
-            if (len(m) == 1):
-                tname = m.pop()
-            else:
-                colname = m.pop()
-                tname = m.pop()
+        """
 
-            if (len(m) > 0):
-                schema = '.'.join(m)
+        if isinstance(self._colspec, util.string_types):
+
+            parenttable, tablekey, colname = self._resolve_col_tokens()
 
-            if _get_table_key(tname, schema) not in parenttable.metadata:
+            if tablekey not in parenttable.metadata:
                 raise exc.NoReferencedTableError(
                     "Foreign key associated with column '%s' could not find "
                     "table '%s' with which to generate a "
                     "foreign key to target column '%s'" %
-                    (self.parent, tname, colname),
-                    tname)
-            table = Table(tname, parenttable.metadata,
-                          mustexist=True, schema=schema)
-
-            if not hasattr(self.constraint, '_referred_table'):
-                self.constraint._referred_table = table
-            elif self.constraint._referred_table is not table:
-                raise exc.ArgumentError(
-                    'ForeignKeyConstraint on %s(%s) refers to '
-                    'multiple remote tables: %s and %s' % (
-                    parenttable,
-                    self.constraint._col_description,
-                    self.constraint._referred_table,
-                    table
-                ))
-
-            _column = None
-            if colname is None:
-                # colname is None in the case that ForeignKey argument
-                # was specified as table name only, in which case we
-                # match the column name to the same column on the
-                # parent.
-                key = self.parent
-                _column = table.c.get(self.parent.key, None)
-            elif self.link_to_name:
-                key = colname
-                for c in table.c:
-                    if c.name == colname:
-                        _column = c
+                    (self.parent, tablekey, colname),
+                    tablekey)
+            elif parenttable.key not in parenttable.metadata:
+                raise exc.InvalidRequestError(
+                    "Table %s is no longer associated with its "
+                    "parent MetaData" % parenttable)
             else:
-                key = colname
-                _column = table.c.get(colname, None)
-
-            if _column is None:
                 raise exc.NoReferencedColumnError(
-                    "Could not create ForeignKey '%s' on table '%s': "
+                    "Could not initialize target column for "
+                    "ForeignKey '%s' on table '%s': "
                     "table '%s' has no column named '%s'" % (
-                    self._colspec, parenttable.name, table.name, key),
-                    table.name, key)
-
+                    self._colspec, parenttable.name, tablekey, colname),
+                    tablekey, colname)
         elif hasattr(self._colspec, '__clause_element__'):
             _column = self._colspec.__clause_element__()
+            return _column
         else:
             _column = self._colspec
-
-        # propagate TypeEngine to parent if it didn't have one
-        if isinstance(self.parent.type, sqltypes.NullType):
-            self.parent.type = _column.type
-        return _column
+            return _column
 
     def _set_parent(self, column):
-        if hasattr(self, 'parent'):
-            if self.parent is column:
-                return
+        if self.parent is not None and self.parent is not column:
             raise exc.InvalidRequestError(
                     "This ForeignKey already has a parent !")
         self.parent = column
         self.parent.foreign_keys.add(self)
         self.parent._on_table_attach(self._set_table)
 
+    def _set_remote_table(self, table):
+        parenttable, tablekey, colname = self._resolve_col_tokens()
+        self._link_to_col_by_colstring(parenttable, table, colname)
+        self.constraint._validate_dest_table(table)
+
+    def _remove_from_metadata(self, metadata):
+        parenttable, table_key, colname = self._resolve_col_tokens()
+        fk_key = (table_key, colname)
+        try:
+            metadata._fk_memos[fk_key].remove(self)
+        except:
+            pass
+
     def _set_table(self, column, table):
         # standalone ForeignKey - create ForeignKeyConstraint
         # on the hosting Table when attached to the Table.
@@ -1502,6 +1579,27 @@ class ForeignKey(SchemaItem):
             self.constraint._set_parent_with_dispatch(table)
         table.foreign_keys.add(self)
 
+        # set up remote ".column" attribute, or a note to pick it
+        # up when the other Table/Column shows up
+        if isinstance(self._colspec, util.string_types):
+            parenttable, table_key, colname = self._resolve_col_tokens()
+            fk_key = (table_key, colname)
+            if table_key in parenttable.metadata.tables:
+                table = parenttable.metadata.tables[table_key]
+                try:
+                    self._link_to_col_by_colstring(parenttable, table, colname)
+                except exc.NoReferencedColumnError:
+                    # this is OK, we'll try later
+                    pass
+            parenttable.metadata._fk_memos[fk_key].append(self)
+        elif hasattr(self._colspec, '__clause_element__'):
+            _column = self._colspec.__clause_element__()
+            self._set_target_column(_column)
+        else:
+            _column = self._colspec
+            self._set_target_column(_column)
+
+
 
 class _NotAColumnExpr(object):
     def _not_a_column_expr(self):
@@ -2239,6 +2337,19 @@ class ForeignKeyConstraint(Constraint):
                 columns[0].table is not None:
             self._set_parent_with_dispatch(columns[0].table)
 
+    def _validate_dest_table(self, table):
+        table_keys = set([elem._table_key() for elem in self._elements.values()])
+        if None not in table_keys and len(table_keys) > 1:
+            elem0, elem1 = list(table_keys)[0:2]
+            raise exc.ArgumentError(
+                'ForeignKeyConstraint on %s(%s) refers to '
+                'multiple remote tables: %s and %s' % (
+                table.fullname,
+                self._col_description,
+                elem0,
+                elem1
+            ))
+
     @property
     def _col_description(self):
         return ", ".join(self._elements)
@@ -2254,6 +2365,8 @@ class ForeignKeyConstraint(Constraint):
     def _set_parent(self, table):
         super(ForeignKeyConstraint, self)._set_parent(table)
 
+        self._validate_dest_table(table)
+
         for col, fk in self._elements.items():
             # string-specified column names now get
             # resolved to Column objects
@@ -2544,6 +2657,8 @@ class MetaData(SchemaItem):
         self.quote_schema = quote_schema
         self._schemas = set()
         self._sequences = {}
+        self._fk_memos = collections.defaultdict(list)
+
         self.bind = bind
         if reflect:
             util.warn("reflect=True is deprecate; please "
@@ -2568,20 +2683,27 @@ class MetaData(SchemaItem):
         if schema:
             self._schemas.add(schema)
 
+
+
     def _remove_table(self, name, schema):
         key = _get_table_key(name, schema)
-        dict.pop(self.tables, key, None)
+        removed = dict.pop(self.tables, key, None)
+        if removed is not None:
+            for fk in removed.foreign_keys:
+                fk._remove_from_metadata(self)
         if self._schemas:
             self._schemas = set([t.schema
                                 for t in self.tables.values()
                                 if t.schema is not None])
 
+
     def __getstate__(self):
         return {'tables': self.tables,
                 'schema': self.schema,
                 'quote_schema': self.quote_schema,
                 'schemas': self._schemas,
-                'sequences': self._sequences}
+                'sequences': self._sequences,
+                'fk_memos': self._fk_memos}
 
     def __setstate__(self, state):
         self.tables = state['tables']
@@ -2590,6 +2712,7 @@ class MetaData(SchemaItem):
         self._bind = None
         self._sequences = state['sequences']
         self._schemas = state['schemas']
+        self._fk_memos = state['fk_memos']
 
     def is_bound(self):
         """True if this MetaData is bound to an Engine or Connection."""
@@ -2630,6 +2753,7 @@ class MetaData(SchemaItem):
 
         dict.clear(self.tables)
         self._schemas.clear()
+        self._fk_memos.clear()
 
     def remove(self, table):
         """Remove the given Table object from this MetaData."""
index dd2a6e08cd7c16d48dd58963408b78c9a0f86652..f1fe53b73ca2b0114887e80fde7d0e43244e886d 100644 (file)
@@ -2415,7 +2415,9 @@ class GenericTypeCompiler(engine.TypeCompiler):
         return self.visit_VARCHAR(type_)
 
     def visit_null(self, type_):
-        raise NotImplementedError("Can't generate DDL for the null type")
+        raise exc.CompileError("Can't generate DDL for %r; "
+                            "did you forget to specify a "
+                            "type on this Column?" % type_)
 
     def visit_type_decorator(self, type_):
         return self.process(type_.type_engine(self.dialect))
index afd63f2b41f0c17d5e14616746005f04cdd862da..612d6e8ca3d3dc68f3e4ff69feca18a68ee55572 100644 (file)
@@ -2143,7 +2143,8 @@ class InhCondTest(fixtures.TestBase):
 
         assert_raises_message(
             sa_exc.NoReferencedColumnError,
-            "Could not create ForeignKey 'base.q' on table "
+            "Could not initialize target column for ForeignKey "
+            "'base.q' on table "
             "'derived': table 'base' has no column named 'q'",
             mapper,
             Derived, derived_table,  inherits=Base
index 8f0280765ff466b2995be94bbd686750c2785c3b..7b6c8497e212d66a717fc6882854e8892f897710 100644 (file)
@@ -236,23 +236,36 @@ class MetaDataTest(fixtures.TestBase, ComparesTables):
             go
         )
 
-    def test_fk_no_such_target_col_error(self):
+    def test_fk_no_such_target_col_error_upfront(self):
         meta = MetaData()
         a = Table('a', meta, Column('a', Integer))
         Table('b', meta, Column('b', Integer))
-        a.append_constraint(
-            ForeignKeyConstraint(['a'], ['b.x'])
+
+        a.append_constraint(ForeignKeyConstraint(['a'], ['b.x']))
+
+        assert_raises_message(
+            exc.NoReferencedColumnError,
+            "Could not initialize target column for ForeignKey 'b.x' on "
+            "table 'a': table 'b' has no column named 'x'",
+            getattr, list(a.foreign_keys)[0], "column"
         )
 
-        def go():
-            list(a.c.a.foreign_keys)[0].column
+    def test_fk_no_such_target_col_error_delayed(self):
+        meta = MetaData()
+        a = Table('a', meta, Column('a', Integer))
+        a.append_constraint(
+            ForeignKeyConstraint(['a'], ['b.x']))
+
+        b = Table('b', meta, Column('b', Integer))
+
         assert_raises_message(
             exc.NoReferencedColumnError,
-            "Could not create ForeignKey 'b.x' on "
+            "Could not initialize target column for ForeignKey 'b.x' on "
             "table 'a': table 'b' has no column named 'x'",
-            go
+            getattr, list(a.foreign_keys)[0], "column"
         )
 
+
     @testing.exclude('mysql', '<', (4, 1, 1), 'early types are squirrely')
     def test_to_metadata(self):
         meta = MetaData()
@@ -1183,34 +1196,62 @@ class ConstraintTest(fixtures.TestBase):
         assert s1.c.a.references(t1.c.a)
         assert not s1.c.a.references(t1.c.b)
 
-    def test_invalid_composite_fk_check(self):
+    def test_related_column_not_present_atfirst_ok(self):
         m = MetaData()
-        t1 = Table('t1', m, Column('x', Integer), Column('y', Integer),
-            ForeignKeyConstraint(['x', 'y'], ['t2.x', 't3.y'])
+        base_table = Table("base", m,
+            Column("id", Integer, primary_key=True)
         )
-        t2 = Table('t2', m, Column('x', Integer))
-        t3 = Table('t3', m, Column('y', Integer))
-
-        assert_raises_message(
-            exc.ArgumentError,
-            r"ForeignKeyConstraint on t1\(x, y\) refers to "
-                "multiple remote tables: t2 and t3",
-            t1.join, t2
+        fk = ForeignKey('base.q')
+        derived_table = Table("derived", m,
+            Column("id", None, fk,
+                primary_key=True),
         )
+
+        base_table.append_column(Column('q', Integer))
+        assert fk.column is base_table.c.q
+        assert isinstance(derived_table.c.id.type, Integer)
+
+    def test_invalid_composite_fk_check_strings(self):
+        m = MetaData()
+
         assert_raises_message(
             exc.ArgumentError,
             r"ForeignKeyConstraint on t1\(x, y\) refers to "
                 "multiple remote tables: t2 and t3",
-            t1.join, t3
+            Table,
+            't1', m, Column('x', Integer), Column('y', Integer),
+            ForeignKeyConstraint(['x', 'y'], ['t2.x', 't3.y'])
         )
 
+    def test_invalid_composite_fk_check_columns(self):
+        m = MetaData()
+
+        t2 = Table('t2', m, Column('x', Integer))
+        t3 = Table('t3', m, Column('y', Integer))
+
         assert_raises_message(
             exc.ArgumentError,
             r"ForeignKeyConstraint on t1\(x, y\) refers to "
                 "multiple remote tables: t2 and t3",
-            schema.CreateTable(t1).compile
+            Table,
+            't1', m, Column('x', Integer), Column('y', Integer),
+            ForeignKeyConstraint(['x', 'y'], [t2.c.x, t3.c.y])
         )
 
+    def test_invalid_composite_fk_check_columns_notattached(self):
+        m = MetaData()
+        x = Column('x', Integer)
+        y = Column('y', Integer)
+
+        # no error is raised for this one right now.
+        # which is a minor bug.
+        Table('t1', m, Column('x', Integer), Column('y', Integer),
+                ForeignKeyConstraint(['x', 'y'], [x, y])
+            )
+
+        t2 = Table('t2', m, x)
+        t3 = Table('t3', m, y)
+
     def test_constraint_copied_to_proxy_ok(self):
         m = MetaData()
         t1 = Table('t1', m, Column('id', Integer, primary_key=True))
@@ -1234,6 +1275,220 @@ class ConstraintTest(fixtures.TestBase):
             [t2fk]
         )
 
+    def test_type_propagate_composite_fk_string(self):
+        metadata = MetaData()
+        a = Table('a', metadata,
+                  Column('key1', Integer, primary_key=True),
+                  Column('key2', String(40), primary_key=True))
+
+        b = Table('b', metadata,
+                  Column('a_key1', None),
+                  Column('a_key2', None),
+                  Column('id', Integer, primary_key=True),
+                  ForeignKeyConstraint(['a_key1', 'a_key2'],
+                                       ['a.key1', 'a.key2'])
+                  )
+
+        assert isinstance(b.c.a_key1.type, Integer)
+        assert isinstance(b.c.a_key2.type, String)
+
+    def test_type_propagate_composite_fk_col(self):
+        metadata = MetaData()
+        a = Table('a', metadata,
+                  Column('key1', Integer, primary_key=True),
+                  Column('key2', String(40), primary_key=True))
+
+        b = Table('b', metadata,
+                  Column('a_key1', None),
+                  Column('a_key2', None),
+                  Column('id', Integer, primary_key=True),
+                  ForeignKeyConstraint(['a_key1', 'a_key2'],
+                                       [a.c.key1, a.c.key2])
+                  )
+
+        assert isinstance(b.c.a_key1.type, Integer)
+        assert isinstance(b.c.a_key2.type, String)
+
+    def test_type_propagate_standalone_fk_string(self):
+        metadata = MetaData()
+        a = Table('a', metadata,
+                  Column('key1', Integer, primary_key=True))
+
+        b = Table('b', metadata,
+                  Column('a_key1', None, ForeignKey("a.key1")),
+                  )
+
+        assert isinstance(b.c.a_key1.type, Integer)
+
+    def test_type_propagate_standalone_fk_col(self):
+        metadata = MetaData()
+        a = Table('a', metadata,
+                  Column('key1', Integer, primary_key=True))
+
+        b = Table('b', metadata,
+                  Column('a_key1', None, ForeignKey(a.c.key1)),
+                  )
+
+        assert isinstance(b.c.a_key1.type, Integer)
+
+    def test_type_propagate_chained_string_source_first(self):
+        metadata = MetaData()
+        a = Table('a', metadata,
+                  Column('key1', Integer, primary_key=True))
+
+        b = Table('b', metadata,
+                  Column('a_key1', None, ForeignKey("a.key1")),
+                  )
+
+        c = Table('c', metadata,
+                  Column('b_key1', None, ForeignKey("b.a_key1")),
+                  )
+
+        assert isinstance(b.c.a_key1.type, Integer)
+        assert isinstance(c.c.b_key1.type, Integer)
+
+    def test_type_propagate_chained_string_source_last(self):
+        metadata = MetaData()
+
+        b = Table('b', metadata,
+                  Column('a_key1', None, ForeignKey("a.key1")),
+                  )
+
+        c = Table('c', metadata,
+                  Column('b_key1', None, ForeignKey("b.a_key1")),
+                  )
+
+        a = Table('a', metadata,
+                  Column('key1', Integer, primary_key=True))
+
+        assert isinstance(b.c.a_key1.type, Integer)
+        assert isinstance(c.c.b_key1.type, Integer)
+
+    def test_type_propagate_chained_col_orig_first(self):
+        metadata = MetaData()
+        a = Table('a', metadata,
+                  Column('key1', Integer, primary_key=True))
+
+        b = Table('b', metadata,
+                  Column('a_key1', None, ForeignKey(a.c.key1)),
+                  )
+
+        c = Table('c', metadata,
+                  Column('b_key1', None, ForeignKey(b.c.a_key1)),
+                  )
+
+        assert isinstance(b.c.a_key1.type, Integer)
+        assert isinstance(c.c.b_key1.type, Integer)
+
+    def test_column_accessor_col(self):
+        c1 = Column('x', Integer)
+        fk = ForeignKey(c1)
+        is_(fk.column, c1)
+
+    def test_column_accessor_clause_element(self):
+        c1 = Column('x', Integer)
+
+        class CThing(object):
+            def __init__(self, c):
+                self.c = c
+            def __clause_element__(self):
+                return self.c
+
+        fk = ForeignKey(CThing(c1))
+        is_(fk.column, c1)
+
+    def test_column_accessor_string_no_parent(self):
+        fk = ForeignKey("sometable.somecol")
+        assert_raises_message(
+            exc.InvalidRequestError,
+            "this ForeignKey object does not yet have a parent "
+            "Column associated with it.",
+            getattr, fk, "column"
+        )
+
+    def test_column_accessor_string_no_parent_table(self):
+        fk = ForeignKey("sometable.somecol")
+        c1 = Column('x', fk)
+        assert_raises_message(
+            exc.InvalidRequestError,
+            "this ForeignKey's parent column is not yet "
+            "associated with a Table.",
+            getattr, fk, "column"
+        )
+
+    def test_column_accessor_string_no_target_table(self):
+        fk = ForeignKey("sometable.somecol")
+        c1 = Column('x', fk)
+        t1 = Table('t', MetaData(), c1)
+        assert_raises_message(
+            exc.NoReferencedTableError,
+            "Foreign key associated with column 't.x' could not find "
+            "table 'sometable' with which to generate a "
+            "foreign key to target column 'somecol'",
+            getattr, fk, "column"
+        )
+
+    def test_column_accessor_string_no_target_column(self):
+        fk = ForeignKey("sometable.somecol")
+        c1 = Column('x', fk)
+        m = MetaData()
+        t1 = Table('t', m, c1)
+        t2 = Table("sometable", m, Column('notsomecol', Integer))
+        assert_raises_message(
+            exc.NoReferencedColumnError,
+            "Could not initialize target column for ForeignKey "
+            "'sometable.somecol' on table 't': "
+            "table 'sometable' has no column named 'somecol'",
+            getattr, fk, "column"
+        )
+
+    def test_remove_table_fk_bookkeeping(self):
+        metadata = MetaData()
+        fk = ForeignKey('t1.x')
+        t2 = Table('t2', metadata, Column('y', Integer, fk))
+        t3 = Table('t3', metadata, Column('y', Integer, ForeignKey('t1.x')))
+
+        assert t2.key in metadata.tables
+        assert ("t1", "x") in metadata._fk_memos
+
+        metadata.remove(t2)
+
+        # key is removed
+        assert t2.key not in metadata.tables
+
+        # the memo for the FK is still there
+        assert ("t1", "x") in metadata._fk_memos
+
+        # fk is not in the collection
+        assert fk not in metadata._fk_memos[("t1", "x")]
+
+        # make the referenced table
+        t1 = Table('t1', metadata, Column('x', Integer))
+
+        # t2 tells us exactly what's wrong
+        assert_raises_message(
+            exc.InvalidRequestError,
+            "Table t2 is no longer associated with its parent MetaData",
+            getattr, fk, "column"
+        )
+
+        # t3 is unaffected
+        assert t3.c.y.references(t1.c.x)
+
+        # remove twice OK
+        metadata.remove(t2)
+
+    def test_remove_failed(self):
+        metadata = MetaData()
+        fk = ForeignKey('t1.x')
+        t3 = Table('t3', metadata, Column('y', Integer, ForeignKey('t1.x')))
+
+        try:
+            Table('t2', metadata, Column('y', Integer, fk))
+        except:
+            raise
+
+
 
 class ColumnDefinitionTest(AssertsCompiledSQL, fixtures.TestBase):
     """Test Column() construction."""
@@ -1541,19 +1796,48 @@ class ColumnOptionsTest(fixtures.TestBase):
         assert Column(String, default=g2).default is g2
         assert Column(String, onupdate=g2).onupdate is g2
 
-    def test_type_required(self):
-        assert_raises(exc.ArgumentError, Column)
-        assert_raises(exc.ArgumentError, Column, "foo")
-        assert_raises(exc.ArgumentError, Column, default="foo")
-        assert_raises(exc.ArgumentError, Column, Sequence("a"))
-        assert_raises(exc.ArgumentError, Column, "foo", default="foo")
-        assert_raises(exc.ArgumentError, Column, "foo", Sequence("a"))
-        Column(ForeignKey('bar.id'))
-        Column("foo", ForeignKey('bar.id'))
-        Column(ForeignKey('bar.id'), default="foo")
-        Column(ForeignKey('bar.id'), Sequence("a"))
-        Column("foo", ForeignKey('bar.id'), default="foo")
-        Column("foo", ForeignKey('bar.id'), Sequence("a"))
+    def _null_type_error(self, col):
+        t = Table('t', MetaData(), col)
+        assert_raises_message(
+            exc.CompileError,
+            r"\(in table 't', column 'foo'\): Can't generate DDL for NullType",
+            schema.CreateTable(t).compile
+        )
+
+    def _no_name_error(self, col):
+        assert_raises_message(
+            exc.ArgumentError,
+            "Column must be constructed with a non-blank name or "
+            "assign a non-blank .name",
+            Table, 't', MetaData(), col
+        )
+
+    def _no_error(self, col):
+        m = MetaData()
+        b = Table('bar', m, Column('id', Integer))
+        t = Table('t', m, col)
+        schema.CreateTable(t).compile()
+
+    def test_argument_signatures(self):
+        self._no_name_error(Column())
+        self._null_type_error(Column("foo"))
+        self._no_name_error(Column(default="foo"))
+
+        self._no_name_error(Column(Sequence("a")))
+        self._null_type_error(Column("foo", default="foo"))
+
+        self._null_type_error(Column("foo", Sequence("a")))
+
+        self._no_name_error(Column(ForeignKey('bar.id')))
+
+        self._no_error(Column("foo", ForeignKey('bar.id')))
+
+        self._no_name_error(Column(ForeignKey('bar.id'), default="foo"))
+
+        self._no_name_error(Column(ForeignKey('bar.id'), Sequence("a")))
+        self._no_error(Column("foo", ForeignKey('bar.id'), default="foo"))
+        self._no_error(Column("foo", ForeignKey('bar.id'), Sequence("a")))
+
 
     def test_column_info(self):
 
index 501cd37767ba064054f603f2b7149a0bed1f1826..6a0511faa94b8549ccfd0901a19997901644be6f 100644 (file)
@@ -985,14 +985,16 @@ class JoinConditionTest(fixtures.TestBase, AssertsCompiledSQL):
         t2 = Table('t2', m, Column('id', Integer))
         assert_raises_message(
             exc.NoReferencedColumnError,
-            "Could not create ForeignKey 't2.q' on table 't1': "
+            "Could not initialize target column for "
+            "ForeignKey 't2.q' on table 't1': "
                 "table 't2' has no column named 'q'",
             sql_util.join_condition, t1, t2
         )
 
         assert_raises_message(
             exc.NoReferencedColumnError,
-            "Could not create ForeignKey 't2.q' on table 't1': "
+            "Could not initialize target column for "
+            "ForeignKey 't2.q' on table 't1': "
                 "table 't2' has no column named 'q'",
             sql_util.join_condition, t2, t1
         )