version of PostgreSQL.
"""
-
import re
from ... import sql, schema, exc, util
if self.as_uuid:
def process(value):
if value is not None:
- value = str(value)
+ value = util.text_type(value)
return value
return process
else:
query,
bindparams=[
sql.bindparam(
- 'schema', str(schema.lower()),
+ 'schema', util.text_type(schema.lower()),
type_=sqltypes.Unicode)]
)
)
"n.oid=c.relnamespace where n.nspname=current_schema() and "
"relname=:name",
bindparams=[
- sql.bindparam('name', str(table_name),
+ sql.bindparam('name', util.text_type(table_name),
type_=sqltypes.Unicode)]
)
)
"relname=:name",
bindparams=[
sql.bindparam('name',
- str(table_name), type_=sqltypes.Unicode),
+ util.text_type(table_name), type_=sqltypes.Unicode),
sql.bindparam('schema',
- str(schema), type_=sqltypes.Unicode)]
+ util.text_type(schema), type_=sqltypes.Unicode)]
)
)
return bool(cursor.first())
"n.nspname=current_schema() "
"and relname=:name",
bindparams=[
- sql.bindparam('name', str(sequence_name),
+ sql.bindparam('name', util.text_type(sequence_name),
type_=sqltypes.Unicode)
]
)
"n.oid=c.relnamespace where relkind='S' and "
"n.nspname=:schema and relname=:name",
bindparams=[
- sql.bindparam('name', str(sequence_name),
+ sql.bindparam('name', util.text_type(sequence_name),
type_=sqltypes.Unicode),
sql.bindparam('schema',
- str(schema), type_=sqltypes.Unicode)
+ util.text_type(schema), type_=sqltypes.Unicode)
]
)
)
def has_type(self, connection, type_name, schema=None):
bindparams = [
sql.bindparam('typname',
- str(type_name), type_=sqltypes.Unicode),
+ util.text_type(type_name), type_=sqltypes.Unicode),
sql.bindparam('nspname',
- str(schema), type_=sqltypes.Unicode),
+ util.text_type(schema), type_=sqltypes.Unicode),
]
if schema is not None:
query = """
""" % schema_where_clause
# Since we're binding to unicode, table_name and schema_name must be
# unicode.
- table_name = str(table_name)
+ table_name = util.text_type(table_name)
if schema is not None:
- schema = str(schema)
+ schema = util.text_type(schema)
s = sql.text(query, bindparams=[
sql.bindparam('table_name', type_=sqltypes.Unicode),
sql.bindparam('schema', type_=sqltypes.Unicode)
effect for other DBAPIs.
"""
+from __future__ import absolute_import
import re
import logging
class _PGEnum(ENUM):
def __init__(self, *arg, **kw):
super(_PGEnum, self).__init__(*arg, **kw)
-# start Py2K
-# if self.convert_unicode:
-# self.convert_unicode = "force"
-# end Py2K
+ if util.py2k:
+ if self.convert_unicode:
+ self.convert_unicode = "force"
class _PGArray(ARRAY):
def __init__(self, *arg, **kw):
super(_PGArray, self).__init__(*arg, **kw)
-# start Py2K
-# # FIXME: this check won't work for setups that
-# # have convert_unicode only on their create_engine().
-# if isinstance(self.item_type, sqltypes.String) and \
-# self.item_type.convert_unicode:
-# self.item_type.convert_unicode = "force"
-# end Py2K
+ if util.py2k:
+ # FIXME: this check won't work for setups that
+ # have convert_unicode only on their create_engine().
+ if isinstance(self.item_type, sqltypes.String) and \
+ self.item_type.convert_unicode:
+ self.item_type.convert_unicode = "force"
class _PGHStore(HSTORE):
class PGDialect_psycopg2(PGDialect):
driver = 'psycopg2'
-# start Py2K
-# supports_unicode_statements = False
-# end Py2K
+ if util.py2k:
+ supports_unicode_statements = False
+
default_paramstyle = 'pyformat'
supports_sane_multi_rowcount = False
execution_ctx_cls = PGExecutionContext_psycopg2
# habits. since we will be getting back unicode
# in most cases, we check for it (decode will fail).
def process(value):
- if isinstance(value, str):
+ if isinstance(value, util.text_type):
return value
else:
return to_unicode(value)
threading, py3k, py2k, jython, pypy, cpython, win32, \
pickle, dottedgetter, parse_qsl, namedtuple, next, WeakSet, reraise, \
raise_from_cause, text_type, string_types, int_types, binary_type, \
- quote_plus, with_metaclass, print_, itertools_filterfalse, u, b
+ quote_plus, with_metaclass, print_, itertools_filterfalse, u, b,\
+ unquote_plus
from ._collections import KeyedTuple, ImmutableContainer, immutabledict, \
Properties, OrderedProperties, ImmutableProperties, OrderedDict, \
@testing.fails_on('postgresql+pg8000',
'zxjdbc fails on ENUM: column "XXX" is of type '
'XXX but expression is of type text')
+ @testing.provide_metadata
def test_unicode_labels(self):
- metadata = MetaData(testing.db)
+ metadata = self.metadata
t1 = Table('table', metadata,
Column('id', Integer, primary_key=True),
Column('value',
- Enum('réveillé', 'drôle', 'S’il',
+ Enum(util.u('réveillé'), util.u('drôle'), util.u('S’il'),
name='onetwothreetype'))
)
-
metadata.create_all()
- try:
- t1.insert().execute(value='drôle')
- t1.insert().execute(value='réveillé')
- t1.insert().execute(value='S’il')
- eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
- [(1, 'drôle'), (2, 'réveillé'), (3, 'S’il')]
- )
- m2 = MetaData(testing.db)
- t2 = Table('table', m2, autoload=True)
- assert t2.c.value.type.enums == ('réveillé', 'drôle', 'S’il')
- finally:
- metadata.drop_all()
+ t1.insert().execute(value=util.u('drôle'))
+ t1.insert().execute(value=util.u('réveillé'))
+ t1.insert().execute(value=util.u('S’il'))
+ eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
+ [(1, util.u('drôle')), (2, util.u('réveillé')),
+ (3, util.u('S’il'))]
+ )
+ m2 = MetaData(testing.db)
+ t2 = Table('table', m2, autoload=True)
+ eq_(
+ t2.c.value.type.enums,
+ (util.u('réveillé'), util.u('drôle'), util.u('S’il'))
+ )
def test_non_native_type(self):
metadata = MetaData()
def test_insert_array(self):
arrtable = self.tables.arrtable
- arrtable.insert().execute(intarr=[1, 2, 3], strarr=['abc',
- 'def'])
+ arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'),
+ util.u('def')])
results = arrtable.select().execute().fetchall()
eq_(len(results), 1)
eq_(results[0]['intarr'], [1, 2, 3])
- eq_(results[0]['strarr'], ['abc', 'def'])
+ eq_(results[0]['strarr'], [util.u('abc'), util.u('def')])
def test_array_where(self):
arrtable = self.tables.arrtable
- arrtable.insert().execute(intarr=[1, 2, 3], strarr=['abc',
- 'def'])
- arrtable.insert().execute(intarr=[4, 5, 6], strarr='ABC')
+ arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'),
+ util.u('def')])
+ arrtable.insert().execute(intarr=[4, 5, 6], strarr=util.u('ABC'))
results = arrtable.select().where(arrtable.c.intarr == [1, 2,
3]).execute().fetchall()
eq_(len(results), 1)
def test_array_concat(self):
arrtable = self.tables.arrtable
arrtable.insert().execute(intarr=[1, 2, 3],
- strarr=['abc', 'def'])
+ strarr=[util.u('abc'), util.u('def')])
results = select([arrtable.c.intarr + [4, 5,
6]]).execute().fetchall()
eq_(len(results), 1)
def test_array_subtype_resultprocessor(self):
arrtable = self.tables.arrtable
arrtable.insert().execute(intarr=[4, 5, 6],
- strarr=[['m\xe4\xe4'], ['m\xf6\xf6'
- ]])
- arrtable.insert().execute(intarr=[1, 2, 3], strarr=['m\xe4\xe4'
- , 'm\xf6\xf6'])
+ strarr=[[util.u('m\xe4\xe4')], [
+ util.u('m\xf6\xf6')]])
+ arrtable.insert().execute(intarr=[1, 2, 3], strarr=[
+ util.u('m\xe4\xe4'), util.u('m\xf6\xf6')])
results = \
arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall()
eq_(len(results), 2)
- eq_(results[0]['strarr'], ['m\xe4\xe4', 'm\xf6\xf6'])
- eq_(results[1]['strarr'], [['m\xe4\xe4'], ['m\xf6\xf6']])
+ eq_(results[0]['strarr'], [util.u('m\xe4\xe4'), util.u('m\xf6\xf6')])
+ eq_(results[1]['strarr'], [[util.u('m\xe4\xe4')], [util.u('m\xf6\xf6')]])
def test_array_literal(self):
eq_(
testing.db.execute(
arrtable.insert(),
intarr=[4, 5, 6],
- strarr=['abc', 'def']
+ strarr=[util.u('abc'), util.u('def')]
)
eq_(
testing.db.scalar(select([arrtable.c.intarr[2:3]])),