- _BinaryExpression.compare() checks for a base set of "commutative" operators and checks for itself in reverse if so
- added ORM-based unit test for the above, fixes [ticket:664]
0.3.10
- sql
- got connection-bound metadata to work with implicit execution
- - cleanup to connection-bound sessions, SessionTransaction
- foreign key specs can have any chararcter in their identifiers
[ticket:667]
-
+ - added commutativity-awareness to binary clause comparisons to
+ each other, improves ORM lazy load optimization [ticket:664]
+- orm
+ - cleanup to connection-bound sessions, SessionTransaction
- postgres
- fixed max identifier length (63) [ticket:571]
def add(self, bind):
if self.parent is not None:
return self.parent.add(bind)
-
+
if self.connections.has_key(bind.engine):
- raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine")
+ raise exceptions.InvalidRequestError("Session already has a Connection associated for the given %sEngine" % (isinstance(bind, engine.Connection) and "Connection's " or ""))
+
return self.get_or_add(bind)
def get_or_add(self, bind):
else:
e = bind.engine
c = bind
+ if e in self.connections:
+ raise exceptions.InvalidRequestError("Session already has a Connection associated for the given Connection's Engine")
self.connections[bind] = self.connections[e] = (c, c.begin(), c is not bind)
return self.connections[bind][0]
return (
isinstance(other, _BinaryExpression) and self.operator == other.operator and
- self.left.compare(other.left) and self.right.compare(other.right)
+ (
+ self.left.compare(other.left) and self.right.compare(other.right)
+ or (
+ self.operator in ['=', '!=', '+', '*'] and
+ self.left.compare(other.right) and self.right.compare(other.left)
+ )
+ )
)
def self_group(self, against=None):
def test_uses_get(self):
"""test that a simple many-to-one lazyload optimizes to use query.get()."""
- mapper(Address, addresses, properties = dict(
- user = relation(mapper(User, users), lazy=True)
- ))
+ for pj in (
+ None,
+ users.c.id==addresses.c.user_id,
+ addresses.c.user_id==users.c.id
+ ):
+ mapper(Address, addresses, properties = dict(
+ user = relation(mapper(User, users), lazy=True, primaryjoin=pj)
+ ))
- sess = create_session()
+ sess = create_session()
- # load address
- a1 = sess.query(Address).filter_by(email_address="ed@wood.com").one()
+ # load address
+ a1 = sess.query(Address).filter_by(email_address="ed@wood.com").one()
- # load user that is attached to the address
- u1 = sess.query(User).get(8)
+ # load user that is attached to the address
+ u1 = sess.query(User).get(8)
+
+ def go():
+ # lazy load of a1.user should get it from the session
+ assert a1.user is u1
+ self.assert_sql_count(testbase.db, go, 0)
+ clear_mappers()
- def go():
- # lazy load of a1.user should get it from the session
- assert a1.user is u1
- self.assert_sql_count(testbase.db, go, 0)
-
def test_many_to_one(self):
mapper(Address, addresses, properties = dict(
user = relation(mapper(User, users), lazy=True)
u = User()
sess.save(u)
sess.flush()
- assert transaction.get_or_add(testbase.db) is trans2.get_or_add(testbase.db) #is transaction.get_or_add(c) is trans2.get_or_add(c) is c
+ assert transaction.get_or_add(testbase.db) is trans2.get_or_add(testbase.db) is transaction.get_or_add(c) is trans2.get_or_add(c) is c
+
+ try:
+ transaction.add(testbase.db.connect())
+ assert False
+ except exceptions.InvalidRequestError, e:
+ assert str(e) == "Session already has a Connection associated for the given Connection's Engine"
+
+ try:
+ transaction.get_or_add(testbase.db.connect())
+ assert False
+ except exceptions.InvalidRequestError, e:
+ assert str(e) == "Session already has a Connection associated for the given Connection's Engine"
+
+ try:
+ transaction.add(testbase.db)
+ assert False
+ except exceptions.InvalidRequestError, e:
+ assert str(e) == "Session already has a Connection associated for the given Engine"
+
trans2.commit()
transaction.rollback()
assert len(sess.query(User).select()) == 0