__visit_name__ = 'string'
- def __init__(self, length=None, convert_unicode=False, assert_unicode=None):
+ def __init__(self, length=None, convert_unicode=False, assert_unicode=None, unicode_error=None):
"""
Create a string-holding type.
rationale here is that isinstance() calls are enormously
expensive at the level of column-fetching. To
force the check to occur regardless, set
- convert_unicode='force'.
+ convert_unicode='force'. This will incur significant
+ performance overhead when fetching unicode result columns.
Similarly, if the dialect is known to accept bind parameters
as unicode objects, no translation from unicode to bytestring
If true, will raise an :exc:`sqlalchemy.exc.InvalidRequestError`.
+ :param unicode_error: Optional, a method to use to handle Unicode
+ conversion errors. Behaves like the 'errors' keyword argument to
+ the standard library's string.decode() functions. This flag
+ requires that `convert_unicode` is set to `"force"` - otherwise,
+ SQLAlchemy is not guaranteed to handle the task of unicode
+ conversion. Note that this flag adds significant performance
+ overhead to row-fetching operations for backends that already
+ return unicode objects natively (which most DBAPIs do). This
+ flag should only be used as an absolute last resort for reading
+ strings from a column with varied or corrupted encodings,
+ which only applies to databases that accept invalid encodings
+ in the first place (i.e. MySQL. *not* PG, Sqlite, etc.)
+
"""
+ if unicode_error is not None and convert_unicode != 'force':
+ raise exc.ArgumentError("convert_unicode must be 'force' "
+ "when unicode_error is set.")
self.length = length
self.convert_unicode = convert_unicode
self.assert_unicode = assert_unicode
+ self.unicode_error = unicode_error
+
def adapt(self, impltype):
return impltype(
assert_unicode = dialect.assert_unicode
else:
assert_unicode = self.assert_unicode
-
+
if dialect.supports_unicode_binds and assert_unicode:
def process(value):
if value is None or isinstance(value, unicode):
encoder = codecs.getencoder(dialect.encoding)
def process(value):
if isinstance(value, unicode):
- return encoder(value)[0]
+ return encoder(value, self.unicode_error)[0]
elif assert_unicode and value is not None:
if assert_unicode == 'warn':
util.warn("Unicode type received non-unicode bind "
self.convert_unicode == 'force')
if needs_convert:
- # note we *assume* that we do not have a unicode object
- # here, instead of an expensive isinstance() check.
- return processors.to_unicode_processor_factory(dialect.encoding)
+ to_unicode = processors.to_unicode_processor_factory(
+ dialect.encoding, self.unicode_error)
+
+ if dialect.returns_unicode_strings:
+ # we wouldn't be here unless convert_unicode='force'
+ # was specified. since we will be getting back unicode
+ # in most cases, we check for it (decode will fail).
+ def process(value):
+ if isinstance(value, unicode):
+ return value
+ else:
+ return to_unicode(value)
+ return process
+ else:
+ # here, we assume that the object is not unicode,
+ # avoiding expensive isinstance() check.
+ return to_unicode
else:
return None
if exp in compiled:
break
else:
- assert False, "%r matches none of %r for dialect %s" % (compiled, expected, dialect.name)
+ assert False, "%r matches none of %r for dialect %s" % \
+ (compiled, expected, dialect.name)
class TypeAffinityTest(TestBase):
def test_type_affinity(self):
class MyDecoratedType(types.TypeDecorator):
impl = String
def bind_processor(self, dialect):
- impl_processor = super(MyDecoratedType, self).bind_processor(dialect) or (lambda value:value)
+ impl_processor = super(MyDecoratedType, self).bind_processor(dialect)\
+ or (lambda value:value)
def process(value):
return "BIND_IN"+ impl_processor(value)
return process
def result_processor(self, dialect, coltype):
- impl_processor = super(MyDecoratedType, self).result_processor(dialect, coltype) or (lambda value:value)
+ impl_processor = super(MyDecoratedType, self).result_processor(dialect, coltype)\
+ or (lambda value:value)
def process(value):
return impl_processor(value) + "BIND_OUT"
return process
impl = Unicode
def bind_processor(self, dialect):
- impl_processor = super(MyUnicodeType, self).bind_processor(dialect) or (lambda value:value)
+ impl_processor = super(MyUnicodeType, self).bind_processor(dialect)\
+ or (lambda value:value)
def process(value):
return "BIND_IN"+ impl_processor(value)
return process
def result_processor(self, dialect, coltype):
- impl_processor = super(MyUnicodeType, self).result_processor(dialect, coltype) or (lambda value:value)
+ impl_processor = super(MyUnicodeType, self).result_processor(dialect, coltype)\
+ or (lambda value:value)
def process(value):
return impl_processor(value) + "BIND_OUT"
return process
# cx_oracle was producing different behavior for cursor.executemany()
# vs. cursor.execute()
- unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+ unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand "\
+ u"une drôle de petit voix m’a réveillé. "\
+ u"Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
unicode_table.insert().execute(
dict(unicode_varchar=unicodedata,unicode_text=unicodedata),
def test_union(self):
"""ensure compiler processing works for UNIONs"""
- unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+ unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand "\
+ u"une drôle de petit voix m’a réveillé. "\
+ u"Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
unicode_table.insert().execute(unicode_varchar=unicodedata,unicode_text=unicodedata)
- x = union(select([unicode_table.c.unicode_varchar]), select([unicode_table.c.unicode_varchar])).execute().first()
- self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
+ x = union(
+ select([unicode_table.c.unicode_varchar]),
+ select([unicode_table.c.unicode_varchar])
+ ).execute().first()
+
+ assert isinstance(x['unicode_varchar'], unicode)
+ eq_(x['unicode_varchar'], unicodedata)
@testing.fails_on('oracle', 'oracle converts empty strings to a blank space')
def test_blank_strings(self):
def test_parameters(self):
"""test the dialect convert_unicode parameters."""
- unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand une drôle de petit voix m’a réveillé. Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+ unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand "\
+ u"une drôle de petit voix m’a réveillé. "\
+ u"Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
u = Unicode(assert_unicode=True)
uni = u.dialect_impl(testing.db.dialect).bind_processor(testing.db.dialect)
assert isinstance(uni(unicodedata), str)
# end Py2K
- assert uni(unicodedata) == unicodedata.encode('utf-8')
+ eq_(uni(unicodedata), unicodedata.encode('utf-8'))
+
+ def test_ignoring_unicode_error(self):
+ """checks String(unicode_error='ignore') is passed to underlying codec."""
+
+ unicodedata = u"Alors vous imaginez ma surprise, au lever du jour, quand "\
+ u"une drôle de petit voix m’a réveillé. "\
+ u"Elle disait: « S’il vous plaît… dessine-moi un mouton! »"
+
+ asciidata = unicodedata.encode('ascii', 'ignore')
+
+ m = MetaData()
+ table = Table('unicode_err_table', m,
+ Column('sort', Integer),
+ Column('plain_varchar_no_coding_error', \
+ String(248, convert_unicode='force', unicode_error='ignore'))
+ )
+
+ m2 = MetaData()
+ utf8_table = Table('unicode_err_table', m2,
+ Column('sort', Integer),
+ Column('plain_varchar_no_coding_error', \
+ String(248, convert_unicode=True))
+ )
+
+ engine = engines.testing_engine(options={'encoding':'ascii'})
+ m.create_all(engine)
+ try:
+ # insert a row that should be ascii and
+ # coerce from unicode with ignore on the bind side
+ engine.execute(
+ table.insert(),
+ sort=1,
+ plain_varchar_no_coding_error=unicodedata
+ )
+
+ # switch to utf-8
+ engine.dialect.encoding = 'utf-8'
+ from binascii import hexlify
+
+ # the row that we put in was stored as hexlified ascii
+ row = engine.execute(utf8_table.select()).first()
+ x = row['plain_varchar_no_coding_error']
+ a = hexlify(x)
+ b = hexlify(asciidata)
+ eq_(a, b)
+
+ # insert another row which will be stored with
+ # utf-8 only chars
+ engine.execute(
+ utf8_table.insert(),
+ sort=2,
+ plain_varchar_no_coding_error=unicodedata
+ )
+
+ # switch back to ascii
+ engine.dialect.encoding = 'ascii'
+
+ # one row will be ascii with ignores,
+ # the other will be either ascii with the ignores
+ # or just the straight unicode+ utf8 value if the
+ # dialect just returns unicode
+ result = engine.execute(table.select().order_by(table.c.sort))
+ ascii_row = result.fetchone()
+ utf8_row = result.fetchone()
+ result.close()
+
+ x = ascii_row['plain_varchar_no_coding_error']
+ a = hexlify(x)
+ b = hexlify(asciidata)
+ eq_(a, b)
+
+ x = utf8_row['plain_varchar_no_coding_error']
+ if engine.dialect.returns_unicode_strings:
+ eq_(x, unicodedata)
+ else:
+ a = hexlify(x)
+ eq_(a, b)
+
+ finally:
+ m.drop_all(engine)
+
class EnumTest(TestBase):
@classmethod
{'id':4, 'someenum':'four'}
)
- @testing.fails_on('mysql', "the CHECK constraint doesn't raise an exception for unknown reason")
+ @testing.fails_on('mysql',
+ "the CHECK constraint doesn't raise an exception for unknown reason")
def test_non_native_constraint(self):
assert_raises(exc.DBAPIError,
non_native_enum_table.insert().execute,
return value
binary_table = Table('binary_table', MetaData(testing.db),
- Column('primary_id', Integer, Sequence('binary_id_seq', optional=True), primary_key=True),
+ Column('primary_id', Integer, primary_key=True, test_needs_autoincrement=True),
Column('data', LargeBinary),
Column('data_slice', LargeBinary(100)),
Column('misc', String(30)),
- # construct PickleType with non-native pickle module, since cPickle uses relative module
- # loading and confuses this test's parent package 'sql' with the 'sqlalchemy.sql' package relative
- # to the 'types' module
Column('pickled', PickleType),
Column('mypickle', MyPickleType)
)
binary_table.select(order_by=binary_table.c.primary_id),
text(
"select * from binary_table order by binary_table.primary_id",
- typemap={'pickled':PickleType, 'mypickle':MyPickleType, 'data':LargeBinary, 'data_slice':LargeBinary},
+ typemap={'pickled':PickleType,
+ 'mypickle':MyPickleType,
+ 'data':LargeBinary, 'data_slice':LargeBinary},
bind=testing.db)
):
l = stmt.execute().fetchall()
meta.create_all()
- test_table.insert().execute({'id':1, 'data':'somedata', 'atimestamp':datetime.date(2007, 10, 15), 'avalue':25})
+ test_table.insert().execute({
+ 'id':1,
+ 'data':'somedata',
+ 'atimestamp':datetime.date(2007, 10, 15),
+ 'avalue':25})
@classmethod
def teardown_class(cls):
expr = test_table.c.atimestamp == bindparam("thedate")
assert expr.right.type.__class__ == test_table.c.atimestamp.type.__class__
- assert testing.db.execute(test_table.select().where(expr), {"thedate":datetime.date(2007, 10, 15)}).fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)]
+ eq_(
+ testing.db.execute(
+ test_table.select().where(expr),
+ {"thedate":datetime.date(2007, 10, 15)}).fetchall(),
+ [(1, 'somedata', datetime.date(2007, 10, 15), 25)]
+ )
expr = test_table.c.avalue == bindparam("somevalue")
- assert expr.right.type.__class__ == test_table.c.avalue.type.__class__
- assert testing.db.execute(test_table.select().where(expr), {"somevalue":25}).fetchall() == [(1, 'somedata', datetime.date(2007, 10, 15), 25)]
+ eq_(expr.right.type.__class__, test_table.c.avalue.type.__class__)
+
+ eq_(
+ testing.db.execute(test_table.select().where(expr), {"somevalue":25}).fetchall(),
+ [(1, 'somedata', datetime.date(2007, 10, 15), 25)]
+ )
@testing.fails_on('firebird', 'Data type unknown on the parameter')
def test_operator_adapt(self):
testing.db.execute(
"insert into booltest (id, unconstrained_value) values (1, 5)")
-
class PickleTest(TestBase):
def test_eq_comparison(self):
p1 = PickleType()
):
assert p1.compare_values(p1.copy_value(obj), obj)
- assert_raises(NotImplementedError, p1.compare_values, pickleable.BrokenComparable('foo'),pickleable.BrokenComparable('foo'))
+ assert_raises(NotImplementedError,
+ p1.compare_values,
+ pickleable.BrokenComparable('foo'),
+ pickleable.BrokenComparable('foo'))
def test_nonmutable_comparison(self):
p1 = PickleType()