From: Mike Bayer Date: Fri, 30 Jan 2009 20:48:54 +0000 (+0000) Subject: - modernized UnicodeTest/BinaryTest X-Git-Tag: rel_0_6_6~296 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=65ad427ae11d5c72d6a05918cb7bd9897a13e993;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - modernized UnicodeTest/BinaryTest - removed PickleType homegrown comparison method - testtypes passes in 3k for sqlite/pg8000 --- diff --git a/06CHANGES b/06CHANGES index f4baaf3602..d8a4884636 100644 --- a/06CHANGES +++ b/06CHANGES @@ -26,4 +26,10 @@ and if on an older version of SQL Server, the operation fails. The behavior is exactly the same except the error is raised by SQL server instead of the dialect, and no flag setting is required to enable it. - - using new dialect.initialize() feature to set up version-dependent behavior. \ No newline at end of file + - using new dialect.initialize() feature to set up version-dependent behavior. + +- types + - PickleType now uses == for comparison of values when mutable=True, + unless the "comparator" argument with a comparsion function is specified to the type. + Objects being pickled will be compared based on identity (which defeats the purpose + of mutable=True) if __eq__() is not overridden or a comparison function is not provided. diff --git a/lib/sqlalchemy/dialects/postgres/base.py b/lib/sqlalchemy/dialects/postgres/base.py index 5221df5494..bab875d686 100644 --- a/lib/sqlalchemy/dialects/postgres/base.py +++ b/lib/sqlalchemy/dialects/postgres/base.py @@ -64,7 +64,7 @@ option to the Index constructor:: """ -import re, string +import re from sqlalchemy import sql, schema, exc, util from sqlalchemy.engine import base, default @@ -251,7 +251,7 @@ class PGCompiler(compiler.SQLCompiler): else: yield c columns = [self.process(c, within_columns_clause=True) for c in flatten_columnlist(returning_cols)] - text += ' RETURNING ' + string.join(columns, ', ') + text += ' RETURNING ' + ', '.join(columns) return text def visit_update(self, update_stmt): @@ -306,7 +306,7 @@ class PGDDLCompiler(compiler.DDLCompiler): text += "INDEX %s ON %s (%s)" \ % (preparer.quote(self._validate_identifier(index.name, True), index.quote), preparer.format_table(index.table), - string.join([preparer.format_column(c) for c in index.columns], ', ')) + ', '.join([preparer.format_column(c) for c in index.columns])) whereclause = index.kwargs.get('postgres_where', None) if whereclause is not None: diff --git a/lib/sqlalchemy/types.py b/lib/sqlalchemy/types.py index c764feda97..aaea145fd3 100644 --- a/lib/sqlalchemy/types.py +++ b/lib/sqlalchemy/types.py @@ -844,7 +844,11 @@ class PickleType(MutableType, TypeDecorator): loads = self.pickler.loads if value is None: return None + # Py3K + #return loads(value) + # Py2K return loads(str(value)) + # end Py2K def copy_value(self, value): if self.mutable: @@ -855,9 +859,6 @@ class PickleType(MutableType, TypeDecorator): def compare_values(self, x, y): if self.comparator: return self.comparator(x, y) - elif self.mutable and not hasattr(x, '__eq__') and x is not None: - util.warn_deprecated("Objects stored with PickleType when mutable=True must implement __eq__() for reliable comparison.") - return self.pickler.dumps(x, self.protocol) == self.pickler.dumps(y, self.protocol) else: return x == y diff --git a/test/sql/testtypes.py b/test/sql/testtypes.py index 97593f856b..1a78e463ae 100644 --- a/test/sql/testtypes.py +++ b/test/sql/testtypes.py @@ -1,3 +1,4 @@ +# coding: utf-8 import decimal import testenv; testenv.configure_for_tests() import datetime, os, pickleable, re @@ -293,117 +294,82 @@ class ColumnsTest(TestBase, AssertsExecutionResults): class UnicodeTest(TestBase, AssertsExecutionResults): """tests the Unicode type. also tests the TypeDecorator with instances in the types package.""" + def setUpAll(self): - global unicode_table + global unicode_table, metadata metadata = MetaData(testing.db) unicode_table = Table('unicode_table', metadata, Column('id', Integer, Sequence('uni_id_seq', optional=True), primary_key=True), Column('unicode_varchar', Unicode(250)), Column('unicode_text', UnicodeText), - Column('plain_varchar', String(250)) ) - unicode_table.create() + metadata.create_all() + def tearDownAll(self): - unicode_table.drop() + metadata.drop_all() def tearDown(self): unicode_table.delete().execute() def test_round_trip(self): - assert unicode_table.c.unicode_varchar.type.length == 250 - rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n' - unicodedata = rawdata.decode('utf-8') + unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »" + + unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata) - if testing.db.dialect.supports_unicode_binds: - rawdata = "something" - - unicode_table.insert().execute(unicode_varchar=unicodedata, - unicode_text=unicodedata, - plain_varchar=rawdata) x = unicode_table.select().execute().fetchone() self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata) self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata) - if isinstance(x['plain_varchar'], unicode): - assert testing.db.dialect.supports_unicode_binds - else: - self.assert_(not isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == rawdata) - def test_union(self): """ensure compiler processing works for UNIONs""" - rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n' - unicodedata = rawdata.decode('utf-8') - if testing.db.dialect.supports_unicode_binds: - rawdata = "something" - unicode_table.insert().execute(unicode_varchar=unicodedata, - unicode_text=unicodedata, - plain_varchar=rawdata) + unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »" + + unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata) x = union(select([unicode_table.c.unicode_varchar]), select([unicode_table.c.unicode_varchar])).execute().fetchone() self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata) - def test_assertions(self): - self.assertRaisesMessage(exc.SAWarning, "Unicode type received non-unicode bind param value 'not unicode'", - unicode_table.insert().execute, unicode_varchar='not unicode' - ) - - unicode_engine = engines.utf8_engine(options={'convert_unicode':True, - 'assert_unicode':True}) - try: - self.assertRaisesMessage(exc.InvalidRequestError, "Unicode type received non-unicode bind param value 'im not unicode'", - unicode_engine.execute, unicode_table.insert(), plain_varchar='im not unicode' - ) - - @testing.emits_warning('.*non-unicode bind') - def warns(): - # test that data still goes in if warning is emitted.... - unicode_table.insert().execute(unicode_varchar='not unicode') - assert (select([unicode_table.c.unicode_varchar]).execute().fetchall() == [('not unicode', )]) - warns() - - finally: - unicode_engine.dispose() - - @testing.fails_on('oracle', 'FIXME: unknown') + @testing.fails_on('oracle', 'oracle converts empty strings to a blank space') def test_blank_strings(self): unicode_table.insert().execute(unicode_varchar=u'') assert select([unicode_table.c.unicode_varchar]).scalar() == u'' - def test_engine_parameter(self): - """tests engine-wide unicode conversion""" + def test_parameters(self): + """test the dialect convert_unicode parameters.""" + + unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »" + + u = Unicode(assert_unicode=True) + uni = u.dialect_impl(testing.db.dialect).bind_processor(testing.db.dialect) + # Py3K + #self.assertRaises(exc.InvalidRequestError, uni, b'x') + # Py2K + self.assertRaises(exc.InvalidRequestError, uni, 'x') + # end Py2K + + u = Unicode() + uni = u.dialect_impl(testing.db.dialect).bind_processor(testing.db.dialect) + # Py3K + #self.assertRaises(exc.SAWarning, uni, b'x') + # Py2K + self.assertRaises(exc.SAWarning, uni, 'x') + # end Py2K + + unicode_engine = engines.utf8_engine(options={'convert_unicode':True,'assert_unicode':True}) + unicode_engine.dialect.supports_unicode_binds = False - prev_unicode = testing.db.engine.dialect.convert_unicode - prev_assert = testing.db.engine.dialect.assert_unicode - try: - testing.db.engine.dialect.convert_unicode = True - testing.db.engine.dialect.assert_unicode = False - rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n' - unicodedata = rawdata.decode('utf-8') - if testing.db.dialect.supports_unicode_binds: - rawdata = "something" - unicode_table.insert().execute(unicode_varchar=unicodedata, - unicode_text=unicodedata, - plain_varchar=rawdata) - x = unicode_table.select().execute().fetchone() - print 0, repr(unicodedata) - print 1, repr(x['unicode_varchar']) - print 2, repr(x['unicode_text']) - print 3, repr(x['plain_varchar']) - self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata) - self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata) - if not testing.db.dialect.supports_unicode_binds: - self.assert_(isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == unicodedata) - finally: - testing.db.engine.dialect.convert_unicode = prev_unicode - testing.db.engine.dialect.convert_unicode = prev_assert - - @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on') - @testing.fails_on('firebird', 'Data type unknown') - def test_length_function(self): - """checks the database correctly understands the length of a unicode string""" - teststr = u'aaa\x1234' - self.assert_(testing.db.func.length(teststr).scalar() == len(teststr)) + s = String() + uni = s.dialect_impl(unicode_engine.dialect).bind_processor(unicode_engine.dialect) + # Py3K + #self.assertRaises(exc.InvalidRequestError, uni, b'x') + #assert isinstance(uni(unicodedata), bytes) + # Py2K + self.assertRaises(exc.InvalidRequestError, uni, 'x') + assert isinstance(uni(unicodedata), str) + # end Py2K + + assert uni(unicodedata) == unicodedata.encode('utf-8') class BinaryTest(TestBase, AssertsExecutionResults): __excluded_on__ = ( @@ -427,15 +393,15 @@ class BinaryTest(TestBase, AssertsExecutionResults): return value binary_table = Table('binary_table', MetaData(testing.db), - Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True), - Column('data', Binary), - Column('data_slice', Binary(100)), - Column('misc', String(30)), - # construct PickleType with non-native pickle module, since cPickle uses relative module - # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative - # to the 'types' module - Column('pickled', PickleType), - Column('mypickle', MyPickleType) + Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True), + Column('data', Binary), + Column('data_slice', Binary(100)), + Column('misc', String(30)), + # construct PickleType with non-native pickle module, since cPickle uses relative module + # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative + # to the 'types' module + Column('pickled', PickleType), + Column('mypickle', MyPickleType) ) binary_table.create() @@ -446,24 +412,38 @@ class BinaryTest(TestBase, AssertsExecutionResults): binary_table.drop() @testing.fails_on('mssql', 'MSSQl BINARY type right pads the fixed length with \x00') - def testbinary(self): + def test_round_trip(self): testobj1 = pickleable.Foo('im foo 1') testobj2 = pickleable.Foo('im foo 2') testobj3 = pickleable.Foo('im foo 3') stream1 =self.load_stream('binary_data_one.dat') stream2 =self.load_stream('binary_data_two.dat') - binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3) - binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99], pickled=testobj2) - binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None) + binary_table.insert().execute( + primary_id=1, + misc='binary_data_one.dat', + data=stream1, + data_slice=stream1[0:100], + pickled=testobj1, + mypickle=testobj3) + binary_table.insert().execute( + primary_id=2, + misc='binary_data_two.dat', + data=stream2, + data_slice=stream2[0:99], + pickled=testobj2) + binary_table.insert().execute( + primary_id=3, + misc='binary_data_two.dat', + data=None, + data_slice=stream2[0:99], + pickled=None) for stmt in ( binary_table.select(order_by=binary_table.c.primary_id), text("select * from binary_table order by binary_table.primary_id", typemap={'pickled':PickleType, 'mypickle':MyPickleType}, bind=testing.db) ): l = stmt.execute().fetchall() - print type(stream1), type(l[0]['data']), type(l[0]['data_slice']) - print len(stream1), len(l[0]['data']), len(l[0]['data_slice']) self.assertEquals(list(stream1), list(l[0]['data'])) self.assertEquals(list(stream1[0:100]), list(l[0]['data_slice'])) self.assertEquals(list(stream2), list(l[1]['data'])) @@ -472,10 +452,9 @@ class BinaryTest(TestBase, AssertsExecutionResults): self.assertEquals(testobj3.moredata, l[0]['mypickle'].moredata) self.assertEquals(l[0]['mypickle'].stuff, 'this is the right stuff') - def load_stream(self, name, len=12579): + def load_stream(self, name): f = os.path.join(os.path.dirname(testenv.__file__), name) - # put a number less than the typical MySQL default BLOB size - return file(f).read(len) + return open(f, mode='rb').read() class ExpressionTest(TestBase, AssertsExecutionResults): def setUpAll(self): @@ -828,30 +807,6 @@ class BooleanTest(TestBase, AssertsExecutionResults): assert(res2==[(2, False)]) class PickleTest(TestBase): - def test_noeq_deprecation(self): - p1 = PickleType() - - self.assertRaises(DeprecationWarning, - p1.compare_values, pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2) - ) - - self.assertRaises(DeprecationWarning, - p1.compare_values, pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2) - ) - - @testing.uses_deprecated() - def go(): - # test actual dumps comparison - assert p1.compare_values(pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2)) - assert p1.compare_values(pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2)) - go() - - assert p1.compare_values({1:2, 3:4}, {3:4, 1:2}) - - p2 = PickleType(mutable=False) - assert not p2.compare_values(pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2)) - assert not p2.compare_values(pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2)) - def test_eq_comparison(self): p1 = PickleType() diff --git a/test/testlib/sa_unittest.py b/test/testlib/sa_unittest.py index 7eb2c07271..85ee9beb23 100644 --- a/test/testlib/sa_unittest.py +++ b/test/testlib/sa_unittest.py @@ -517,8 +517,12 @@ class TestLoader: elif (isinstance(obj, (type, types.ClassType)) and issubclass(obj, TestCase)): return self.loadTestsFromTestCase(obj) + # Py3K + #elif type(obj) == types.FunctionType: + # Py2K elif type(obj) == types.UnboundMethodType: return parent(obj.__name__) + # end Py2K elif isinstance(obj, TestSuite): return obj elif callable(obj):