- Fixed cascade bug in many-to-one relation() when attribute
was set to None, introduced in r6711 (cascade deleted
items into session during add()).
+
+ - The "allow_null_pks" flag is now called "allow_partial_pks",
+ defaults to True, acts like it did in 0.5 again. Except,
+ it also is implemented within merge() such that a SELECT
+ won't be issued for an incoming instance with partially
+ NULL primary key if the flag is False. [ticket:1680]
- sql
- The most common result processors conversion function were
:class:`~sqlalchemy.orm.query.Query`.
allow_null_pks
- This flag is deprecated - allow_null_pks is now "on" in all cases.
- Rows which contain NULL for some (but not all) of its primary key
- columns will be considered to have a valid primary key.
+ This flag is deprecated - this is stated as allow_partial_pks
+ which defaults to True.
+
+ allow_partial_pks
+ Defaults to True. Indicates that a composite primary key with
+ some NULL values should be considered as possibly existing
+ within the database. This affects whether a mapper will assign
+ an incoming row to an existing identity, as well as if
+ session.merge() will check the database first for a particular
+ primary key value. A "partial primary key" can occur if one
+ has mapped to an OUTER JOIN, for example.
batch
Indicates that save operations of multiple entities can be batched
concrete=False,
with_polymorphic=None,
allow_null_pks=None,
+ allow_partial_pks=True,
batch=True,
column_prefix=None,
include_properties=None,
if allow_null_pks:
util.warn_deprecated('the allow_null_pks option to Mapper() is '
- 'deprecated. It is now on in all cases.')
-
+ 'deprecated. It is now allow_partial_pks=False|True, '
+ 'defaults to True.')
+ allow_partial_pks = allow_null_pks
+
+ self.allow_partial_pks = allow_partial_pks
+
if with_polymorphic == '*':
self.with_polymorphic = ('*', None)
elif isinstance(with_polymorphic, (tuple, list)):
populate_instance = extension.get('populate_instance', None)
append_result = extension.get('append_result', None)
populate_existing = context.populate_existing or self.always_refresh
-
+ if self.allow_partial_pks:
+ is_not_primary_key = _none_set.issuperset
+ else:
+ is_not_primary_key = _none_set.issubset
+
def _instance(row, result):
if translate_row:
ret = translate_row(self, context, row)
else:
# check for non-NULL values in the primary key columns,
# else no entity is returned for the row
- if _none_set.issuperset(identitykey[1]):
+ if is_not_primary_key(identitykey[1]):
return None
-
+
isnew = True
currentload = True
loaded_instance = True
merged_state.key = key
self._update_impl(merged_state)
new_instance = True
-
- elif not _none_set.issuperset(key[1]):
+
+ elif not _none_set.issubset(key[1]) or \
+ (mapper.allow_partial_pks and
+ not _none_set.issuperset(key[1])):
merged = self.query(mapper.class_).get(key[1])
else:
merged = None
User(id=9, address_id=5),
User(id=10, address_id=None)])
+ @testing.resolve_artifact_names
+ def test_mapping_to_outerjoin_no_partial_pks(self):
+ """test the allow_partial_pks=False flag."""
+
+
+ mapper(User, users.outerjoin(addresses),
+ allow_partial_pks=False,
+ primary_key=[users.c.id, addresses.c.id],
+ properties=dict(
+ address_id=addresses.c.id))
+
+ session = create_session()
+ l = session.query(User).order_by(User.id, User.address_id).all()
+
+ eq_(l, [
+ User(id=7, address_id=1),
+ User(id=8, address_id=2),
+ User(id=8, address_id=3),
+ User(id=8, address_id=4),
+ User(id=9, address_id=5),
+ None])
+
@testing.resolve_artifact_names
def test_custom_join(self):
"""select_from totally replace the FROM parameters."""
from sqlalchemy.test.testing import assert_raises, assert_raises_message
import sqlalchemy as sa
-from sqlalchemy import Integer, PickleType
+from sqlalchemy import Integer, PickleType, String
import operator
from sqlalchemy.test import testing
from sqlalchemy.util import OrderedSet
)
assert a2 not in sess2.dirty
-
-
@testing.resolve_artifact_names
def test_many_to_many_cascade(self):
+class CompositeNullPksTest(_base.MappedTest):
+ @classmethod
+ def define_tables(cls, metadata):
+ Table("data", metadata,
+ Column('pk1', String(10), primary_key=True),
+ Column('pk2', String(10), primary_key=True),
+ )
+
+ @classmethod
+ def setup_classes(cls):
+ class Data(_base.ComparableEntity):
+ pass
+
+ @testing.resolve_artifact_names
+ def test_merge_allow_partial(self):
+ mapper(Data, data)
+ sess = sessionmaker()()
+
+ d1 = Data(pk1="someval", pk2=None)
+ def go():
+ return sess.merge(d1)
+ self.assert_sql_count(testing.db, go, 1)
+
+ @testing.resolve_artifact_names
+ def test_merge_disallow_partial(self):
+ mapper(Data, data, allow_partial_pks=False)
+ sess = sessionmaker()()
+
+ d1 = Data(pk1="someval", pk2=None)
+
+ def go():
+ return sess.merge(d1)
+ self.assert_sql_count(testing.db, go, 0)
+