]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
new changelog
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 11 Jan 2014 18:12:40 +0000 (13:12 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 11 Jan 2014 18:12:40 +0000 (13:12 -0500)
doc/build/requirements.txt
lib/sqlalchemy/connectors/mysqldb.py
lib/sqlalchemy/engine/default.py
test/engine/test_execute.py

index 8b98c8e4571b5e539540063931ab34e6ac27e150..6e2354f07df31116ab197a9a30a324f82d5b3a22 100644 (file)
@@ -1,3 +1,3 @@
 mako
-changelog>=0.3.2
+changelog>=0.3.4
 sphinx-paramlinks>=0.2.0
index 0f250dfdbdcbaf6c47aa1ce5718f9a0e8f2376fb..0a752b87ea1ccc1687e5356b47a8f3eb74af82e9 100644 (file)
@@ -62,6 +62,18 @@ class MySQLDBConnector(Connector):
         # is overridden when pymysql is used
         return __import__('MySQLdb')
 
+    def _check_unicode_returns(self, connection):
+        # work around issue fixed in
+        # https://github.com/farcepest/MySQLdb1/commit/cd44524fef63bd3fcb71947392326e9742d520e8
+        # unicode charset fails for a table with
+        additional_tests = [
+            sql.collate(sql.cast(
+                    sql.literal_column(
+                        "'test collated returns'"),
+                        sqltypes.TEXT), 'utf8_bin')
+        ]
+        return super(MySQLDBConnector, self)._check_unicode_returns(connection, additional_tests)
+
     def do_executemany(self, cursor, statement, parameters, context=None):
         rowcount = cursor.executemany(statement, parameters)
         if context is not None:
index 509d772aabe432de1577e8aa522eae0e3bce988a..bcb9960b12ffe3aaeebad2f69d0728a52f47e73d 100644 (file)
@@ -228,46 +228,55 @@ class DefaultDialect(interfaces.Dialect):
         """
         return None
 
-    def _check_unicode_returns(self, connection):
+    def _check_unicode_returns(self, connection, additional_tests=None):
         if util.py2k and not self.supports_unicode_statements:
             cast_to = util.binary_type
         else:
             cast_to = util.text_type
 
-        def check_unicode(formatstr, type_):
+        if self.positional:
+            parameters = self.execute_sequence_format()
+        else:
+            parameters = {}
+
+        def check_unicode(test):
             cursor = connection.connection.cursor()
             try:
                 try:
-                    cursor.execute(
-                        cast_to(
-                            expression.select(
-                                [expression.cast(
-                                    expression.literal_column(
-                                        "'test %s returns'" % formatstr),
-                                        type_)
-                            ]).compile(dialect=self)
-                        )
-                    )
+                    statement = cast_to(expression.select([test]).compile(dialect=self))
+                    connection._cursor_execute(cursor, statement, parameters)
                     row = cursor.fetchone()
 
                     return isinstance(row[0], util.text_type)
-                except self.dbapi.Error as de:
+                except exc.DBAPIError as de:
                     util.warn("Exception attempting to "
                             "detect unicode returns: %r" % de)
                     return False
             finally:
                 cursor.close()
 
-        # detect plain VARCHAR
-        unicode_for_varchar = check_unicode("plain", sqltypes.VARCHAR(60))
-
-        # detect if there's an NVARCHAR type with different behavior available
-        unicode_for_unicode = check_unicode("unicode", sqltypes.Unicode(60))
-
-        if unicode_for_unicode and not unicode_for_varchar:
+        tests = [
+            # detect plain VARCHAR
+            expression.cast(
+                expression.literal_column("'test plain returns'"),
+                sqltypes.VARCHAR(60)
+            ),
+            # detect if there's an NVARCHAR type with different behavior available
+            expression.cast(
+                expression.literal_column("'test unicode returns'"),
+                sqltypes.Unicode(60)
+            ),
+        ]
+
+        if additional_tests:
+            tests += additional_tests
+
+        results = set([check_unicode(test) for test in tests])
+
+        if results.issuperset([True, False]):
             return "conditional"
         else:
-            return unicode_for_varchar
+            return results == set([True])
 
     def _check_unicode_description(self, connection):
         # all DBAPIs on Py2K return cursor.description as encoded,
index c2479eff7ab65ee6d6d14709d1cebdff305dfc1e..d3bd3c2cdaeda03c4800128b99a366d294c72218 100644 (file)
@@ -1050,7 +1050,7 @@ class ResultProxyTest(fixtures.TestBase):
 
 class ExecutionOptionsTest(fixtures.TestBase):
     def test_dialect_conn_options(self):
-        engine = testing_engine("sqlite://")
+        engine = testing_engine("sqlite://", options=dict(_initialize=False))
         engine.dialect = Mock()
         conn = engine.connect()
         c2 = conn.execution_options(foo="bar")