]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- modernized UnicodeTest/BinaryTest
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 30 Jan 2009 20:48:54 +0000 (20:48 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 30 Jan 2009 20:48:54 +0000 (20:48 +0000)
- removed PickleType homegrown comparison method
- testtypes passes in 3k for sqlite/pg8000

06CHANGES
lib/sqlalchemy/dialects/postgres/base.py
lib/sqlalchemy/types.py
test/sql/testtypes.py
test/testlib/sa_unittest.py

index f4baaf3602bc3c2564975597c44817690b2efd1a..d8a48846363ce5b57e3d9d160d72181405fb7834 100644 (file)
--- a/06CHANGES
+++ b/06CHANGES
       and if on an older version of SQL Server, the operation fails.  The behavior is exactly
       the same except the error is raised by SQL server instead of the dialect, and no
       flag setting is required to enable it.
-    - using new dialect.initialize() feature to set up version-dependent behavior.
\ No newline at end of file
+    - using new dialect.initialize() feature to set up version-dependent behavior.
+    
+- types
+    - PickleType now uses == for comparison of values when mutable=True, 
+      unless the "comparator" argument with a comparsion function is specified to the type.  
+      Objects being pickled will be compared based on identity (which defeats the purpose
+      of mutable=True) if __eq__() is not overridden or a comparison function is not provided.
index 5221df5494c4268e99f9e47ffb99b80f6e822dad..bab875d6867070df21b2187d4349a3f0bb8af561 100644 (file)
@@ -64,7 +64,7 @@ option to the Index constructor::
 
 """
 
-import re, string
+import re
 
 from sqlalchemy import sql, schema, exc, util
 from sqlalchemy.engine import base, default
@@ -251,7 +251,7 @@ class PGCompiler(compiler.SQLCompiler):
                 else:
                     yield c
         columns = [self.process(c, within_columns_clause=True) for c in flatten_columnlist(returning_cols)]
-        text += ' RETURNING ' + string.join(columns, ', ')
+        text += ' RETURNING ' + ', '.join(columns)
         return text
 
     def visit_update(self, update_stmt):
@@ -306,7 +306,7 @@ class PGDDLCompiler(compiler.DDLCompiler):
         text += "INDEX %s ON %s (%s)" \
                     % (preparer.quote(self._validate_identifier(index.name, True), index.quote),
                        preparer.format_table(index.table),
-                       string.join([preparer.format_column(c) for c in index.columns], ', '))
+                       ', '.join([preparer.format_column(c) for c in index.columns]))
                        
         whereclause = index.kwargs.get('postgres_where', None)
         if whereclause is not None:
index c764feda971ee560d94a52f72a678d1595b1b221..aaea145fd3d9c5f43474223c3f91dd7b9db0f697 100644 (file)
@@ -844,7 +844,11 @@ class PickleType(MutableType, TypeDecorator):
         loads = self.pickler.loads
         if value is None:
             return None
+        # Py3K
+        #return loads(value)
+        # Py2K
         return loads(str(value))
+        # end Py2K
 
     def copy_value(self, value):
         if self.mutable:
@@ -855,9 +859,6 @@ class PickleType(MutableType, TypeDecorator):
     def compare_values(self, x, y):
         if self.comparator:
             return self.comparator(x, y)
-        elif self.mutable and not hasattr(x, '__eq__') and x is not None:
-            util.warn_deprecated("Objects stored with PickleType when mutable=True must implement __eq__() for reliable comparison.")
-            return self.pickler.dumps(x, self.protocol) == self.pickler.dumps(y, self.protocol)
         else:
             return x == y
 
index 97593f856be9f6c1641a2c709c0e4863e7675e4a..1a78e463ae55d1fc9692303bdd6057256de1ce68 100644 (file)
@@ -1,3 +1,4 @@
+# coding: utf-8
 import decimal
 import testenv; testenv.configure_for_tests()
 import datetime, os, pickleable, re
@@ -293,117 +294,82 @@ class ColumnsTest(TestBase, AssertsExecutionResults):
 
 class UnicodeTest(TestBase, AssertsExecutionResults):
     """tests the Unicode type.  also tests the TypeDecorator with instances in the types package."""
+
     def setUpAll(self):
-        global unicode_table
+        global unicode_table, metadata
         metadata = MetaData(testing.db)
         unicode_table = Table('unicode_table', metadata,
             Column('id', Integer, Sequence('uni_id_seq', optional=True), primary_key=True),
             Column('unicode_varchar', Unicode(250)),
             Column('unicode_text', UnicodeText),
-            Column('plain_varchar', String(250))
             )
-        unicode_table.create()
+        metadata.create_all()
+        
     def tearDownAll(self):
-        unicode_table.drop()
+        metadata.drop_all()
 
     def tearDown(self):
         unicode_table.delete().execute()
 
     def test_round_trip(self):
-        assert unicode_table.c.unicode_varchar.type.length == 250
-        rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
-        unicodedata = rawdata.decode('utf-8')
+        unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+        
+        unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata)
         
-        if testing.db.dialect.supports_unicode_binds:
-            rawdata = "something"
-            
-        unicode_table.insert().execute(unicode_varchar=unicodedata,
-                                       unicode_text=unicodedata,
-                                       plain_varchar=rawdata)
         x = unicode_table.select().execute().fetchone()
         self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
         self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
 
-        if isinstance(x['plain_varchar'], unicode):
-            assert testing.db.dialect.supports_unicode_binds
-        else:
-            self.assert_(not isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == rawdata)
-
     def test_union(self):
         """ensure compiler processing works for UNIONs"""
 
-        rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
-        unicodedata = rawdata.decode('utf-8')
-        if testing.db.dialect.supports_unicode_binds:
-            rawdata = "something"
-        unicode_table.insert().execute(unicode_varchar=unicodedata,
-                                       unicode_text=unicodedata,
-                                       plain_varchar=rawdata)
+        unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+
+        unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata)
                                        
         x = union(select([unicode_table.c.unicode_varchar]), select([unicode_table.c.unicode_varchar])).execute().fetchone()
         self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
 
-    def test_assertions(self):
-        self.assertRaisesMessage(exc.SAWarning, "Unicode type received non-unicode bind param value 'not unicode'", 
-            unicode_table.insert().execute, unicode_varchar='not unicode'
-        )
-
-        unicode_engine = engines.utf8_engine(options={'convert_unicode':True,
-                                                      'assert_unicode':True})
-        try:
-            self.assertRaisesMessage(exc.InvalidRequestError, "Unicode type received non-unicode bind param value 'im not unicode'", 
-                unicode_engine.execute, unicode_table.insert(), plain_varchar='im not unicode'
-            )
-
-            @testing.emits_warning('.*non-unicode bind')
-            def warns():
-                # test that data still goes in if warning is emitted....
-                unicode_table.insert().execute(unicode_varchar='not unicode')
-                assert (select([unicode_table.c.unicode_varchar]).execute().fetchall() == [('not unicode', )])
-            warns()
-
-        finally:
-            unicode_engine.dispose()
-
-    @testing.fails_on('oracle', 'FIXME: unknown')
+    @testing.fails_on('oracle', 'oracle converts empty strings to a blank space')
     def test_blank_strings(self):
         unicode_table.insert().execute(unicode_varchar=u'')
         assert select([unicode_table.c.unicode_varchar]).scalar() == u''
 
-    def test_engine_parameter(self):
-        """tests engine-wide unicode conversion"""
+    def test_parameters(self):
+        """test the dialect convert_unicode parameters."""
+
+        unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+
+        u = Unicode(assert_unicode=True)
+        uni = u.dialect_impl(testing.db.dialect).bind_processor(testing.db.dialect)
+        # Py3K
+        #self.assertRaises(exc.InvalidRequestError, uni, b'x')
+        # Py2K
+        self.assertRaises(exc.InvalidRequestError, uni, 'x')
+        # end Py2K
+
+        u = Unicode()
+        uni = u.dialect_impl(testing.db.dialect).bind_processor(testing.db.dialect)
+        # Py3K
+        #self.assertRaises(exc.SAWarning, uni, b'x')
+        # Py2K
+        self.assertRaises(exc.SAWarning, uni, 'x')
+        # end Py2K
+
+        unicode_engine = engines.utf8_engine(options={'convert_unicode':True,'assert_unicode':True})
+        unicode_engine.dialect.supports_unicode_binds = False
         
-        prev_unicode = testing.db.engine.dialect.convert_unicode
-        prev_assert = testing.db.engine.dialect.assert_unicode
-        try:
-            testing.db.engine.dialect.convert_unicode = True
-            testing.db.engine.dialect.assert_unicode = False
-            rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
-            unicodedata = rawdata.decode('utf-8')
-            if testing.db.dialect.supports_unicode_binds:
-                rawdata = "something"
-            unicode_table.insert().execute(unicode_varchar=unicodedata,
-                                           unicode_text=unicodedata,
-                                           plain_varchar=rawdata)
-            x = unicode_table.select().execute().fetchone()
-            print 0, repr(unicodedata)
-            print 1, repr(x['unicode_varchar'])
-            print 2, repr(x['unicode_text'])
-            print 3, repr(x['plain_varchar'])
-            self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
-            self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
-            if not testing.db.dialect.supports_unicode_binds:
-                self.assert_(isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == unicodedata)
-        finally:
-            testing.db.engine.dialect.convert_unicode = prev_unicode
-            testing.db.engine.dialect.convert_unicode = prev_assert
-
-    @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on')
-    @testing.fails_on('firebird', 'Data type unknown')
-    def test_length_function(self):
-        """checks the database correctly understands the length of a unicode string"""
-        teststr = u'aaa\x1234'
-        self.assert_(testing.db.func.length(teststr).scalar() == len(teststr))
+        s = String()
+        uni = s.dialect_impl(unicode_engine.dialect).bind_processor(unicode_engine.dialect)
+        # Py3K
+        #self.assertRaises(exc.InvalidRequestError, uni, b'x')
+        #assert isinstance(uni(unicodedata), bytes)
+        # Py2K
+        self.assertRaises(exc.InvalidRequestError, uni, 'x')
+        assert isinstance(uni(unicodedata), str)
+        # end Py2K
+        
+        assert uni(unicodedata) == unicodedata.encode('utf-8')
 
 class BinaryTest(TestBase, AssertsExecutionResults):
     __excluded_on__ = (
@@ -427,15 +393,15 @@ class BinaryTest(TestBase, AssertsExecutionResults):
                 return value
 
         binary_table = Table('binary_table', MetaData(testing.db),
-        Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True),
-        Column('data', Binary),
-        Column('data_slice', Binary(100)),
-        Column('misc', String(30)),
-        # construct PickleType with non-native pickle module, since cPickle uses relative module
-        # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative
-        # to the 'types' module
-        Column('pickled', PickleType),
-        Column('mypickle', MyPickleType)
+            Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True),
+            Column('data', Binary),
+            Column('data_slice', Binary(100)),
+            Column('misc', String(30)),
+            # construct PickleType with non-native pickle module, since cPickle uses relative module
+            # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative
+            # to the 'types' module
+            Column('pickled', PickleType),
+            Column('mypickle', MyPickleType)
         )
         binary_table.create()
 
@@ -446,24 +412,38 @@ class BinaryTest(TestBase, AssertsExecutionResults):
         binary_table.drop()
 
     @testing.fails_on('mssql', 'MSSQl BINARY type right pads the fixed length with \x00')
-    def testbinary(self):
+    def test_round_trip(self):
         testobj1 = pickleable.Foo('im foo 1')
         testobj2 = pickleable.Foo('im foo 2')
         testobj3 = pickleable.Foo('im foo 3')
 
         stream1 =self.load_stream('binary_data_one.dat')
         stream2 =self.load_stream('binary_data_two.dat')
-        binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat',    data=stream1, data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3)
-        binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99], pickled=testobj2)
-        binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None)
+        binary_table.insert().execute(
+                            primary_id=1, 
+                            misc='binary_data_one.dat', 
+                            data=stream1, 
+                            data_slice=stream1[0:100], 
+                            pickled=testobj1, 
+                            mypickle=testobj3)
+        binary_table.insert().execute(
+                            primary_id=2, 
+                            misc='binary_data_two.dat', 
+                            data=stream2, 
+                            data_slice=stream2[0:99], 
+                            pickled=testobj2)
+        binary_table.insert().execute(
+                            primary_id=3, 
+                            misc='binary_data_two.dat', 
+                            data=None, 
+                            data_slice=stream2[0:99], 
+                            pickled=None)
 
         for stmt in (
             binary_table.select(order_by=binary_table.c.primary_id),
             text("select * from binary_table order by binary_table.primary_id", typemap={'pickled':PickleType, 'mypickle':MyPickleType}, bind=testing.db)
         ):
             l = stmt.execute().fetchall()
-            print type(stream1), type(l[0]['data']), type(l[0]['data_slice'])
-            print len(stream1), len(l[0]['data']), len(l[0]['data_slice'])
             self.assertEquals(list(stream1), list(l[0]['data']))
             self.assertEquals(list(stream1[0:100]), list(l[0]['data_slice']))
             self.assertEquals(list(stream2), list(l[1]['data']))
@@ -472,10 +452,9 @@ class BinaryTest(TestBase, AssertsExecutionResults):
             self.assertEquals(testobj3.moredata, l[0]['mypickle'].moredata)
             self.assertEquals(l[0]['mypickle'].stuff, 'this is the right stuff')
 
-    def load_stream(self, name, len=12579):
+    def load_stream(self, name):
         f = os.path.join(os.path.dirname(testenv.__file__), name)
-        # put a number less than the typical MySQL default BLOB size
-        return file(f).read(len)
+        return open(f, mode='rb').read()
 
 class ExpressionTest(TestBase, AssertsExecutionResults):
     def setUpAll(self):
@@ -828,30 +807,6 @@ class BooleanTest(TestBase, AssertsExecutionResults):
         assert(res2==[(2, False)])
 
 class PickleTest(TestBase):
-    def test_noeq_deprecation(self):
-        p1 = PickleType()
-        
-        self.assertRaises(DeprecationWarning, 
-            p1.compare_values, pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2)
-        )
-
-        self.assertRaises(DeprecationWarning, 
-            p1.compare_values, pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2)
-        )
-        
-        @testing.uses_deprecated()
-        def go():
-            # test actual dumps comparison
-            assert p1.compare_values(pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2))
-            assert p1.compare_values(pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2))
-        go()
-        
-        assert p1.compare_values({1:2, 3:4}, {3:4, 1:2})
-        
-        p2 = PickleType(mutable=False)
-        assert not p2.compare_values(pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2))
-        assert not p2.compare_values(pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2))
-        
     def test_eq_comparison(self):
         p1 = PickleType()
         
index 7eb2c07271265172be135efbf5b0976f10b5efa7..85ee9beb233f6b7d766bb765e575289fe0f72bcd 100644 (file)
@@ -517,8 +517,12 @@ class TestLoader:
         elif (isinstance(obj, (type, types.ClassType)) and
               issubclass(obj, TestCase)):
             return self.loadTestsFromTestCase(obj)
+        # Py3K
+        #elif type(obj) == types.FunctionType:
+        # Py2K
         elif type(obj) == types.UnboundMethodType:
             return parent(obj.__name__)
+        # end Py2K
         elif isinstance(obj, TestSuite):
             return obj
         elif callable(obj):