AttributeHistory object if an untriggered callable was found (not sure how this used to work
OK....)
two mappers that referenced each other
- upgraded all unittests to insert './lib/' into sys.path,
working around new setuptools PYTHONPATH-killing behavior
+- further fixes with attributes/dependencies/etc....
0.2.4
- try/except when the mapper sets init.__name__ on a mapped class,
class InstrumentedAttribute(object):
"""a property object that instruments attribute access on object instances. All methods correspond to
a single attribute on a particular class."""
+
+ PASSIVE_NORESULT = object()
+
def __init__(self, manager, key, uselist, callable_, typecallable, trackparent=False, extension=None, **kwargs):
self.manager = manager
self.key = key
item._state[('hasparent', self)] = value
def get_history(self, obj, passive=False):
- """returns a new AttributeHistory object for the given object for this
- InstrumentedAttribute's attribute."""
- return AttributeHistory(self, obj, passive=passive)
+ """return a new AttributeHistory object for the given object/this attribute's key.
+
+ if passive is True, then dont execute any callables; if the attribute's value
+ can only be achieved via executing a callable, then return None."""
+ # get the current state. this may trigger a lazy load if
+ # passive is False.
+ current = self.get(obj, passive=passive, raiseerr=False)
+ if current is InstrumentedAttribute.PASSIVE_NORESULT:
+ return None
+ return AttributeHistory(self, obj, current, passive=passive)
def set_callable(self, obj, callable_):
"""sets a callable function on the given object which will be executed when this attribute
callable_ = self._get_callable(obj)
if callable_ is not None:
if passive:
- return None
+ return InstrumentedAttribute.PASSIVE_NORESULT
l = InstrumentedList(self, obj, self._adapt_list(callable_()), init=False)
# if a callable was executed, then its part of the "committed state"
# if any, so commit the newly loaded data
callable_ = self._get_callable(obj)
if callable_ is not None:
if passive:
- return None
+ return InstrumentedAttribute.PASSIVE_NORESULT
obj.__dict__[self.key] = callable_()
# if a callable was executed, then its part of the "committed state"
# if any, so commit the newly loaded data
class AttributeHistory(object):
"""calculates the "history" of a particular attribute on a particular instance, based on the CommittedState
associated with the instance, if any."""
- def __init__(self, attr, obj, passive=False):
+ def __init__(self, attr, obj, current, passive=False):
self.attr = attr
- # get the current state. this may trigger a lazy load if
- # passive is False.
- current = attr.get(obj, passive=passive, raiseerr=False)
-
+
# get the "original" value. if a lazy load was fired when we got
# the 'current' value, this "original" was also populated just
# now as well (therefore we have to get it second)
self._current = current
else:
self._current = [current]
-
if attr.uselist:
s = util.Set(original or [])
self._added_items = []
"""
attr = getattr(obj.__class__, key)
x = attr.get(obj, passive=passive)
- if x is None:
+ if x is InstrumentedAttribute.PASSIVE_NORESULT:
return []
elif attr.uselist:
return x
self._synchronize(obj, child, None, False)
if child is not None and self.post_update:
uowcommit.register_object(child, postupdate=True)
- for child in childlist.deleted_items():
- if not self.cascade.delete_orphan:
- self._synchronize(obj, child, None, True)
+ for child in childlist.deleted_items():
+ if not self.cascade.delete_orphan:
+ self._synchronize(obj, child, None, True)
def preprocess_dependencies(self, task, deplist, uowcommit, delete = False):
#print self.mapper.mapped_table.name + " " + self.key + " " + repr(len(deplist)) + " preprocess_dep isdelete " + repr(delete) + " direction " + repr(self.direction)
for child in childlist.added_items():
if child is not None:
uowcommit.register_object(child)
- for child in childlist.deleted_items():
- if not self.cascade.delete_orphan:
- uowcommit.register_object(child, isdelete=False)
- elif childlist.hasparent(child) is False:
- uowcommit.register_object(child, isdelete=True)
- for c in self.mapper.cascade_iterator('delete', child):
- uowcommit.register_object(c, isdelete=True)
+ for child in childlist.deleted_items():
+ if not self.cascade.delete_orphan:
+ uowcommit.register_object(child, isdelete=False)
+ elif childlist.hasparent(child) is False:
+ uowcommit.register_object(child, isdelete=True)
+ for c in self.mapper.cascade_iterator('delete', child):
+ uowcommit.register_object(c, isdelete=True)
def _synchronize(self, obj, child, associationrow, clearkeys):
source = obj
if not type in self.cascade:
return
childlist = sessionlib.attribute_manager.get_history(object, self.key, passive=True)
-
+ if childlist is None:
+ return
mapper = self.mapper.primary_mapper()
for c in childlist.added_items() + childlist.deleted_items() + childlist.unchanged_items():
if c is not None and c not in recursive:
isdelete = taskelement.isdelete
# list of dependent objects from this object
- childlist = dep.get_object_dependencies(obj, trans, passive = True)
-
+ childlist = dep.get_object_dependencies(obj, trans, passive=True)
+ if childlist is None:
+ continue
# the task corresponding to saving/deleting of those dependent objects
childtask = trans.get_task_by_mapper(processor.mapper.primary_mapper())
sess.flush()
self.assert_(item_keywords.count().scalar() == 0)
+class AssociationTest2(testbase.PersistTest):
+ def setUpAll(self):
+ global table_originals, table_people, table_isauthor, metadata, Originals, People, IsAuthor
+ metadata = BoundMetaData(testbase.db)
+ table_originals = Table('Originals', metadata,
+ Column('ID', Integer, primary_key=True),
+ Column('Title', String(200), nullable=False),
+ Column('Date', Date ),
+ )
+ table_people = Table('People', metadata,
+ Column('ID', Integer, primary_key=True),
+ Column('Name', String(140), nullable=False),
+ Column('Country', CHAR(2), default='es'),
+ )
+ table_isauthor = Table('IsAuthor', metadata,
+ Column('OriginalsID', Integer, ForeignKey('Originals.ID'),
+default=None),
+ Column('PeopleID', Integer, ForeignKey('People.ID'),
+default=None),
+ Column('Kind', CHAR(1), default='A'),
+ )
+ metadata.create_all()
+
+ class Base(object):
+ def __init__(self, **kw):
+ for k,v in kw.iteritems():
+ setattr(self, k, v)
+ def display(self):
+ c = [ "%s=%s" % (col.key, repr(getattr(self, col.key))) for col
+in self.c ]
+ return "%s(%s)" % (self.__class__.__name__, ', '.join(c))
+ def __repr__(self):
+ return self.display()
+ def __str__(self):
+ return self.display()
+ class Originals(Base):
+ order = [table_originals.c.Title, table_originals.c.Date]
+ class People(Base):
+ order = [table_people.c.Name]
+ class IsAuthor(Base):
+ pass
+
+ mapper(Originals, table_originals, order_by=Originals.order,
+ properties={
+ 'people': relation(IsAuthor, association=People),
+ 'authors': relation(People, secondary=table_isauthor, backref='written',
+ primaryjoin=and_(table_originals.c.ID==table_isauthor.c.OriginalsID,
+ table_isauthor.c.Kind=='A')),
+ 'title': table_originals.c.Title,
+ 'date': table_originals.c.Date,
+ })
+ mapper(People, table_people, order_by=People.order, properties= {
+ 'originals': relation(IsAuthor, association=Originals),
+ 'name': table_people.c.Name,
+ 'country': table_people.c.Country,
+ })
+ mapper(IsAuthor, table_isauthor,
+ primary_key=[table_isauthor.c.OriginalsID, table_isauthor.c.PeopleID,
+table_isauthor.c.Kind],
+ properties={
+ 'original': relation(Originals, lazy=False),
+ 'person': relation(People, lazy=False),
+ 'kind': table_isauthor.c.Kind,
+ })
+
+ def tearDown(self):
+ for t in metadata.table_iterator(reverse=True):
+ t.delete().execute()
+ def tearDownAll(self):
+ clear_mappers()
+ metadata.drop_all()
+
+ def testinsert(self):
+ # this test is sure to get more complex...
+ p = People(name='name', country='es')
+ sess = create_session()
+ sess.save(p)
+ sess.flush()
+
+
if __name__ == "__main__":
testbase.main()