database if you supply a __autoload__ = True attribute in your
mapping inner-class. Currently this does not support reflecting
any relationships.
+- deferred column load could screw up the connection status in
+a flush() under some circumstances, this was fixed
0.2.5
- fixed endless loop bug in select_by(), if the traversal hit
self.__connection = None
del self.__connection
def scalar(self, object, parameters=None, **kwargs):
- row = self.execute(object, parameters, **kwargs).fetchone()
- if row is not None:
- return row[0]
- else:
- return None
+ result = self.execute(object, parameters, **kwargs)
+ row = result.fetchone()
+ try:
+ if row is not None:
+ return row[0]
+ else:
+ return None
+ finally:
+ result.close()
def execute(self, object, *multiparams, **params):
return Connection.executors[type(object).__mro__[-2]](self, object, *multiparams, **params)
def execute_default(self, default, **kwargs):
if not attr:
return None
clause.clauses.append(primary_key == attr)
-
session = sessionlib.object_session(instance)
if session is None:
raise exceptions.InvalidRequestError("Parent instance %s is not bound to a Session; deferred load operation of attribute '%s' cannot proceed" % (instance.__class__, self.key))
- connection = session.connection(self.parent)
-
- try:
- if self.group is not None:
- groupcols = [p for p in self.localparent.props.values() if isinstance(p, DeferredColumnProperty) and p.group==self.group]
- row = connection.execute(sql.select([g.columns[0] for g in groupcols], clause, use_labels=True), None).fetchone()
+
+ if self.group is not None:
+ groupcols = [p for p in self.localparent.props.values() if isinstance(p, DeferredColumnProperty) and p.group==self.group]
+ result = session.execute(self.localparent, sql.select([g.columns[0] for g in groupcols], clause, use_labels=True), None)
+ try:
+ row = result.fetchone()
for prop in groupcols:
if prop is self:
continue
instance.__dict__[prop.key] = row[prop.columns[0]]
sessionlib.attribute_manager.init_instance_attribute(instance, prop.key, uselist=False)
return row[self.columns[0]]
- else:
- return connection.scalar(sql.select([self.columns[0]], clause, use_labels=True),None)
- finally:
- connection.close()
+ finally:
+ result.close()
+ else:
+ return session.scalar(self.localparent, sql.select([self.columns[0]], clause, use_labels=True),None)
+
return lazyload
def setup(self, key, statement, **options):
pass
l = AddressUser.mapper.selectone()
self.assert_(l.user_id == au.user_id and l.address_id == au.address_id)
+ def testdeferred(self):
+ """test that a deferred load within a flush() doesnt screw up the connection"""
+ mapper(User, users, properties={
+ 'user_name':deferred(users.c.user_name)
+ })
+ u = User()
+ u.user_id=42
+ ctx.current.flush()
+
def testmultitable(self):
"""tests a save of an object where each instance spans two tables. also tests
redefinition of the keynames for the column properties."""