From 8a04f997843b87a426ee4821dc31c35acfc90a3c Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Fri, 7 Nov 2008 16:19:24 +0000 Subject: [PATCH] - Repaired the table.tometadata() method so that a passed-in schema argument is propigated to ForeignKey constructs. --- CHANGES | 3 +++ lib/sqlalchemy/schema.py | 34 ++++++++++++++++++---------------- test/engine/metadata.py | 27 ++++++++++++++++++++++++++- 3 files changed, 47 insertions(+), 17 deletions(-) diff --git a/CHANGES b/CHANGES index c0ac0625f6..ace4452359 100644 --- a/CHANGES +++ b/CHANGES @@ -98,6 +98,9 @@ CHANGES - Removed "default_order_by()" method on all FromClause objects. + - Repaired the table.tometadata() method so that a passed-in + schema argument is propigated to ForeignKey constructs. + - Slightly changed behavior of IN operator for comparing to empty collections. Now results in inequality comparison against self. More portable, but breaks with stored procedures diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py index 964c81508a..48090e8b7c 100644 --- a/lib/sqlalchemy/schema.py +++ b/lib/sqlalchemy/schema.py @@ -396,16 +396,16 @@ class Table(SchemaItem, expression.TableClause): """Return a copy of this ``Table`` associated with a different ``MetaData``.""" try: - if schema is None: + if not schema: schema = self.schema key = _get_table_key(self.name, schema) return metadata.tables[key] except KeyError: args = [] for c in self.columns: - args.append(c.copy()) + args.append(c.copy(schema=schema)) for c in self.constraints: - args.append(c.copy()) + args.append(c.copy(schema=schema)) return Table(self.name, metadata, schema=schema, *args) class Column(SchemaItem, expression._ColumnClause): @@ -688,13 +688,13 @@ class Column(SchemaItem, expression._ColumnClause): self._init_items(*toinit) self.args = None - def copy(self): + def copy(self, **kw): """Create a copy of this ``Column``, unitialized. This is used in ``Table.tometadata``. """ - return Column(self.name, self.type, self.default, key = self.key, primary_key = self.primary_key, nullable = self.nullable, quote=self.quote, index=self.index, autoincrement=self.autoincrement, *[c.copy() for c in self.constraints]) + return Column(self.name, self.type, self.default, key = self.key, primary_key = self.primary_key, nullable = self.nullable, quote=self.quote, index=self.index, autoincrement=self.autoincrement, *[c.copy(**kw) for c in self.constraints]) def _make_proxy(self, selectable, name=None): """Create a *proxy* for this column. @@ -783,15 +783,17 @@ class ForeignKey(SchemaItem): self.initially = initially def __repr__(self): - return "ForeignKey(%s)" % repr(self._get_colspec()) + return "ForeignKey(%r)" % self._get_colspec() - def copy(self): + def copy(self, schema=None): """Produce a copy of this ForeignKey object.""" - return ForeignKey(self._get_colspec()) + return ForeignKey(self._get_colspec(schema=schema)) - def _get_colspec(self): - if isinstance(self._colspec, basestring): + def _get_colspec(self, schema=None): + if schema: + return schema + "." + self.column.table.name + "." + self.column.key + elif isinstance(self._colspec, basestring): return self._colspec else: return "%s.%s" % (self._colspec.table.fullname, self._colspec.key) @@ -1091,7 +1093,7 @@ class Constraint(SchemaItem): def __len__(self): return len(self.columns) - def copy(self): + def copy(self, **kw): raise NotImplementedError() class CheckConstraint(Constraint): @@ -1136,7 +1138,7 @@ class CheckConstraint(Constraint): self.parent = parent parent.constraints.add(self) - def copy(self): + def copy(self, **kw): return CheckConstraint(self.sqltext, name=self.name) class ForeignKeyConstraint(Constraint): @@ -1212,8 +1214,8 @@ class ForeignKeyConstraint(Constraint): self.columns.add(self.table.c[fk.parent.key]) self.elements.add(fk) - def copy(self): - return ForeignKeyConstraint([x.parent.name for x in self.elements], [x._get_colspec() for x in self.elements], name=self.name, onupdate=self.onupdate, ondelete=self.ondelete, use_alter=self.use_alter) + def copy(self, **kw): + return ForeignKeyConstraint([x.parent.name for x in self.elements], [x._get_colspec(**kw) for x in self.elements], name=self.name, onupdate=self.onupdate, ondelete=self.ondelete, use_alter=self.use_alter) class PrimaryKeyConstraint(Constraint): """A table-level PRIMARY KEY constraint. @@ -1272,7 +1274,7 @@ class PrimaryKeyConstraint(Constraint): col.primary_key = False del self.columns[col.key] - def copy(self): + def copy(self, **kw): return PrimaryKeyConstraint(name=self.name, *[c.key for c in self]) def __eq__(self, other): @@ -1326,7 +1328,7 @@ class UniqueConstraint(Constraint): def append_column(self, col): self.columns.add(col) - def copy(self): + def copy(self, **kw): return UniqueConstraint(name=self.name, *self.__colnames) class Index(SchemaItem): diff --git a/test/engine/metadata.py b/test/engine/metadata.py index ba7cddb0ef..c8fc6f7e0f 100644 --- a/test/engine/metadata.py +++ b/test/engine/metadata.py @@ -5,7 +5,7 @@ from testlib.sa import Table, Column, Integer, String, UniqueConstraint, \ CheckConstraint, ForeignKey import testlib.sa as tsa from testlib import TestBase, ComparesTables, testing, engines - +from testlib.testing import eq_ class MetaDataTest(TestBase, ComparesTables): def test_metadata_connect(self): @@ -110,7 +110,32 @@ class MetaDataTest(TestBase, ComparesTables): assert not c.columns.contains_column(table.c.name) finally: meta.drop_all(testing.db) + + def test_tometadata_with_schema(self): + meta = MetaData() + + table = Table('mytable', meta, + Column('myid', Integer, primary_key=True), + Column('name', String(40), nullable=True), + Column('description', String(30), CheckConstraint("description='hi'")), + UniqueConstraint('name'), + test_needs_fk=True, + ) + + table2 = Table('othertable', meta, + Column('id', Integer, primary_key=True), + Column('myid', Integer, ForeignKey('mytable.myid')), + test_needs_fk=True, + ) + + meta2 = MetaData() + table_c = table.tometadata(meta2, schema='someschema') + table2_c = table2.tometadata(meta2, schema='someschema') + eq_(str(table_c.join(table2_c).onclause), str(table_c.c.myid == table2_c.c.myid)) + eq_(str(table_c.join(table2_c).onclause), "someschema.mytable.myid = someschema.othertable.myid") + + def test_nonexistent(self): self.assertRaises(tsa.exc.NoSuchTableError, Table, 'fake_table', -- 2.47.3