]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
RowProxy changes - added keys(), used keys() to add more dictionary-like behaviour...
authorRobert Leftwich <rtl@pobox.com>
Sat, 28 Jan 2006 22:24:16 +0000 (22:24 +0000)
committerRobert Leftwich <rtl@pobox.com>
Sat, 28 Jan 2006 22:24:16 +0000 (22:24 +0000)
lib/sqlalchemy/engine.py
test/objectstore.py
test/query.py

index d384ba2b3953828188329205d232e8914b271854..3af02616b33248bd50137c72a8fcd7899765ae38 100644 (file)
@@ -631,6 +631,7 @@ class ResultProxy:
         self.rowcount = engine.context.rowcount
         metadata = cursor.description
         self.props = {}
+        self.keys = []
         i = 0
         if metadata is not None:
             for item in metadata:
@@ -644,6 +645,7 @@ class ResultProxy:
                     raise "None for metadata " + colname
                 if self.props.setdefault(colname, rec) is not rec:
                     self.props[colname] = (ResultProxy.AmbiguousColumn(colname), 0)
+                self.keys.append(colname)
                 self.props[i] = rec
                 i+=1
 
@@ -684,22 +686,36 @@ class RowProxy:
     """proxies a single cursor row for a parent ResultProxy."""
     def __init__(self, parent, row):
         """RowProxy objects are constructed by ResultProxy objects."""
-        self.parent = parent
-        self.row = row
+        self.__parent = parent
+        self.__row = row
+    def keys(self):
+        return self.__parent.keys
     def __iter__(self):
-        for i in range(0, len(self.row)):
-            yield self.parent._get_col(self.row, i)
+        for k in self.keys():
+            yield k
     def __eq__(self, other):
-        return (other is self) or (other == tuple([self.parent._get_col(self.row, key) for key in range(0, len(self.row))]))
+        return (other is self) or (other == tuple([self.__parent._get_col(self.__row, key) for key in range(0, len(self.__row))]))
     def __repr__(self):
-        return repr(tuple([self.parent._get_col(self.row, key) for key in range(0, len(self.row))]))
+        return repr(dict(self.iteritems()))
     def __getitem__(self, key):
-        return self.parent._get_col(self.row, key)
+        return self.__parent._get_col(self.__row, key)
     def __getattr__(self, name):
         try:
-            return self.parent._get_col(self.row, name)
+            return self[name]
         except:
             raise AttributeError
+    def iteritems(self):
+        for k in self:
+            yield (k, self[k])
+    def iterkeys(self):
+        return self.__iter__()
+    def itervalues(self):
+        for _, v in self.iteritems():
+            yield v
+    def values(self):
+        return [v for _, v in self.iteritems()]
+    def items(self):
+        return list(self.iteritems())
 
 
 
index ddd39b35d5f24211387b4063ac9c8ecb58c1dc85..4e0721b9719a80bce63b372d70060a3bc7497988 100644 (file)
@@ -220,18 +220,18 @@ class SaveTest(AssertMixin):
         objectstore.uow().commit()
 
         usertable = users.select(users.c.user_id.in_(u.foo_id)).execute().fetchall()
-        self.assert_(usertable[0].row == (u.foo_id, 'multitester'))
+        self.assertEqual(usertable[0].values(), [u.foo_id, 'multitester'])
         addresstable = addresses.select(addresses.c.address_id.in_(u.address_id)).execute().fetchall()
-        self.assert_(addresstable[0].row == (u.address_id, u.foo_id, 'multi@test.org'))
+        self.assertEqual(addresstable[0].values(), [u.address_id, u.foo_id, 'multi@test.org'])
 
         u.email = 'lala@hey.com'
         u.user_name = 'imnew'
         objectstore.uow().commit()
 
         usertable = users.select(users.c.user_id.in_(u.foo_id)).execute().fetchall()
-        self.assert_(usertable[0].row == (u.foo_id, 'imnew'))
+        self.assertEqual(usertable[0].values(), [u.foo_id, 'imnew'])
         addresstable = addresses.select(addresses.c.address_id.in_(u.address_id)).execute().fetchall()
-        self.assert_(addresstable[0].row == (u.address_id, u.foo_id, 'lala@hey.com'))
+        self.assertEqual(addresstable[0].values(), [u.address_id, u.foo_id, 'lala@hey.com'])
 
         u = m.select(users.c.user_id==u.foo_id)[0]
         self.echo( repr(u.__dict__))
@@ -402,7 +402,7 @@ class SaveTest(AssertMixin):
                 
         ])
         l = sql.select([users, addresses], sql.and_(users.c.user_id==addresses.c.address_id, addresses.c.address_id==a.address_id)).execute()
-        self.echo( repr(l.fetchone().row))
+        self.echo( repr(l.fetchone().values()))
 
         
 
@@ -425,10 +425,10 @@ class SaveTest(AssertMixin):
         objectstore.uow().commit()
 
         usertable = users.select(users.c.user_id.in_(u.user_id)).execute().fetchall()
-        self.assert_(usertable[0].row == (u.user_id, 'one2manytester'))
+        self.assertEqual(usertable[0].values(), [u.user_id, 'one2manytester'])
         addresstable = addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id), order_by=[addresses.c.email_address]).execute().fetchall()
-        self.assert_(addresstable[0].row == (a2.address_id, u.user_id, 'lala@test.org'))
-        self.assert_(addresstable[1].row == (a.address_id, u.user_id, 'one2many@test.org'))
+        self.assertEqual(addresstable[0].values(), [a2.address_id, u.user_id, 'lala@test.org'])
+        self.assertEqual(addresstable[1].values(), [a.address_id, u.user_id, 'one2many@test.org'])
 
         userid = u.user_id
         addressid = a2.address_id
@@ -439,7 +439,7 @@ class SaveTest(AssertMixin):
 
         
         addresstable = addresses.select(addresses.c.address_id == addressid).execute().fetchall()
-        self.assert_(addresstable[0].row == (addressid, userid, 'somethingnew@foo.com'))
+        self.assertEqual(addresstable[0].values(), [addressid, userid, 'somethingnew@foo.com'])
         self.assert_(u.user_id == userid and a2.address_id == addressid)
 
     def testmapperswitch(self):
@@ -570,15 +570,15 @@ class SaveTest(AssertMixin):
         u.addresses.append(a2)
         m.save(u)
         addresstable = addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id)).execute().fetchall()
-        self.echo( repr(addresstable[0].row))
-        self.assert_(addresstable[0].row == (a.address_id, u.user_id, 'one2many@test.org'))
-        self.assert_(addresstable[1].row == (a2.address_id, u.user_id, 'lala@test.org'))
+        self.echo( repr(addresstable[0].values()))
+        self.assertEqual(addresstable[0].values(), [a.address_id, u.user_id, 'one2many@test.org'])
+        self.assertEqual(addresstable[1].values(), [a2.address_id, u.user_id, 'lala@test.org'])
         del u.addresses[1]
         m.save(u)
         addresstable = addresses.select(addresses.c.address_id.in_(a.address_id, a2.address_id)).execute().fetchall()
         self.echo( repr(addresstable))
-        self.assert_(addresstable[0].row == (a.address_id, u.user_id, 'one2many@test.org'))
-        self.assert_(addresstable[1].row == (a2.address_id, None, 'lala@test.org'))
+        self.assertEqual(addresstable[0].values(), [a.address_id, u.user_id, 'one2many@test.org'])
+        self.assertEqual(addresstable[1].values(), [a2.address_id, None, 'lala@test.org'])
 
     def testmanytomany(self):
         items = orderitems
index e603b002b2791bc12815ca15eb738f7c722a76a7..ceb50ebd00b7bb93497ec5984805520f1cb1137a 100644 (file)
@@ -108,25 +108,39 @@ class QueryTest(PersistTest):
         self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2)
         self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack')
 
+    def test_column_dict_behaviours(self):
+        self.users.insert().execute(user_id=1, user_name='foo')
+        r = self.users.select().execute().fetchone()
+        self.assertEqual(r.keys(), ['user_id', 'user_name'])
+        self.assertEqual(r.items(), [('user_id', 1), ('user_name', 'foo')])
+        self.assertEqual(r.values(), [1, 'foo'])
+        self.assertEqual(zip(r.itervalues(), r.iterkeys()), zip(r.values(), r.keys()))
+        self.assertEqual(repr(r), "{'user_name': u'foo', 'user_id': 1}")
+        
     def test_column_accessor_shadow(self):
         shadowed = Table('test_shadowed', db,
                          Column('shadow_id', INT, primary_key = True),
                          Column('shadow_name', VARCHAR(20)),
                          Column('parent', VARCHAR(20)),
                          Column('row', VARCHAR(20)),
+                         Column('__parent', VARCHAR(20)),
+                         Column('__row', VARCHAR(20)),
             redefine = True
         )
         shadowed.create()
-        shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light',  row='Without light there is no shadow')
+        shadowed.insert().execute(shadow_id=1, shadow_name='The Shadow', parent='The Light', row='Without light there is no shadow', __parent='Hidden parent', __row='Hidden row')
         r = shadowed.select(shadowed.c.shadow_id==1).execute().fetchone()
         self.assert_(r.shadow_id == r['shadow_id'] == r[shadowed.c.shadow_id] == 1)
         self.assert_(r.shadow_name == r['shadow_name'] == r[shadowed.c.shadow_name] == 'The Shadow')
-        self.failIf(r.parent == 'The Light')
-        self.failIf(r.row == 'Without light there is no shadow')
-        self.assert_(isinstance(r.parent, ResultProxy))
-        self.assert_(isinstance(r.row, tuple))
-        self.assert_(r['parent'] == r[shadowed.c.parent] == 'The Light')
-        self.assert_(r['row'] == r[shadowed.c.row] == 'Without light there is no shadow')
+        self.assert_(r.parent == r['parent'] == r[shadowed.c.parent] == 'The Light')
+        self.assert_(r.row == r['row'] == r[shadowed.c.row] == 'Without light there is no shadow')
+        self.assert_(r['__parent'] == 'Hidden parent')
+        self.assert_(r['__row'] == 'Hidden row')
+        try:
+            print r.__parent, r.__row
+            self.fail('Should not allow access to private attributes')
+        except AttributeError:
+            pass # expected
         
         
 if __name__ == "__main__":