from array import array as _array
from sqlalchemy import exceptions, logging, schema, sql, util
-from sqlalchemy.pool import connection_cache_decorator
from sqlalchemy.sql import operators as sql_operators
from sqlalchemy.sql import functions as sql_functions
from sqlalchemy.sql import compiler
SELECT_RE = re.compile(
r'\s*(?:SELECT|SHOW|DESCRIBE|XA RECOVER)',
re.I | re.UNICODE)
+SET_RE = re.compile(
+ r'\s*SET\s+(?:(?:GLOBAL|SESSION)\s+)?\w',
+ re.I | re.UNICODE)
+
class _NumericType(object):
"""Base for MySQL numeric types."""
self._last_inserted_ids[0] is None):
self._last_inserted_ids = ([self.cursor.lastrowid] +
self._last_inserted_ids[1:])
+ elif (not self.isupdate and not self.should_autocommit and
+ self.statement and SET_RE.match(self.statement)):
+ # This misses if a user forces autocommit on text('SET NAMES'),
+ # which is probably a programming error anyhow.
+ self.connection.info.pop(('mysql', 'charset'), None)
def returns_rows_text(self, statement):
return SELECT_RE.match(statement)
def get_default_schema_name(self, connection):
return connection.execute('SELECT DATABASE()').scalar()
- get_default_schema_name = connection_cache_decorator(get_default_schema_name)
-
+ get_default_schema_name = engine_base.connection_memoize(
+ ('dialect', 'default_schema_name'))(get_default_schema_name)
+
def table_names(self, connection, schema):
"""Return a Unicode SHOW TABLES from a given schema."""
cached per-Connection.
"""
- try:
- return connection.info['_mysql_server_version_info']
- except KeyError:
- version = connection.info['_mysql_server_version_info'] = \
- self._server_version_info(connection.connection.connection)
- return version
+ return self._server_version_info(connection.connection.connection)
+ server_version_info = engine_base.connection_memoize(
+ ('mysql', 'server_version_info'))(server_version_info)
def _server_version_info(self, dbapi_con):
"""Convert a MySQL-python server_info string into a tuple."""
columns = self._describe_table(connection, table, charset)
sql = reflector._describe_to_create(table, columns)
- self._adjust_casing(connection, table, charset)
+ self._adjust_casing(connection, table)
return reflector.reflect(connection, table, sql, charset,
only=include_columns)
def _adjust_casing(self, connection, table, charset=None):
"""Adjust Table name to the server case sensitivity, if needed."""
- if charset is None:
- charset = self._detect_charset(connection)
-
- casing = self._detect_casing(connection, charset)
+ casing = self._detect_casing(connection)
# For winxx database hosts. TODO: is this really needed?
if casing == 1 and table.name != table.name.lower():
"""Sniff out the character set in use for connection results."""
# Allow user override, won't sniff if force_charset is set.
- if 'force_charset' in connection.info:
- return connection.info['force_charset']
+ if ('mysql', 'force_charset') in connection.info:
+ return connection.info[('mysql', 'force_charset')]
# Note: MySQL-python 1.2.1c7 seems to ignore changes made
# on a connection via set_character_set()
"combination of MySQL server and MySQL-python. "
"MySQL-python >= 1.2.2 is recommended. Assuming latin1.")
return 'latin1'
+ _detect_charset = engine_base.connection_memoize(
+ ('mysql', 'charset'))(_detect_charset)
+
- def _detect_casing(self, connection, charset=None):
+ def _detect_casing(self, connection):
"""Sniff out identifier case sensitivity.
Cached per-connection. This value can not change without a server
restart.
- """
+ """
# http://dev.mysql.com/doc/refman/5.0/en/name-case-sensitivity.html
- try:
- return connection.info['lower_case_table_names']
- except KeyError:
- row = _compat_fetchone(connection.execute(
- "SHOW VARIABLES LIKE 'lower_case_table_names'"),
- charset=charset)
- if not row:
+ charset = self._detect_charset(connection)
+ row = _compat_fetchone(connection.execute(
+ "SHOW VARIABLES LIKE 'lower_case_table_names'"),
+ charset=charset)
+ if not row:
+ cs = 0
+ else:
+ # 4.0.15 returns OFF or ON according to [ticket:489]
+ # 3.23 doesn't, 4.0.27 doesn't..
+ if row[1] == 'OFF':
cs = 0
+ elif row[1] == 'ON':
+ cs = 1
else:
- # 4.0.15 returns OFF or ON according to [ticket:489]
- # 3.23 doesn't, 4.0.27 doesn't..
- if row[1] == 'OFF':
- cs = 0
- elif row[1] == 'ON':
- cs = 1
- else:
- cs = int(row[1])
- row.close()
- connection.info['lower_case_table_names'] = cs
- return cs
+ cs = int(row[1])
+ row.close()
+ return cs
+ _detect_casing = engine_base.connection_memoize(
+ ('mysql', 'lower_case_table_names'))(_detect_casing)
- def _detect_collations(self, connection, charset=None):
+ def _detect_collations(self, connection):
"""Pull the active COLLATIONS list from the server.
Cached per-connection.
"""
- try:
- return connection.info['collations']
- except KeyError:
- collations = {}
- if self.server_version_info(connection) < (4, 1, 0):
- pass
- else:
- rs = connection.execute('SHOW COLLATION')
- for row in _compat_fetchall(rs, charset):
- collations[row[0]] = row[1]
- connection.info['collations'] = collations
- return collations
+ collations = {}
+ if self.server_version_info(connection) < (4, 1, 0):
+ pass
+ else:
+ charset = self._detect_charset(connection)
+ rs = connection.execute('SHOW COLLATION')
+ for row in _compat_fetchall(rs, charset):
+ collations[row[0]] = row[1]
+ return collations
+ _detect_collations = engine_base.connection_memoize(
+ ('mysql', 'collations'))(_detect_collations)
def use_ansiquotes(self, useansi):
self._use_ansiquotes = useansi
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-import inspect, itertools, sets, sys, warnings, weakref
+import inspect, itertools, new, sets, sys, warnings, weakref
import __builtin__
types = __import__('types')
return sym
finally:
symbol._lock.release()
-
+
+
+def function_named(fn, name):
+ """Return a function with a given __name__.
+
+ Will assign to __name__ and return the original function if possible on
+ the Python implementation, otherwise a new function will be constructed.
+
+ """
+ try:
+ fn.__name__ = name
+ except TypeError:
+ fn = new.function(fn.func_code, fn.func_globals, name,
+ fn.func_defaults, fn.func_closure)
+ return fn
+
def conditional_cache_decorator(func):
"""apply conditional caching to the return value of a function."""
func_with_warning.__doc__ = doc
func_with_warning.__dict__.update(func.__dict__)
- try:
- func_with_warning.__name__ = func.__name__
- except TypeError:
- pass
- return func_with_warning
+
+ return function_named(func_with_warning, func.__name__)
self.assert_compile(cast(t.c.col, type_), expected)
+class ExecutionTest(TestBase):
+ """Various MySQL execution special cases."""
+
+ __only_on__ = 'mysql'
+
+ def test_charset_caching(self):
+ engine = engines.testing_engine()
+
+ cx = engine.connect()
+ meta = MetaData()
+
+ assert ('mysql', 'charset') not in cx.info
+ assert ('mysql', 'force_charset') not in cx.info
+
+ cx.execute(text("SELECT 1")).fetchall()
+ assert ('mysql', 'charset') not in cx.info
+
+ meta.reflect(cx)
+ assert ('mysql', 'charset') in cx.info
+
+ cx.execute(text("SET @squiznart=123"))
+ assert ('mysql', 'charset') in cx.info
+
+ # the charset invalidation is very conservative
+ cx.execute(text("SET TIMESTAMP = DEFAULT"))
+ assert ('mysql', 'charset') not in cx.info
+
+ cx.info[('mysql', 'force_charset')] = 'latin1'
+
+ assert engine.dialect._detect_charset(cx) == 'latin1'
+ assert cx.info[('mysql', 'charset')] == 'latin1'
+
+ del cx.info[('mysql', 'force_charset')]
+ del cx.info[('mysql', 'charset')]
+
+ meta.reflect(cx)
+ assert ('mysql', 'charset') in cx.info
+
+ # String execution doesn't go through the detector.
+ cx.execute("SET TIMESTAMP = DEFAULT")
+ assert ('mysql', 'charset') in cx.info
+
+
def colspec(c):
return testing.db.dialect.schemagenerator(testing.db.dialect,
testing.db, None, None).get_column_specification(c)