From: Mike Bayer Date: Sun, 28 Apr 2013 20:27:23 +0000 (-0400) Subject: postgresql dialect tests X-Git-Tag: rel_0_9_0b1~304^2~13^2~58 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=734376130f01d17acc0d6c2d34c1e0f94a1cd1da;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git postgresql dialect tests --- diff --git a/lib/sqlalchemy/dialects/postgresql/base.py b/lib/sqlalchemy/dialects/postgresql/base.py index 992995434b..cd5d9772d7 100644 --- a/lib/sqlalchemy/dialects/postgresql/base.py +++ b/lib/sqlalchemy/dialects/postgresql/base.py @@ -188,7 +188,6 @@ underlying CREATE INDEX command, so it *must* be a valid index type for your version of PostgreSQL. """ - import re from ... import sql, schema, exc, util @@ -333,7 +332,7 @@ class UUID(sqltypes.TypeEngine): if self.as_uuid: def process(value): if value is not None: - value = str(value) + value = util.text_type(value) return value return process else: @@ -1419,7 +1418,7 @@ class PGDialect(default.DefaultDialect): query, bindparams=[ sql.bindparam( - 'schema', str(schema.lower()), + 'schema', util.text_type(schema.lower()), type_=sqltypes.Unicode)] ) ) @@ -1435,7 +1434,7 @@ class PGDialect(default.DefaultDialect): "n.oid=c.relnamespace where n.nspname=current_schema() and " "relname=:name", bindparams=[ - sql.bindparam('name', str(table_name), + sql.bindparam('name', util.text_type(table_name), type_=sqltypes.Unicode)] ) ) @@ -1447,9 +1446,9 @@ class PGDialect(default.DefaultDialect): "relname=:name", bindparams=[ sql.bindparam('name', - str(table_name), type_=sqltypes.Unicode), + util.text_type(table_name), type_=sqltypes.Unicode), sql.bindparam('schema', - str(schema), type_=sqltypes.Unicode)] + util.text_type(schema), type_=sqltypes.Unicode)] ) ) return bool(cursor.first()) @@ -1463,7 +1462,7 @@ class PGDialect(default.DefaultDialect): "n.nspname=current_schema() " "and relname=:name", bindparams=[ - sql.bindparam('name', str(sequence_name), + sql.bindparam('name', util.text_type(sequence_name), type_=sqltypes.Unicode) ] ) @@ -1475,10 +1474,10 @@ class PGDialect(default.DefaultDialect): "n.oid=c.relnamespace where relkind='S' and " "n.nspname=:schema and relname=:name", bindparams=[ - sql.bindparam('name', str(sequence_name), + sql.bindparam('name', util.text_type(sequence_name), type_=sqltypes.Unicode), sql.bindparam('schema', - str(schema), type_=sqltypes.Unicode) + util.text_type(schema), type_=sqltypes.Unicode) ] ) ) @@ -1488,9 +1487,9 @@ class PGDialect(default.DefaultDialect): def has_type(self, connection, type_name, schema=None): bindparams = [ sql.bindparam('typname', - str(type_name), type_=sqltypes.Unicode), + util.text_type(type_name), type_=sqltypes.Unicode), sql.bindparam('nspname', - str(schema), type_=sqltypes.Unicode), + util.text_type(schema), type_=sqltypes.Unicode), ] if schema is not None: query = """ @@ -1546,9 +1545,9 @@ class PGDialect(default.DefaultDialect): """ % schema_where_clause # Since we're binding to unicode, table_name and schema_name must be # unicode. - table_name = str(table_name) + table_name = util.text_type(table_name) if schema is not None: - schema = str(schema) + schema = util.text_type(schema) s = sql.text(query, bindparams=[ sql.bindparam('table_name', type_=sqltypes.Unicode), sql.bindparam('schema', type_=sqltypes.Unicode) diff --git a/lib/sqlalchemy/dialects/postgresql/psycopg2.py b/lib/sqlalchemy/dialects/postgresql/psycopg2.py index 2893a38721..ce06aab054 100644 --- a/lib/sqlalchemy/dialects/postgresql/psycopg2.py +++ b/lib/sqlalchemy/dialects/postgresql/psycopg2.py @@ -142,6 +142,7 @@ type. This replaces SQLAlchemy's pure-Python HSTORE coercion which takes effect for other DBAPIs. """ +from __future__ import absolute_import import re import logging @@ -190,22 +191,20 @@ class _PGNumeric(sqltypes.Numeric): class _PGEnum(ENUM): def __init__(self, *arg, **kw): super(_PGEnum, self).__init__(*arg, **kw) -# start Py2K -# if self.convert_unicode: -# self.convert_unicode = "force" -# end Py2K + if util.py2k: + if self.convert_unicode: + self.convert_unicode = "force" class _PGArray(ARRAY): def __init__(self, *arg, **kw): super(_PGArray, self).__init__(*arg, **kw) -# start Py2K -# # FIXME: this check won't work for setups that -# # have convert_unicode only on their create_engine(). -# if isinstance(self.item_type, sqltypes.String) and \ -# self.item_type.convert_unicode: -# self.item_type.convert_unicode = "force" -# end Py2K + if util.py2k: + # FIXME: this check won't work for setups that + # have convert_unicode only on their create_engine(). + if isinstance(self.item_type, sqltypes.String) and \ + self.item_type.convert_unicode: + self.item_type.convert_unicode = "force" class _PGHStore(HSTORE): @@ -294,9 +293,9 @@ class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer): class PGDialect_psycopg2(PGDialect): driver = 'psycopg2' -# start Py2K -# supports_unicode_statements = False -# end Py2K + if util.py2k: + supports_unicode_statements = False + default_paramstyle = 'pyformat' supports_sane_multi_rowcount = False execution_ctx_cls = PGExecutionContext_psycopg2 diff --git a/lib/sqlalchemy/types.py b/lib/sqlalchemy/types.py index ffcd793c6a..c601dc3069 100644 --- a/lib/sqlalchemy/types.py +++ b/lib/sqlalchemy/types.py @@ -1153,7 +1153,7 @@ class String(Concatenable, TypeEngine): # habits. since we will be getting back unicode # in most cases, we check for it (decode will fail). def process(value): - if isinstance(value, str): + if isinstance(value, util.text_type): return value else: return to_unicode(value) diff --git a/lib/sqlalchemy/util/__init__.py b/lib/sqlalchemy/util/__init__.py index 25dcce335f..72baa9f368 100644 --- a/lib/sqlalchemy/util/__init__.py +++ b/lib/sqlalchemy/util/__init__.py @@ -8,7 +8,8 @@ from .compat import callable, cmp, reduce, \ threading, py3k, py2k, jython, pypy, cpython, win32, \ pickle, dottedgetter, parse_qsl, namedtuple, next, WeakSet, reraise, \ raise_from_cause, text_type, string_types, int_types, binary_type, \ - quote_plus, with_metaclass, print_, itertools_filterfalse, u, b + quote_plus, with_metaclass, print_, itertools_filterfalse, u, b,\ + unquote_plus from ._collections import KeyedTuple, ImmutableContainer, immutabledict, \ Properties, OrderedProperties, ImmutableProperties, OrderedDict, \ diff --git a/test/dialect/test_postgresql.py b/test/dialect/test_postgresql.py index 43ac7d7ae5..90a09d3625 100644 --- a/test/dialect/test_postgresql.py +++ b/test/dialect/test_postgresql.py @@ -554,28 +554,29 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL): @testing.fails_on('postgresql+pg8000', 'zxjdbc fails on ENUM: column "XXX" is of type ' 'XXX but expression is of type text') + @testing.provide_metadata def test_unicode_labels(self): - metadata = MetaData(testing.db) + metadata = self.metadata t1 = Table('table', metadata, Column('id', Integer, primary_key=True), Column('value', - Enum('réveillé', 'drôle', 'S’il', + Enum(util.u('réveillé'), util.u('drôle'), util.u('S’il'), name='onetwothreetype')) ) - metadata.create_all() - try: - t1.insert().execute(value='drôle') - t1.insert().execute(value='réveillé') - t1.insert().execute(value='S’il') - eq_(t1.select().order_by(t1.c.id).execute().fetchall(), - [(1, 'drôle'), (2, 'réveillé'), (3, 'S’il')] - ) - m2 = MetaData(testing.db) - t2 = Table('table', m2, autoload=True) - assert t2.c.value.type.enums == ('réveillé', 'drôle', 'S’il') - finally: - metadata.drop_all() + t1.insert().execute(value=util.u('drôle')) + t1.insert().execute(value=util.u('réveillé')) + t1.insert().execute(value=util.u('S’il')) + eq_(t1.select().order_by(t1.c.id).execute().fetchall(), + [(1, util.u('drôle')), (2, util.u('réveillé')), + (3, util.u('S’il'))] + ) + m2 = MetaData(testing.db) + t2 = Table('table', m2, autoload=True) + eq_( + t2.c.value.type.enums, + (util.u('réveillé'), util.u('drôle'), util.u('S’il')) + ) def test_non_native_type(self): metadata = MetaData() @@ -2174,18 +2175,18 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): def test_insert_array(self): arrtable = self.tables.arrtable - arrtable.insert().execute(intarr=[1, 2, 3], strarr=['abc', - 'def']) + arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'), + util.u('def')]) results = arrtable.select().execute().fetchall() eq_(len(results), 1) eq_(results[0]['intarr'], [1, 2, 3]) - eq_(results[0]['strarr'], ['abc', 'def']) + eq_(results[0]['strarr'], [util.u('abc'), util.u('def')]) def test_array_where(self): arrtable = self.tables.arrtable - arrtable.insert().execute(intarr=[1, 2, 3], strarr=['abc', - 'def']) - arrtable.insert().execute(intarr=[4, 5, 6], strarr='ABC') + arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'), + util.u('def')]) + arrtable.insert().execute(intarr=[4, 5, 6], strarr=util.u('ABC')) results = arrtable.select().where(arrtable.c.intarr == [1, 2, 3]).execute().fetchall() eq_(len(results), 1) @@ -2194,7 +2195,7 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): def test_array_concat(self): arrtable = self.tables.arrtable arrtable.insert().execute(intarr=[1, 2, 3], - strarr=['abc', 'def']) + strarr=[util.u('abc'), util.u('def')]) results = select([arrtable.c.intarr + [4, 5, 6]]).execute().fetchall() eq_(len(results), 1) @@ -2203,15 +2204,15 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): def test_array_subtype_resultprocessor(self): arrtable = self.tables.arrtable arrtable.insert().execute(intarr=[4, 5, 6], - strarr=[['m\xe4\xe4'], ['m\xf6\xf6' - ]]) - arrtable.insert().execute(intarr=[1, 2, 3], strarr=['m\xe4\xe4' - , 'm\xf6\xf6']) + strarr=[[util.u('m\xe4\xe4')], [ + util.u('m\xf6\xf6')]]) + arrtable.insert().execute(intarr=[1, 2, 3], strarr=[ + util.u('m\xe4\xe4'), util.u('m\xf6\xf6')]) results = \ arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall() eq_(len(results), 2) - eq_(results[0]['strarr'], ['m\xe4\xe4', 'm\xf6\xf6']) - eq_(results[1]['strarr'], [['m\xe4\xe4'], ['m\xf6\xf6']]) + eq_(results[0]['strarr'], [util.u('m\xe4\xe4'), util.u('m\xf6\xf6')]) + eq_(results[1]['strarr'], [[util.u('m\xe4\xe4')], [util.u('m\xf6\xf6')]]) def test_array_literal(self): eq_( @@ -2263,7 +2264,7 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults): testing.db.execute( arrtable.insert(), intarr=[4, 5, 6], - strarr=['abc', 'def'] + strarr=[util.u('abc'), util.u('def')] ) eq_( testing.db.scalar(select([arrtable.c.intarr[2:3]])),