+# coding: utf-8
import decimal
import testenv; testenv.configure_for_tests()
import datetime, os, pickleable, re
class UnicodeTest(TestBase, AssertsExecutionResults):
"""tests the Unicode type. also tests the TypeDecorator with instances in the types package."""
+
def setUpAll(self):
- global unicode_table
+ global unicode_table, metadata
metadata = MetaData(testing.db)
unicode_table = Table('unicode_table', metadata,
Column('id', Integer, Sequence('uni_id_seq', optional=True), primary_key=True),
Column('unicode_varchar', Unicode(250)),
Column('unicode_text', UnicodeText),
- Column('plain_varchar', String(250))
)
- unicode_table.create()
+ metadata.create_all()
+
def tearDownAll(self):
- unicode_table.drop()
+ metadata.drop_all()
def tearDown(self):
unicode_table.delete().execute()
def test_round_trip(self):
- assert unicode_table.c.unicode_varchar.type.length == 250
- rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
- unicodedata = rawdata.decode('utf-8')
+ unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+
+ unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata)
- if testing.db.dialect.supports_unicode_binds:
- rawdata = "something"
-
- unicode_table.insert().execute(unicode_varchar=unicodedata,
- unicode_text=unicodedata,
- plain_varchar=rawdata)
x = unicode_table.select().execute().fetchone()
self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
- if isinstance(x['plain_varchar'], unicode):
- assert testing.db.dialect.supports_unicode_binds
- else:
- self.assert_(not isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == rawdata)
-
def test_union(self):
"""ensure compiler processing works for UNIONs"""
- rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
- unicodedata = rawdata.decode('utf-8')
- if testing.db.dialect.supports_unicode_binds:
- rawdata = "something"
- unicode_table.insert().execute(unicode_varchar=unicodedata,
- unicode_text=unicodedata,
- plain_varchar=rawdata)
+ unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+
+ unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata)
x = union(select([unicode_table.c.unicode_varchar]), select([unicode_table.c.unicode_varchar])).execute().fetchone()
self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
- def test_assertions(self):
- self.assertRaisesMessage(exc.SAWarning, "Unicode type received non-unicode bind param value 'not unicode'",
- unicode_table.insert().execute, unicode_varchar='not unicode'
- )
-
- unicode_engine = engines.utf8_engine(options={'convert_unicode':True,
- 'assert_unicode':True})
- try:
- self.assertRaisesMessage(exc.InvalidRequestError, "Unicode type received non-unicode bind param value 'im not unicode'",
- unicode_engine.execute, unicode_table.insert(), plain_varchar='im not unicode'
- )
-
- @testing.emits_warning('.*non-unicode bind')
- def warns():
- # test that data still goes in if warning is emitted....
- unicode_table.insert().execute(unicode_varchar='not unicode')
- assert (select([unicode_table.c.unicode_varchar]).execute().fetchall() == [('not unicode', )])
- warns()
-
- finally:
- unicode_engine.dispose()
-
- @testing.fails_on('oracle', 'FIXME: unknown')
+ @testing.fails_on('oracle', 'oracle converts empty strings to a blank space')
def test_blank_strings(self):
unicode_table.insert().execute(unicode_varchar=u'')
assert select([unicode_table.c.unicode_varchar]).scalar() == u''
- def test_engine_parameter(self):
- """tests engine-wide unicode conversion"""
+ def test_parameters(self):
+ """test the dialect convert_unicode parameters."""
+
+ unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+
+ u = Unicode(assert_unicode=True)
+ uni = u.dialect_impl(testing.db.dialect).bind_processor(testing.db.dialect)
+ # Py3K
+ #self.assertRaises(exc.InvalidRequestError, uni, b'x')
+ # Py2K
+ self.assertRaises(exc.InvalidRequestError, uni, 'x')
+ # end Py2K
+
+ u = Unicode()
+ uni = u.dialect_impl(testing.db.dialect).bind_processor(testing.db.dialect)
+ # Py3K
+ #self.assertRaises(exc.SAWarning, uni, b'x')
+ # Py2K
+ self.assertRaises(exc.SAWarning, uni, 'x')
+ # end Py2K
+
+ unicode_engine = engines.utf8_engine(options={'convert_unicode':True,'assert_unicode':True})
+ unicode_engine.dialect.supports_unicode_binds = False
- prev_unicode = testing.db.engine.dialect.convert_unicode
- prev_assert = testing.db.engine.dialect.assert_unicode
- try:
- testing.db.engine.dialect.convert_unicode = True
- testing.db.engine.dialect.assert_unicode = False
- rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
- unicodedata = rawdata.decode('utf-8')
- if testing.db.dialect.supports_unicode_binds:
- rawdata = "something"
- unicode_table.insert().execute(unicode_varchar=unicodedata,
- unicode_text=unicodedata,
- plain_varchar=rawdata)
- x = unicode_table.select().execute().fetchone()
- print 0, repr(unicodedata)
- print 1, repr(x['unicode_varchar'])
- print 2, repr(x['unicode_text'])
- print 3, repr(x['plain_varchar'])
- self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
- self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
- if not testing.db.dialect.supports_unicode_binds:
- self.assert_(isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == unicodedata)
- finally:
- testing.db.engine.dialect.convert_unicode = prev_unicode
- testing.db.engine.dialect.convert_unicode = prev_assert
-
- @testing.crashes('oracle', 'FIXME: unknown, verify not fails_on')
- @testing.fails_on('firebird', 'Data type unknown')
- def test_length_function(self):
- """checks the database correctly understands the length of a unicode string"""
- teststr = u'aaa\x1234'
- self.assert_(testing.db.func.length(teststr).scalar() == len(teststr))
+ s = String()
+ uni = s.dialect_impl(unicode_engine.dialect).bind_processor(unicode_engine.dialect)
+ # Py3K
+ #self.assertRaises(exc.InvalidRequestError, uni, b'x')
+ #assert isinstance(uni(unicodedata), bytes)
+ # Py2K
+ self.assertRaises(exc.InvalidRequestError, uni, 'x')
+ assert isinstance(uni(unicodedata), str)
+ # end Py2K
+
+ assert uni(unicodedata) == unicodedata.encode('utf-8')
class BinaryTest(TestBase, AssertsExecutionResults):
__excluded_on__ = (
return value
binary_table = Table('binary_table', MetaData(testing.db),
- Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True),
- Column('data', Binary),
- Column('data_slice', Binary(100)),
- Column('misc', String(30)),
- # construct PickleType with non-native pickle module, since cPickle uses relative module
- # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative
- # to the 'types' module
- Column('pickled', PickleType),
- Column('mypickle', MyPickleType)
+ Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True),
+ Column('data', Binary),
+ Column('data_slice', Binary(100)),
+ Column('misc', String(30)),
+ # construct PickleType with non-native pickle module, since cPickle uses relative module
+ # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative
+ # to the 'types' module
+ Column('pickled', PickleType),
+ Column('mypickle', MyPickleType)
)
binary_table.create()
binary_table.drop()
@testing.fails_on('mssql', 'MSSQl BINARY type right pads the fixed length with \x00')
- def testbinary(self):
+ def test_round_trip(self):
testobj1 = pickleable.Foo('im foo 1')
testobj2 = pickleable.Foo('im foo 2')
testobj3 = pickleable.Foo('im foo 3')
stream1 =self.load_stream('binary_data_one.dat')
stream2 =self.load_stream('binary_data_two.dat')
- binary_table.insert().execute(primary_id=1, misc='binary_data_one.dat', data=stream1, data_slice=stream1[0:100], pickled=testobj1, mypickle=testobj3)
- binary_table.insert().execute(primary_id=2, misc='binary_data_two.dat', data=stream2, data_slice=stream2[0:99], pickled=testobj2)
- binary_table.insert().execute(primary_id=3, misc='binary_data_two.dat', data=None, data_slice=stream2[0:99], pickled=None)
+ binary_table.insert().execute(
+ primary_id=1,
+ misc='binary_data_one.dat',
+ data=stream1,
+ data_slice=stream1[0:100],
+ pickled=testobj1,
+ mypickle=testobj3)
+ binary_table.insert().execute(
+ primary_id=2,
+ misc='binary_data_two.dat',
+ data=stream2,
+ data_slice=stream2[0:99],
+ pickled=testobj2)
+ binary_table.insert().execute(
+ primary_id=3,
+ misc='binary_data_two.dat',
+ data=None,
+ data_slice=stream2[0:99],
+ pickled=None)
for stmt in (
binary_table.select(order_by=binary_table.c.primary_id),
text("select * from binary_table order by binary_table.primary_id", typemap={'pickled':PickleType, 'mypickle':MyPickleType}, bind=testing.db)
):
l = stmt.execute().fetchall()
- print type(stream1), type(l[0]['data']), type(l[0]['data_slice'])
- print len(stream1), len(l[0]['data']), len(l[0]['data_slice'])
self.assertEquals(list(stream1), list(l[0]['data']))
self.assertEquals(list(stream1[0:100]), list(l[0]['data_slice']))
self.assertEquals(list(stream2), list(l[1]['data']))
self.assertEquals(testobj3.moredata, l[0]['mypickle'].moredata)
self.assertEquals(l[0]['mypickle'].stuff, 'this is the right stuff')
- def load_stream(self, name, len=12579):
+ def load_stream(self, name):
f = os.path.join(os.path.dirname(testenv.__file__), name)
- # put a number less than the typical MySQL default BLOB size
- return file(f).read(len)
+ return open(f, mode='rb').read()
class ExpressionTest(TestBase, AssertsExecutionResults):
def setUpAll(self):
assert(res2==[(2, False)])
class PickleTest(TestBase):
- def test_noeq_deprecation(self):
- p1 = PickleType()
-
- self.assertRaises(DeprecationWarning,
- p1.compare_values, pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2)
- )
-
- self.assertRaises(DeprecationWarning,
- p1.compare_values, pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2)
- )
-
- @testing.uses_deprecated()
- def go():
- # test actual dumps comparison
- assert p1.compare_values(pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2))
- assert p1.compare_values(pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2))
- go()
-
- assert p1.compare_values({1:2, 3:4}, {3:4, 1:2})
-
- p2 = PickleType(mutable=False)
- assert not p2.compare_values(pickleable.BarWithoutCompare(1, 2), pickleable.BarWithoutCompare(1, 2))
- assert not p2.compare_values(pickleable.OldSchoolWithoutCompare(1, 2), pickleable.OldSchoolWithoutCompare(1, 2))
-
def test_eq_comparison(self):
p1 = PickleType()