From: Mike Bayer Date: Sat, 30 Sep 2006 16:26:52 +0000 (+0000) Subject: - fixed condition that occurred during reflection when a primary key X-Git-Tag: rel_0_3_0~100 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8a1706b494e5e164ea584dc4a573fa0873482dc7;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - fixed condition that occurred during reflection when a primary key column was explciitly overridden, where the PrimaryKeyConstraint would get both the reflected and the programmatic column doubled up --- diff --git a/CHANGES b/CHANGES index 369f875ec2..5e35e4eaae 100644 --- a/CHANGES +++ b/CHANGES @@ -28,6 +28,9 @@ explicitly set to False - TypeEngine objects now have methods to deal with copying and comparing values of their specific type. Currently used by the ORM, see below. + - fixed condition that occurred during reflection when a primary key + column was explciitly overridden, where the PrimaryKeyConstraint would + get both the reflected and the programmatic column doubled up - Connections/Pooling/Execution: - connection pool tracks open cursors and automatically closes them if connection is returned to pool with cursors still opened. Can be diff --git a/lib/sqlalchemy/orm/mapper.py b/lib/sqlalchemy/orm/mapper.py index fd912e617f..af54f6df80 100644 --- a/lib/sqlalchemy/orm/mapper.py +++ b/lib/sqlalchemy/orm/mapper.py @@ -343,6 +343,10 @@ class Mapper(object): except KeyError: l = self.pks_by_table.setdefault(t, util.OrderedSet()) for k in t.primary_key: + #if k.key not in t.c and k._label not in t.c: + # this is a condition that was occurring when table reflection was doubling up primary keys + # that were overridden in the Table constructor + # raise exceptions.AssertionError("Column " + str(k) + " not located in the column set of table " + str(t)) l.add(k) if len(self.pks_by_table[self.mapped_table]) == 0: @@ -392,6 +396,7 @@ class Mapper(object): self.__log("adding ColumnProperty %s" % (column.key)) elif isinstance(prop, ColumnProperty): prop.columns.append(column) + self.__log("appending to existing ColumnProperty %s" % (column.key)) else: if not self.allow_column_override: raise exceptions.ArgumentError("WARNING: column '%s' not being added due to property '%s'. Specify 'allow_column_override=True' to mapper() to ignore this condition." % (column.key, repr(prop))) diff --git a/lib/sqlalchemy/schema.py b/lib/sqlalchemy/schema.py index 3dab1f8679..72109379d7 100644 --- a/lib/sqlalchemy/schema.py +++ b/lib/sqlalchemy/schema.py @@ -440,6 +440,8 @@ class Column(SchemaItem, sql.ColumnClause): else: return self.name return self.name + else: + return self.name def _derived_metadata(self): return self.table.metadata @@ -764,6 +766,10 @@ class PrimaryKeyConstraint(Constraint): def accept_schema_visitor(self, visitor): visitor.visit_primary_key_constraint(self) def append(self, col): + # TODO: change "columns" to a key-sensitive set ? + for c in self.columns: + if c.key == col.key: + self.columns.remove(c) self.columns.append(col) col.primary_key=True def copy(self): diff --git a/lib/sqlalchemy/util.py b/lib/sqlalchemy/util.py index 3664b8e6da..91e295435b 100644 --- a/lib/sqlalchemy/util.py +++ b/lib/sqlalchemy/util.py @@ -102,6 +102,11 @@ class OrderedProperties(object): raise AttributeError(key) def __contains__(self, key): return key in self.__data + def get(self, key, default=None): + if self.has_key(key): + return self[key] + else: + return default def keys(self): return self.__data.keys() def has_key(self, key): diff --git a/test/engine/reflection.py b/test/engine/reflection.py index c31e5454f8..2e2c501278 100644 --- a/test/engine/reflection.py +++ b/test/engine/reflection.py @@ -100,6 +100,86 @@ class ReflectionTest(PersistTest): finally: addresses.drop() users.drop() + + def testoverridecolumns(self): + """test that you can override columns which contain foreign keys to other reflected tables""" + meta = BoundMetaData(testbase.db) + users = Table('users', meta, + Column('id', Integer, primary_key=True), + Column('name', String(30))) + addresses = Table('addresses', meta, + Column('id', Integer, primary_key=True), + Column('street', String(30)), + Column('user_id', Integer)) + + meta.create_all() + try: + meta2 = BoundMetaData(testbase.db) + a2 = Table('addresses', meta2, + Column('user_id', Integer, ForeignKey('users.id')), + autoload=True) + u2 = Table('users', meta2, autoload=True) + + assert a2.c.user_id.foreign_key is not None + assert a2.c.user_id.foreign_key.parent is a2.c.user_id + assert u2.join(a2).onclause == u2.c.id==a2.c.user_id + + meta3 = BoundMetaData(testbase.db) + u3 = Table('users', meta3, autoload=True) + a3 = Table('addresses', meta3, + Column('user_id', Integer, ForeignKey('users.id')), + autoload=True) + + assert u3.join(a3).onclause == u3.c.id==a3.c.user_id + + finally: + meta.drop_all() + + def testoverridecolumns2(self): + """test that you can override columns which contain foreign keys to other reflected tables, + where the foreign key column is also a primary key column""" + meta = BoundMetaData(testbase.db) + users = Table('users', meta, + Column('id', Integer, primary_key=True), + Column('name', String(30))) + addresses = Table('addresses', meta, + Column('id', Integer, primary_key=True), + Column('street', String(30))) + + + meta.create_all() + try: + meta2 = BoundMetaData(testbase.db) + a2 = Table('addresses', meta2, + Column('id', Integer, ForeignKey('users.id'), primary_key=True, ), + autoload=True) + u2 = Table('users', meta2, autoload=True) + + assert list(a2.primary_key) == [a2.c.id] + assert list(u2.primary_key) == [u2.c.id] + assert u2.join(a2).onclause == u2.c.id==a2.c.id + + # heres what was originally failing, because a2's primary key + # had two "id" columns, one of which was not part of a2's "c" collection + #class Address(object):pass + #mapper(Address, a2) + #add1 = Address() + #sess = create_session() + #sess.save(add1) + #sess.flush() + + meta3 = BoundMetaData(testbase.db) + u3 = Table('users', meta3, autoload=True) + a3 = Table('addresses', meta3, + Column('id', Integer, ForeignKey('users.id'), primary_key=True), + autoload=True) + + assert list(a3.primary_key) == [a3.c.id] + assert list(u3.primary_key) == [u3.c.id] + assert u3.join(a3).onclause == u3.c.id==a3.c.id + + finally: + meta.drop_all() @testbase.supported('mysql') def testmysqltypes(self): @@ -146,6 +226,7 @@ class ReflectionTest(PersistTest): t.drop() def testmultipk(self): + """test that creating a table checks for a sequence before creating it""" table = Table( 'engine_multi', testbase.db, Column('multi_id', Integer, Sequence('multi_id_seq'), primary_key=True),