]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
postgresql dialect tests
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 28 Apr 2013 20:27:23 +0000 (16:27 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 28 Apr 2013 20:27:23 +0000 (16:27 -0400)
lib/sqlalchemy/dialects/postgresql/base.py
lib/sqlalchemy/dialects/postgresql/psycopg2.py
lib/sqlalchemy/types.py
lib/sqlalchemy/util/__init__.py
test/dialect/test_postgresql.py

index 992995434b4c65085d08ba50625fc2dd945349cc..cd5d9772d7c388505341402789a5621064a2717d 100644 (file)
@@ -188,7 +188,6 @@ underlying CREATE INDEX command, so it *must* be a valid index type for your
 version of PostgreSQL.
 
 """
-
 import re
 
 from ... import sql, schema, exc, util
@@ -333,7 +332,7 @@ class UUID(sqltypes.TypeEngine):
         if self.as_uuid:
             def process(value):
                 if value is not None:
-                    value = str(value)
+                    value = util.text_type(value)
                 return value
             return process
         else:
@@ -1419,7 +1418,7 @@ class PGDialect(default.DefaultDialect):
                 query,
                 bindparams=[
                     sql.bindparam(
-                        'schema', str(schema.lower()),
+                        'schema', util.text_type(schema.lower()),
                         type_=sqltypes.Unicode)]
             )
         )
@@ -1435,7 +1434,7 @@ class PGDialect(default.DefaultDialect):
                 "n.oid=c.relnamespace where n.nspname=current_schema() and "
                 "relname=:name",
                 bindparams=[
-                        sql.bindparam('name', str(table_name),
+                        sql.bindparam('name', util.text_type(table_name),
                         type_=sqltypes.Unicode)]
                 )
             )
@@ -1447,9 +1446,9 @@ class PGDialect(default.DefaultDialect):
                 "relname=:name",
                     bindparams=[
                         sql.bindparam('name',
-                        str(table_name), type_=sqltypes.Unicode),
+                        util.text_type(table_name), type_=sqltypes.Unicode),
                         sql.bindparam('schema',
-                        str(schema), type_=sqltypes.Unicode)]
+                        util.text_type(schema), type_=sqltypes.Unicode)]
                 )
             )
         return bool(cursor.first())
@@ -1463,7 +1462,7 @@ class PGDialect(default.DefaultDialect):
                     "n.nspname=current_schema() "
                     "and relname=:name",
                     bindparams=[
-                        sql.bindparam('name', str(sequence_name),
+                        sql.bindparam('name', util.text_type(sequence_name),
                         type_=sqltypes.Unicode)
                     ]
                 )
@@ -1475,10 +1474,10 @@ class PGDialect(default.DefaultDialect):
                 "n.oid=c.relnamespace where relkind='S' and "
                 "n.nspname=:schema and relname=:name",
                 bindparams=[
-                    sql.bindparam('name', str(sequence_name),
+                    sql.bindparam('name', util.text_type(sequence_name),
                      type_=sqltypes.Unicode),
                     sql.bindparam('schema',
-                                str(schema), type_=sqltypes.Unicode)
+                                util.text_type(schema), type_=sqltypes.Unicode)
                 ]
             )
             )
@@ -1488,9 +1487,9 @@ class PGDialect(default.DefaultDialect):
     def has_type(self, connection, type_name, schema=None):
         bindparams = [
             sql.bindparam('typname',
-                str(type_name), type_=sqltypes.Unicode),
+                util.text_type(type_name), type_=sqltypes.Unicode),
             sql.bindparam('nspname',
-                str(schema), type_=sqltypes.Unicode),
+                util.text_type(schema), type_=sqltypes.Unicode),
             ]
         if schema is not None:
             query = """
@@ -1546,9 +1545,9 @@ class PGDialect(default.DefaultDialect):
         """ % schema_where_clause
         # Since we're binding to unicode, table_name and schema_name must be
         # unicode.
-        table_name = str(table_name)
+        table_name = util.text_type(table_name)
         if schema is not None:
-            schema = str(schema)
+            schema = util.text_type(schema)
         s = sql.text(query, bindparams=[
             sql.bindparam('table_name', type_=sqltypes.Unicode),
             sql.bindparam('schema', type_=sqltypes.Unicode)
index 2893a387215d77a2bfff0257fb16ad44398b3cc0..ce06aab054a4a58fe750d4e6e852f72b32b31120 100644 (file)
@@ -142,6 +142,7 @@ type.  This replaces SQLAlchemy's pure-Python HSTORE coercion which takes
 effect for other DBAPIs.
 
 """
+from __future__ import absolute_import
 
 import re
 import logging
@@ -190,22 +191,20 @@ class _PGNumeric(sqltypes.Numeric):
 class _PGEnum(ENUM):
     def __init__(self, *arg, **kw):
         super(_PGEnum, self).__init__(*arg, **kw)
-# start Py2K
-#        if self.convert_unicode:
-#            self.convert_unicode = "force"
-# end Py2K
+        if util.py2k:
+            if self.convert_unicode:
+                self.convert_unicode = "force"
 
 
 class _PGArray(ARRAY):
     def __init__(self, *arg, **kw):
         super(_PGArray, self).__init__(*arg, **kw)
-# start Py2K
-#        # FIXME: this check won't work for setups that
-#        # have convert_unicode only on their create_engine().
-#        if isinstance(self.item_type, sqltypes.String) and \
-#                    self.item_type.convert_unicode:
-#            self.item_type.convert_unicode = "force"
-# end Py2K
+        if util.py2k:
+            # FIXME: this check won't work for setups that
+            # have convert_unicode only on their create_engine().
+            if isinstance(self.item_type, sqltypes.String) and \
+                        self.item_type.convert_unicode:
+                self.item_type.convert_unicode = "force"
 
 
 class _PGHStore(HSTORE):
@@ -294,9 +293,9 @@ class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer):
 
 class PGDialect_psycopg2(PGDialect):
     driver = 'psycopg2'
-# start Py2K
-#    supports_unicode_statements = False
-# end Py2K
+    if util.py2k:
+        supports_unicode_statements = False
+
     default_paramstyle = 'pyformat'
     supports_sane_multi_rowcount = False
     execution_ctx_cls = PGExecutionContext_psycopg2
index ffcd793c6aaf4854081a98681665a432c4a3f6e1..c601dc3069d3a1351e875c17cd8cc8024edf1b75 100644 (file)
@@ -1153,7 +1153,7 @@ class String(Concatenable, TypeEngine):
                 # habits.  since we will be getting back unicode
                 # in most cases, we check for it (decode will fail).
                 def process(value):
-                    if isinstance(value, str):
+                    if isinstance(value, util.text_type):
                         return value
                     else:
                         return to_unicode(value)
index 25dcce335f1d57b58c9d9d9f6571e2e06c52f64b..72baa9f368575dae69a67222237215a025ab1ef9 100644 (file)
@@ -8,7 +8,8 @@ from .compat import callable, cmp, reduce,  \
     threading, py3k, py2k, jython, pypy, cpython, win32, \
     pickle, dottedgetter, parse_qsl, namedtuple, next, WeakSet, reraise, \
     raise_from_cause, text_type, string_types, int_types, binary_type, \
-    quote_plus, with_metaclass, print_, itertools_filterfalse, u, b
+    quote_plus, with_metaclass, print_, itertools_filterfalse, u, b,\
+    unquote_plus
 
 from ._collections import KeyedTuple, ImmutableContainer, immutabledict, \
     Properties, OrderedProperties, ImmutableProperties, OrderedDict, \
index 43ac7d7ae53036b3a51c7e07596518e6101beffa..90a09d362517242b946eeb99fcca00072558dfff 100644 (file)
@@ -554,28 +554,29 @@ class EnumTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
     @testing.fails_on('postgresql+pg8000',
                       'zxjdbc fails on ENUM: column "XXX" is of type '
                       'XXX but expression is of type text')
+    @testing.provide_metadata
     def test_unicode_labels(self):
-        metadata = MetaData(testing.db)
+        metadata = self.metadata
         t1 = Table('table', metadata,
             Column('id', Integer, primary_key=True),
             Column('value',
-                    Enum('réveillé', 'drôle', 'S’il',
+                    Enum(util.u('réveillé'), util.u('drôle'), util.u('S’il'),
                             name='onetwothreetype'))
         )
-
         metadata.create_all()
-        try:
-            t1.insert().execute(value='drôle')
-            t1.insert().execute(value='réveillé')
-            t1.insert().execute(value='S’il')
-            eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
-                [(1, 'drôle'), (2, 'réveillé'), (3, 'S’il')]
-            )
-            m2 = MetaData(testing.db)
-            t2 = Table('table', m2, autoload=True)
-            assert t2.c.value.type.enums == ('réveillé', 'drôle', 'S’il')
-        finally:
-            metadata.drop_all()
+        t1.insert().execute(value=util.u('drôle'))
+        t1.insert().execute(value=util.u('réveillé'))
+        t1.insert().execute(value=util.u('S’il'))
+        eq_(t1.select().order_by(t1.c.id).execute().fetchall(),
+            [(1, util.u('drôle')), (2, util.u('réveillé')),
+                    (3, util.u('S’il'))]
+        )
+        m2 = MetaData(testing.db)
+        t2 = Table('table', m2, autoload=True)
+        eq_(
+            t2.c.value.type.enums,
+            (util.u('réveillé'), util.u('drôle'), util.u('S’il'))
+        )
 
     def test_non_native_type(self):
         metadata = MetaData()
@@ -2174,18 +2175,18 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults):
 
     def test_insert_array(self):
         arrtable = self.tables.arrtable
-        arrtable.insert().execute(intarr=[1, 2, 3], strarr=['abc',
-                                  'def'])
+        arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'),
+                                  util.u('def')])
         results = arrtable.select().execute().fetchall()
         eq_(len(results), 1)
         eq_(results[0]['intarr'], [1, 2, 3])
-        eq_(results[0]['strarr'], ['abc', 'def'])
+        eq_(results[0]['strarr'], [util.u('abc'), util.u('def')])
 
     def test_array_where(self):
         arrtable = self.tables.arrtable
-        arrtable.insert().execute(intarr=[1, 2, 3], strarr=['abc',
-                                  'def'])
-        arrtable.insert().execute(intarr=[4, 5, 6], strarr='ABC')
+        arrtable.insert().execute(intarr=[1, 2, 3], strarr=[util.u('abc'),
+                                  util.u('def')])
+        arrtable.insert().execute(intarr=[4, 5, 6], strarr=util.u('ABC'))
         results = arrtable.select().where(arrtable.c.intarr == [1, 2,
                 3]).execute().fetchall()
         eq_(len(results), 1)
@@ -2194,7 +2195,7 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults):
     def test_array_concat(self):
         arrtable = self.tables.arrtable
         arrtable.insert().execute(intarr=[1, 2, 3],
-                    strarr=['abc', 'def'])
+                    strarr=[util.u('abc'), util.u('def')])
         results = select([arrtable.c.intarr + [4, 5,
                          6]]).execute().fetchall()
         eq_(len(results), 1)
@@ -2203,15 +2204,15 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults):
     def test_array_subtype_resultprocessor(self):
         arrtable = self.tables.arrtable
         arrtable.insert().execute(intarr=[4, 5, 6],
-                                  strarr=[['m\xe4\xe4'], ['m\xf6\xf6'
-                                  ]])
-        arrtable.insert().execute(intarr=[1, 2, 3], strarr=['m\xe4\xe4'
-                                  , 'm\xf6\xf6'])
+                                  strarr=[[util.u('m\xe4\xe4')], [
+                                    util.u('m\xf6\xf6')]])
+        arrtable.insert().execute(intarr=[1, 2, 3], strarr=[
+                        util.u('m\xe4\xe4'), util.u('m\xf6\xf6')])
         results = \
             arrtable.select(order_by=[arrtable.c.intarr]).execute().fetchall()
         eq_(len(results), 2)
-        eq_(results[0]['strarr'], ['m\xe4\xe4', 'm\xf6\xf6'])
-        eq_(results[1]['strarr'], [['m\xe4\xe4'], ['m\xf6\xf6']])
+        eq_(results[0]['strarr'], [util.u('m\xe4\xe4'), util.u('m\xf6\xf6')])
+        eq_(results[1]['strarr'], [[util.u('m\xe4\xe4')], [util.u('m\xf6\xf6')]])
 
     def test_array_literal(self):
         eq_(
@@ -2263,7 +2264,7 @@ class ArrayTest(fixtures.TablesTest, AssertsExecutionResults):
         testing.db.execute(
             arrtable.insert(),
             intarr=[4, 5, 6],
-            strarr=['abc', 'def']
+            strarr=[util.u('abc'), util.u('def')]
         )
         eq_(
             testing.db.scalar(select([arrtable.c.intarr[2:3]])),