elif single_entity:
rows = [process[0](row, None) for row in fetch]
else:
- rows = [util.NamedTuple(labels,
- [proc(row, None) for proc in process])
+ rows = [util.NamedTuple([proc(row, None) for proc in process], labels)
for row in fetch]
if filter:
attributes.instance_state(newrow[i]),
attributes.instance_dict(newrow[i]),
load=load, _recursive={})
- result.append(util.NamedTuple(row._labels, newrow))
+ result.append(util.NamedTuple(newrow, row._labels))
return iter(result)
finally:
"""
- def __new__(cls, labels, vals):
+ def __new__(cls, vals, labels=None):
vals = list(vals)
t = tuple.__new__(cls, vals)
- t.__dict__ = dict(itertools.izip(labels, vals))
- t._labels = labels
+ if labels:
+ t.__dict__ = dict(itertools.izip(labels, vals))
+ t._labels = labels
return t
def keys(self):
from sqlalchemy.test import testing
from sqlalchemy import Integer, String, ForeignKey
from sqlalchemy.test.schema import Table, Column
-from sqlalchemy.orm import mapper, relation, create_session, attributes, interfaces
+from sqlalchemy.orm import mapper, relation, create_session, sessionmaker, attributes, interfaces
from test.orm import _base, _fixtures
eq_(ad.email_address, 'ed@bar.com')
eq_(u2, User(name='ed', addresses=[Address(email_address='ed@bar.com')]))
+ @testing.resolve_artifact_names
+ def test_pickle_protocols(self):
+ mapper(User, users, properties={
+ 'addresses':relation(Address, backref="user")
+ })
+ mapper(Address, addresses)
+
+ sess = sessionmaker()()
+ u1 = User(name='ed')
+ u1.addresses.append(Address(email_address='ed@bar.com'))
+ sess.add(u1)
+ sess.commit()
+
+ u1 = sess.query(User).first()
+ u1.addresses
+ for protocol in -1, 0, 1, 2:
+ u2 = pickle.loads(pickle.dumps(u1, protocol))
+ eq_(u1, u2)
+
@testing.resolve_artifact_names
def test_options_with_descriptors(self):
mapper(User, users, properties={
def test_tuple_labeling(self):
sess = create_session()
- for pickled in False, True:
+ # test pickle + all the protocols !
+ for pickled in False, -1, 0, 1, 2:
for row in sess.query(User, Address).join(User.addresses).all():
- if pickled:
- row = util.pickle.loads(util.pickle.dumps(row))
+ if pickled is not False:
+ row = util.pickle.loads(util.pickle.dumps(row, pickled))
eq_(set(row.keys()), set(['User', 'Address']))
eq_(row.User, row[0])
eq_(row.Address, row[1])
for row in sess.query(User.name, User.id.label('foobar')):
- if pickled:
- row = util.pickle.loads(util.pickle.dumps(row))
+ if pickled is not False:
+ row = util.pickle.loads(util.pickle.dumps(row, pickled))
eq_(set(row.keys()), set(['name', 'foobar']))
eq_(row.name, row[0])
eq_(row.foobar, row[1])
for row in sess.query(User).values(User.name, User.id.label('foobar')):
- if pickled:
- row = util.pickle.loads(util.pickle.dumps(row))
+ if pickled is not False:
+ row = util.pickle.loads(util.pickle.dumps(row, pickled))
eq_(set(row.keys()), set(['name', 'foobar']))
eq_(row.name, row[0])
eq_(row.foobar, row[1])
oalias = aliased(Order)
for row in sess.query(User, oalias).join(User.orders).all():
- if pickled:
- row = util.pickle.loads(util.pickle.dumps(row))
+ if pickled is not False:
+ row = util.pickle.loads(util.pickle.dumps(row, pickled))
eq_(set(row.keys()), set(['User']))
eq_(row.User, row[0])
oalias = aliased(Order, name='orders')
for row in sess.query(User, oalias).join(User.orders).all():
- if pickled:
- row = util.pickle.loads(util.pickle.dumps(row))
+ if pickled is not False:
+ row = util.pickle.loads(util.pickle.dumps(row, pickled))
eq_(set(row.keys()), set(['User', 'orders']))
eq_(row.User, row[0])
eq_(row.orders, row[1])
-
+
+ if pickled is not False:
+ ret = sess.query(User, Address).join(User.addresses).all()
+ util.pickle.loads(util.pickle.dumps(ret, pickled))
+
def test_column_queries(self):
sess = create_session()