if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
# if we are doing polymorphic, dispatch to a different _instance()
# method specific to the subclass mapper
+
+ def ensure_no_pk(row):
+ identitykey = (
+ identity_class,
+ tuple([row[column] for column in pk_cols]),
+ identity_token,
+ )
+ if not is_not_primary_key(identitykey[1]):
+ raise sa_exc.InvalidRequestError(
+ "Row with identity key %s can't be loaded into an "
+ "object; the polymorphic discriminator column '%s' is "
+ "NULL" % (identitykey, polymorphic_discriminator)
+ )
+
_instance = _decorate_polymorphic_switch(
_instance,
context,
path,
polymorphic_discriminator,
adapter,
+ ensure_no_pk,
)
return _instance
path,
polymorphic_discriminator,
adapter,
+ ensure_no_pk,
):
if polymorphic_discriminator is not None:
polymorphic_on = polymorphic_discriminator
_instance = polymorphic_instances[discriminator]
if _instance:
return _instance(row)
- return instance_fn(row)
+ else:
+ return instance_fn(row)
+ else:
+ ensure_no_pk(row)
+ return None
return polymorphic_instance
)
+class DiscriminatorOrPkNoneTest(fixtures.DeclarativeMappedTest):
+
+ run_setup_mappers = "once"
+ __dialect__ = "default"
+
+ @classmethod
+ def setup_classes(cls):
+ Base = cls.DeclarativeBasic
+
+ class Parent(fixtures.ComparableEntity, Base):
+ __tablename__ = "parent"
+ id = Column(Integer, primary_key=True)
+
+ class A(fixtures.ComparableEntity, Base):
+ __tablename__ = "a"
+ id = Column(Integer, primary_key=True)
+ parent_id = Column(ForeignKey("parent.id"))
+ type = Column(String(50))
+ __mapper_args__ = {
+ "polymorphic_on": type,
+ "polymorphic_identity": "a",
+ }
+
+ class B(A):
+ __tablename__ = "b"
+ id = Column(ForeignKey("a.id"), primary_key=True)
+ __mapper_args__ = {"polymorphic_identity": "b"}
+
+ @classmethod
+ def insert_data(cls):
+ Parent, A, B = cls.classes("Parent", "A", "B")
+ s = Session()
+
+ p1 = Parent(id=1)
+ p2 = Parent(id=2)
+ s.add_all([p1, p2])
+ s.flush()
+
+ s.add_all(
+ [
+ A(id=1, parent_id=1),
+ B(id=2, parent_id=1),
+ A(id=3, parent_id=1),
+ B(id=4, parent_id=1),
+ ]
+ )
+ s.flush()
+
+ s.query(A).filter(A.id.in_([3, 4])).update(
+ {A.type: None}, synchronize_session=False
+ )
+ s.commit()
+
+ def test_pk_is_null(self):
+ Parent, A = self.classes("Parent", "A")
+
+ sess = Session()
+ q = (
+ sess.query(Parent, A)
+ .select_from(Parent)
+ .outerjoin(A)
+ .filter(Parent.id == 2)
+ )
+ row = q.all()[0]
+
+ eq_(row, (Parent(id=2), None))
+
+ def test_pk_not_null_discriminator_null_from_base(self):
+ A, = self.classes("A")
+
+ sess = Session()
+ q = sess.query(A).filter(A.id == 3)
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ r"Row with identity key \(<class '.*A'>, \(3,\), None\) can't be "
+ "loaded into an object; the polymorphic discriminator "
+ "column 'a.type' is NULL",
+ q.all,
+ )
+
+ def test_pk_not_null_discriminator_null_from_sub(self):
+ B, = self.classes("B")
+
+ sess = Session()
+ q = sess.query(B).filter(B.id == 4)
+ assert_raises_message(
+ sa_exc.InvalidRequestError,
+ r"Row with identity key \(<class '.*A'>, \(4,\), None\) can't be "
+ "loaded into an object; the polymorphic discriminator "
+ "column 'a.type' is NULL",
+ q.all,
+ )
+
+
class NameConflictTest(fixtures.MappedTest):
@classmethod
def define_tables(cls, metadata):