]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
pg8000 handling unicode fine now
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 16 Jan 2009 21:02:46 +0000 (21:02 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 16 Jan 2009 21:02:46 +0000 (21:02 +0000)
lib/sqlalchemy/dialects/postgres/pg8000.py
lib/sqlalchemy/dialects/postgres/psycopg2.py
test/sql/testtypes.py

index 43ed3ee1d4d7fcb1c22338daaea97e84f0dc0794..758ccca1eadd02e79ee6b06e034e4011dae2d224 100644 (file)
@@ -8,9 +8,9 @@ URLs are of the form `postgres+pg8000://user@password@host:port/dbname[?key=valu
 Unicode
 -------
 
-Unicode data which contains non-ascii characters don't seem to be supported yet.  non-ascii
-schema identifiers though *are* supported, if you set the client_encoding=utf8 in the postgresql.conf 
-file.
+pg8000 requires that the postgres client encoding be configured in the postgresql.conf file
+in order to use encodings other than ascii.  Set this value to the same value as 
+the "encoding" parameter on create_engine(), usually "utf-8".
 
 Interval
 --------
@@ -52,13 +52,18 @@ class Postgres_pg8000(PGDialect):
 
     supports_unicode_statements = False #True
     
-    # this one doesn't matter, cant pass non-ascii through
-    # pending further investigation
-    supports_unicode_binds = False #True
+    supports_unicode_binds = True
     
     default_paramstyle = 'format'
     supports_sane_multi_rowcount = False
     execution_ctx_cls = Postgres_pg8000ExecutionContext
+    colspecs = util.update_copy(
+        PGDialect.colspecs,
+        {
+            sqltypes.Numeric : PGNumeric,
+            sqltypes.Float: sqltypes.Float,  # prevents PGNumeric from being used
+        }
+    )
     
     @classmethod
     def dbapi(cls):
index f46da2182706769ae306addfff5ae6865f3d121f..b90ac8d9c651070a64b9ad803ad99e2213c5f0e9 100644 (file)
@@ -58,12 +58,6 @@ class PGNumeric(sqltypes.Numeric):
             return process
 
 
-colspecs = PGDialect.colspecs.copy()
-colspecs.update({
-    sqltypes.Numeric : PGNumeric,
-    sqltypes.Float: sqltypes.Float,  # prevents PGNumeric from being used
-})
-
 # TODO: filter out 'FOR UPDATE' statements
 SERVER_SIDE_CURSOR_RE = re.compile(
     r'\s*SELECT',
@@ -117,6 +111,15 @@ class Postgres_psycopg2(PGDialect):
     supports_sane_multi_rowcount = False
     execution_ctx_cls = Postgres_psycopg2ExecutionContext
     statement_compiler = Postgres_psycopg2Compiler
+
+    colspecs = util.update_copy(
+        PGDialect.colspecs,
+        {
+            sqltypes.Numeric : PGNumeric,
+            sqltypes.Float: sqltypes.Float,  # prevents PGNumeric from being used
+        }
+    )
+
     
     def __init__(self, server_side_cursors=False, **kwargs):
         PGDialect.__init__(self, **kwargs)
@@ -134,9 +137,6 @@ class Postgres_psycopg2(PGDialect):
         opts.update(url.query)
         return ([], opts)
 
-    def type_descriptor(self, typeobj):
-        return sqltypes.adapt_type(typeobj, colspecs)
-
     def is_disconnect(self, e):
         if isinstance(e, self.dbapi.OperationalError):
             return 'closed the connection' in str(e) or 'connection not open' in str(e)
index 39f79e540c6024c124904eecfb80449ed5acbcbd..73fcabb4a0b0098277e7c43a5141dd288a624712 100644 (file)
@@ -292,7 +292,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
         rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
         unicodedata = rawdata.decode('utf-8')
         
-        if testing.against('sqlite', '>' '2.4'):
+        if testing.db.dialect.supports_unicode_binds:
             rawdata = "something"
             
         unicode_table.insert().execute(unicode_varchar=unicodedata,
@@ -303,10 +303,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
         self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
 
         if isinstance(x['plain_varchar'], unicode):
-            # SQLLite and MSSQL return non-unicode data as unicode
-            self.assert_(testing.against('sqlite', '+pyodbc'))
-            if not testing.against('sqlite'):
-                self.assert_(x['plain_varchar'] == unicodedata)
+            assert testing.db.dialect.supports_unicode_binds
         else:
             self.assert_(not isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == rawdata)
 
@@ -315,7 +312,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
 
         rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
         unicodedata = rawdata.decode('utf-8')
-        if testing.against('sqlite'):
+        if testing.db.dialect.supports_unicode_binds:
             rawdata = "something"
         unicode_table.insert().execute(unicode_varchar=unicodedata,
                                        unicode_text=unicodedata,
@@ -325,20 +322,16 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
         self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
 
     def test_assertions(self):
-        try:
-            unicode_table.insert().execute(unicode_varchar='not unicode')
-            assert False
-        except exc.SAWarning, e:
-            assert str(e) == "Unicode type received non-unicode bind param value 'not unicode'", str(e)
+        self.assertRaisesMessage(exc.SAWarning, "Unicode type received non-unicode bind param value 'not unicode'", 
+            unicode_table.insert().execute, unicode_varchar='not unicode'
+        )
 
         unicode_engine = engines.utf8_engine(options={'convert_unicode':True,
                                                       'assert_unicode':True})
         try:
-            try:
-                unicode_engine.execute(unicode_table.insert(), plain_varchar='im not unicode')
-                assert False
-            except exc.InvalidRequestError, e:
-                assert str(e) == "Unicode type received non-unicode bind param value 'im not unicode'"
+            self.assertRaisesMessage(exc.InvalidRequestError, "Unicode type received non-unicode bind param value 'im not unicode'", 
+                unicode_engine.execute, unicode_table.insert(), plain_varchar='im not unicode'
+            )
 
             @testing.emits_warning('.*non-unicode bind')
             def warns():
@@ -357,6 +350,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
 
     def test_engine_parameter(self):
         """tests engine-wide unicode conversion"""
+        
         prev_unicode = testing.db.engine.dialect.convert_unicode
         prev_assert = testing.db.engine.dialect.assert_unicode
         try:
@@ -364,7 +358,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
             testing.db.engine.dialect.assert_unicode = False
             rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
             unicodedata = rawdata.decode('utf-8')
-            if testing.against('sqlite', 'mssql'):
+            if testing.db.dialect.supports_unicode_binds:
                 rawdata = "something"
             unicode_table.insert().execute(unicode_varchar=unicodedata,
                                            unicode_text=unicodedata,
@@ -376,7 +370,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
             print 3, repr(x['plain_varchar'])
             self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
             self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
-            if not testing.against('sqlite', 'mssql'):
+            if not testing.db.dialect.supports_unicode_binds:
                 self.assert_(isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == unicodedata)
         finally:
             testing.db.engine.dialect.convert_unicode = prev_unicode