self.rowcount = engine.context.rowcount
metadata = cursor.description
self.props = {}
+ self.keys = []
i = 0
if metadata is not None:
for item in metadata:
raise "None for metadata " + colname
if self.props.setdefault(colname, rec) is not rec:
self.props[colname] = (ResultProxy.AmbiguousColumn(colname), 0)
+ self.keys.append(colname)
self.props[i] = rec
i+=1
"""proxies a single cursor row for a parent ResultProxy."""
def __init__(self, parent, row):
"""RowProxy objects are constructed by ResultProxy objects."""
- self.parent = parent
- self.row = row
+ self.__parent = parent
+ self.__row = row
+ def keys(self):
+ return self.__parent.keys
def __iter__(self):
- for i in range(0, len(self.row)):
- yield self.parent._get_col(self.row, i)
+ for k in self.keys():
+ yield k
def __eq__(self, other):
- return (other is self) or (other == tuple([self.parent._get_col(self.row, key) for key in range(0, len(self.row))]))
+ return (other is self) or (other == tuple([self.__parent._get_col(self.__row, key) for key in range(0, len(self.__row))]))
def __repr__(self):
- return repr(tuple([self.parent._get_col(self.row, key) for key in range(0, len(self.row))]))
+ return repr(dict(self.iteritems()))
def __getitem__(self, key):
- return self.parent._get_col(self.row, key)
+ return self.__parent._get_col(self.__row, key)
def __getattr__(self, name):
try:
- return self.parent._get_col(self.row, name)
+ return self[name]
except:
raise AttributeError
+ def iteritems(self):
+ for k in self:
+ yield (k, self[k])
+ def iterkeys(self):
+ return self.__iter__()
+ def itervalues(self):
+ for _, v in self.iteritems():
+ yield v
+ def values(self):
+ return [v for _, v in self.iteritems()]
+ def items(self):
+ return list(self.iteritems())
objectstore.uow().commit()
usertable = users.select(users.c.user_id.in_(u.foo_id)).execute().fetchall()
- self.assert_(usertable[0].row == (u.foo_id, 'multitester'))
+ self.assertEqual(usertable[0].values(), [u.foo_id, 'multitester'])
addresstable = addresses.select(addresses.c.address_id.in_(u.address_id)).execute().fetchall()
- self.assert_(addresstable[0].row == (u.address_id, u.foo_id, 'multi@test.org'))
+ self.assertEqual(addresstable[0].values(), [u.address_id, u.foo_id, 'multi@test.org'])
u.email = 'lala@hey.com'
u.user_name = 'imnew'
objectstore.uow().commit()
usertable = users.select(users.c.user_id.in_(u.foo_id)).execute().fetchall()
- self.assert_(usertable[0].row == (u.foo_id, 'imnew'))
+ self.assertEqual(usertable[0].values(), [u.foo_id, 'imnew'])
addresstable = addresses.select(addresses.c.address_id.in_(u.address_id)).execute().fetchall()
- self.assert_(addresstable[0].row == (u.address_id, u.foo_id, 'lala@hey.com'))
+ self.assertEqual(addresstable[0].values(), [u.address_id, u.foo_id, 'lala@hey.com'])
u = m.select(users.c.user_id==u.foo_id)[0]
self.echo( repr(u.__dict__))
])
l = sql.select([users, addresses], sql.and_(users.c.user_id==addresses.c.address_id, addresses.c.address_id==a.address_id)).execute()
- self.echo( repr(l.fetchone().row))
+ self.echo( repr(l.fetchone().values()))
objectstore.uow().commit()
usertable = users.select(users.c.user_id.in_(u.user_id)).execute().fetchall()
- self.assert_(usertable[0].row == (u.user_id, 'one2manytester'))
+ self.assertEqual(usertable[0].values(), [u.user_id, 'one2manytester'])
addresstable = addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id), order_by=[addresses.c.email_address]).execute().fetchall()
- self.assert_(addresstable[0].row == (a2.address_id, u.user_id, 'lala@test.org'))
- self.assert_(addresstable[1].row == (a.address_id, u.user_id, 'one2many@test.org'))
+ self.assertEqual(addresstable[0].values(), [a2.address_id, u.user_id, 'lala@test.org'])
+ self.assertEqual(addresstable[1].values(), [a.address_id, u.user_id, 'one2many@test.org'])
userid = u.user_id
addressid = a2.address_id
addresstable = addresses.select(addresses.c.address_id == addressid).execute().fetchall()
- self.assert_(addresstable[0].row == (addressid, userid, 'somethingnew@foo.com'))
+ self.assertEqual(addresstable[0].values(), [addressid, userid, 'somethingnew@foo.com'])
self.assert_(u.user_id == userid and a2.address_id == addressid)
def testmapperswitch(self):
u.addresses.append(a2)
m.save(u)
addresstable = addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id)).execute().fetchall()
- self.echo( repr(addresstable[0].row))
- self.assert_(addresstable[0].row == (a.address_id, u.user_id, 'one2many@test.org'))
- self.assert_(addresstable[1].row == (a2.address_id, u.user_id, 'lala@test.org'))
+ self.echo( repr(addresstable[0].values()))
+ self.assertEqual(addresstable[0].values(), [a.address_id, u.user_id, 'one2many@test.org'])
+ self.assertEqual(addresstable[1].values(), [a2.address_id, u.user_id, 'lala@test.org'])
del u.addresses[1]
m.save(u)
addresstable = addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id)).execute().fetchall()
self.echo( repr(addresstable))
- self.assert_(addresstable[0].row == (a.address_id, u.user_id, 'one2many@test.org'))
- self.assert_(addresstable[1].row == (a2.address_id, None, 'lala@test.org'))
+ self.assertEqual(addresstable[0].values(), [a.address_id, u.user_id, 'one2many@test.org'])
+ self.assertEqual(addresstable[1].values(), [a2.address_id, None, 'lala@test.org'])
def testmanytomany(self):
items = orderitems
self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2)
self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack')
+ def test_column_dict_behaviours(self):
+ self.users.insert().execute(user_id=1, user_name='foo')
+ r = self.users.select().execute().fetchone()
+ self.assertEqual(r.keys(), ['user_id', 'user_name'])
+ self.assertEqual(r.items(), [('user_id', 1), ('user_name', 'foo')])
+ self.assertEqual(r.values(), [1, 'foo'])
+ self.assertEqual(zip(r.itervalues(), r.iterkeys()), zip(r.values(), r.keys()))
+ self.assertEqual(repr(r), "{'user_name': u'foo', 'user_id': 1}")
+
def test_column_accessor_shadow(self):
shadowed = Table('test_shadowed', db,
Column('shadow_id', INT, primary_key = True),
Column('shadow_name', VARCHAR(20)),
Column('parent', VARCHAR(20)),
Column('row', VARCHAR(20)),
+ Column('__parent', VARCHAR(20)),
+ Column('__row', VARCHAR(20)),
redefine = True
)
shadowed.create()
- shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow')
+ shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row')
r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone()
self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow')
- self.failIf(r.parent == 'The Light')
- self.failIf(r.row == 'Without light there is no shadow')
- self.assert_(isinstance(r.parent, ResultProxy))
- self.assert_(isinstance(r.row, tuple))
- self.assert_(r['parent'] == r[shadowed.c.parent] == 'The Light')
- self.assert_(r['row'] == r[shadowed.c.row] == 'Without light there is no shadow')
+ self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')
+ self.assert_(r.row == r['row'] == r[shadowed.c.row] == 'Without light there is no shadow')
+ self.assert_(r['__parent'] == 'Hidden parent')
+ self.assert_(r['__row'] == 'Hidden row')
+ try:
+ print r.__parent, r.__row
+ self.fail('Should not allow access to private attributes')
+ except AttributeError:
+ pass # expected
if __name__ == "__main__":