from sqlalchemy import exc, log, schema, sql, types as sqltypes, util
-class _PlainQuery(unicode):
- pass
-
-
class _oursqlNumeric(NUMERIC):
def result_processor(self, dialect, coltype):
if self.asdecimal:
return None
+
+class MySQL_oursqlExecutionContext(MySQLExecutionContext):
+
+ @property
+ def plain_query(self):
+ return self._connection.options.get('plain_query', False)
+
+
class MySQL_oursql(MySQLDialect):
driver = 'oursql'
supports_unicode_statements = True
supports_unicode_binds = True
supports_sane_rowcount = True
supports_sane_multi_rowcount = True
+ execution_ctx_cls = MySQL_oursqlExecutionContext
colspecs = util.update_copy(
MySQLDialect.colspecs,
def do_execute(self, cursor, statement, parameters, context=None):
"""Provide an implementation of *cursor.execute(statement, parameters)*."""
- if context and not context.compiled and isinstance(context.statement, _PlainQuery):
+ if context and context.plain_query:
cursor.execute(statement, plain_query=True)
else:
cursor.execute(statement, parameters)
connection.cursor().execute('BEGIN', plain_query=True)
def _xa_query(self, connection, query, xid):
- connection.execute(_PlainQuery(query % connection.connection._escape_string(xid)))
+ connection._with_options(plain_query=True).execute(query % connection.connection._escape_string(xid))
# Because mysql is bad, these methods have to be reimplemented to use _PlainQuery. Basically, some queries
# refuse to return any data if they're run through the parameterized query API, or refuse to be parameterized
self._xa_query(connection, 'XA COMMIT "%s"', xid)
def has_table(self, connection, table_name, schema=None):
- full_name = '.'.join(self.identifier_preparer._quote_free_identifiers(
- schema, table_name))
-
- st = "DESCRIBE %s" % full_name
- rs = None
- try:
- try:
- rs = connection.execute(_PlainQuery(st))
- have = rs.rowcount > 0
- rs.close()
- return have
- except exc.SQLError, e:
- if self._extract_error_code(e) == 1146:
- return False
- raise
- finally:
- if rs:
- rs.close()
+ return MySQLDialect.has_table(self, connection._with_options(plain_query=True), table_name, schema)
def _show_create_table(self, connection, table, charset=None,
full_name=None):
- """Run SHOW CREATE TABLE for a ``Table``."""
-
- if full_name is None:
- full_name = self.identifier_preparer.format_table(table)
- st = "SHOW CREATE TABLE %s" % full_name
-
- rp = None
- try:
- try:
- rp = connection.execute(_PlainQuery(st))
- except exc.SQLError, e:
- if self._extract_error_code(e) == 1146:
- raise exc.NoSuchTableError(full_name)
- else:
- raise
- row = rp.fetchone()
- if not row:
- raise exc.NoSuchTableError(full_name)
- return row[1].strip()
- finally:
- if rp:
- rp.close()
+ return MySQLDialect._show_create_table(self,
+ connection.contextual_connect(close_with_result=True)._with_options(plain_query=True),
+ table, charset, full_name)
def is_disconnect(self, e):
if isinstance(e, self.dbapi.ProgrammingError): # if underlying connection is closed, this is the error you get
.. index::
single: thread safety; Connection
"""
-
+ options = {}
+
def __init__(self, engine, connection=None, close_with_result=False,
- _branch=False):
+ _branch=False, _options=None):
"""Construct a new Connection.
Connection objects are typically constructed by an
self.__savepoint_seq = 0
self.__branch = _branch
self.__invalid = False
+ if _options:
+ self.options = _options
def _branch(self):
"""Return a new Connection which references this Connection's
"""
return self.engine.Connection(self.engine, self.__connection, _branch=True)
-
+
+ def _with_options(self, **opt):
+ """Add keyword options to a Connection generatively.
+
+ Experimental. May change the name/signature at
+ some point.
+
+ If made public, strongly consider the name
+ "options()" so as to be consistent with
+ orm.Query.options().
+
+ """
+ return self.engine.Connection(
+ self.engine, self.__connection,
+ _branch=self.__branch, _options=opt)
+
@property
def dialect(self):
"Dialect used by this Connection."