]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Modifications to the mssql dialect in order to to pass through unicode in the pyodbc...
authorMichael Trier <mtrier@gmail.com>
Tue, 30 Dec 2008 06:39:37 +0000 (06:39 +0000)
committerMichael Trier <mtrier@gmail.com>
Tue, 30 Dec 2008 06:39:37 +0000 (06:39 +0000)
lib/sqlalchemy/databases/mssql.py
test/sql/testtypes.py

index ad9ba847abf745cd2f89caae0f483d27b8e68e73..490e562e959a028d987adb3569dfb45e03db8532 100644 (file)
@@ -221,6 +221,7 @@ from decimal import Decimal as _python_Decimal
 
 MSSQL_RESERVED_WORDS = set(['function'])
 
+
 class _StringType(object):
     """Base for MSSQL string types."""
 
@@ -253,6 +254,33 @@ class _StringType(object):
         return "%s(%s)" % (self.__class__.__name__,
                            ', '.join(['%s=%r' % (k, params[k]) for k in params]))
 
+    def bind_processor(self, dialect):
+        if self.convert_unicode or dialect.convert_unicode:
+            if self.assert_unicode is None:
+                assert_unicode = dialect.assert_unicode
+            else:
+                assert_unicode = self.assert_unicode
+
+            if not assert_unicode:
+                return None
+
+            def process(value):
+                if not isinstance(value, (unicode, sqltypes.NoneType)):
+                    if assert_unicode == 'warn':
+                        util.warn("Unicode type received non-unicode bind "
+                                  "param value %r" % value)
+                        return value
+                    else:
+                        raise exc.InvalidRequestError("Unicode type received non-unicode bind param value %r" % value)
+                else:
+                    return value
+            return process
+        else:
+            return None
+
+    def result_processor(self, dialect):
+        return None
+
 
 class MSNumeric(sqltypes.Numeric):
     def result_processor(self, dialect):
@@ -573,36 +601,6 @@ class MSNVarchar(_StringType, sqltypes.Unicode):
             return self._extend("NVARCHAR")
 
 
-class AdoMSNVarchar(_StringType, sqltypes.Unicode):
-    """overrides bindparam/result processing to not convert any unicode strings"""
-
-    def __init__(self, length=None, **kwargs):
-        """Construct a NVARCHAR.
-
-        :param length: Optional, Maximum data length, in characters.
-
-        :param collation: Optional, a column-level collation for this string
-          value. Accepts a Windows Collation Name or a SQL Collation Name.
-
-        """
-        _StringType.__init__(self, **kwargs)
-        sqltypes.Unicode.__init__(self, length=length,
-                convert_unicode=kwargs.get('convert_unicode', True),
-                assert_unicode=kwargs.get('assert_unicode', 'warn'))
-
-    def bind_processor(self, dialect):
-        return None
-
-    def result_processor(self, dialect):
-        return None
-
-    def get_col_spec(self):
-        if self.length:
-            return self._extend("NVARCHAR(%(length)s)" % {'length' : self.length})
-        else:
-            return self._extend("NVARCHAR")
-
-
 class MSChar(_StringType, sqltypes.CHAR):
     """MSSQL CHAR type, for fixed-length non-Unicode data with a maximum
     of 8,000 characters."""
@@ -1086,7 +1084,7 @@ class MSSQLDialect(default.DefaultDialect):
             coltype = self.ischema_names.get(type, None)
 
             kwargs = {}
-            if coltype in (MSString, MSChar, MSNVarchar, AdoMSNVarchar, MSNChar, MSText, MSNText):
+            if coltype in (MSString, MSChar, MSNVarchar, MSNChar, MSText, MSNText):
                 if collation:
                     kwargs.update(collation=collation)
 
@@ -1098,7 +1096,7 @@ class MSSQLDialect(default.DefaultDialect):
                               (type, name))
                     coltype = sqltypes.NULLTYPE
 
-                elif coltype in (MSNVarchar, AdoMSNVarchar) and charlen == -1:
+                elif coltype in (MSNVarchar,) and charlen == -1:
                     args[0] = None
                 coltype = coltype(*args, **kwargs)
             colargs = []
@@ -1262,9 +1260,6 @@ class MSSQLDialect_pyodbc(MSSQLDialect):
     ischema_names = MSSQLDialect.ischema_names.copy()
     ischema_names['smalldatetime'] = MSDate_pyodbc
     ischema_names['datetime'] = MSDateTime_pyodbc
-    if supports_unicode:
-        colspecs[sqltypes.Unicode] = AdoMSNVarchar
-        ischema_names['nvarchar'] = AdoMSNVarchar
 
     def make_connect_string(self, keys, query):
         if 'max_identifier_length' in keys:
@@ -1335,11 +1330,9 @@ class MSSQLDialect_adodbapi(MSSQLDialect):
         return module
 
     colspecs = MSSQLDialect.colspecs.copy()
-    colspecs[sqltypes.Unicode] = AdoMSNVarchar
     colspecs[sqltypes.DateTime] = MSDateTime_adodbapi
 
     ischema_names = MSSQLDialect.ischema_names.copy()
-    ischema_names['nvarchar'] = AdoMSNVarchar
     ischema_names['datetime'] = MSDateTime_adodbapi
 
     def make_connect_string(self, keys, query):
index ef4a355d3c89988c2770af9c99604462698f58c9..44b83defd86afe7bb184a2d30d49241c878bfff6 100644 (file)
@@ -368,7 +368,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
             testing.db.engine.dialect.assert_unicode = False
             rawdata = 'Alors vous imaginez ma surprise, au lever du jour, quand une dr\xc3\xb4le de petit voix m\xe2\x80\x99a r\xc3\xa9veill\xc3\xa9. Elle disait: \xc2\xab S\xe2\x80\x99il vous pla\xc3\xaet\xe2\x80\xa6 dessine-moi un mouton! \xc2\xbb\n'
             unicodedata = rawdata.decode('utf-8')
-            if testing.against('sqlite'):
+            if testing.against('sqlite', 'mssql'):
                 rawdata = "something"
             unicode_table.insert().execute(unicode_varchar=unicodedata,
                                            unicode_text=unicodedata,
@@ -380,7 +380,7 @@ class UnicodeTest(TestBase, AssertsExecutionResults):
             print 3, repr(x['plain_varchar'])
             self.assert_(isinstance(x['unicode_varchar'], unicode) and x['unicode_varchar'] == unicodedata)
             self.assert_(isinstance(x['unicode_text'], unicode) and x['unicode_text'] == unicodedata)
-            if not testing.against('sqlite'):
+            if not testing.against('sqlite', 'mssql'):
                 self.assert_(isinstance(x['plain_varchar'], unicode) and x['plain_varchar'] == unicodedata)
         finally:
             testing.db.engine.dialect.convert_unicode = prev_unicode