Defines the basic components used to interface DB-API modules with
higher-level statement-construction, connection-management, execution
and result contexts.
-
"""
-__all__ = ['BufferedColumnResultProxy', 'BufferedColumnRow', 'BufferedRowResultProxy', 'Compiled', 'Connectable',
- 'Connection', 'DefaultRunner', 'Dialect', 'Engine', 'ExecutionContext', 'NestedTransaction', 'ResultProxy',
- 'RootTransaction', 'RowProxy', 'SchemaIterator', 'StringIO', 'Transaction', 'TwoPhaseTransaction', 'connection_memoize']
+__all__ = [
+ 'BufferedColumnResultProxy', 'BufferedColumnRow', 'BufferedRowResultProxy',
+ 'Compiled', 'Connectable', 'Connection', 'DefaultRunner', 'Dialect', 'Engine',
+ 'ExecutionContext', 'NestedTransaction', 'ResultProxy', 'RootTransaction',
+ 'RowProxy', 'SchemaIterator', 'StringIO', 'Transaction', 'TwoPhaseTransaction',
+ 'connection_memoize']
import inspect, StringIO, sys
from sqlalchemy import exc, schema, util, types, log
ExecutionContext, Compiled, DefaultGenerator, and TypeEngine.
All Dialects implement the following attributes:
-
+
name
identifying name for the dialect from a DBAPI-neutral point of view
(i.e. 'sqlite')
-
+
driver
- identitfying name for the dialect's DBAPI
-
+ identitfying name for the dialect's DBAPI
+
positional
True if the paramstyle for this Dialect is positional.
defaults.
statement_compiler
- a :class:`~Compiled` class used to compile SQL
- statements
+ a :class:`~Compiled` class used to compile SQL statements
ddl_compiler
- a :class:`~Compiled` class used to compile DDL
- statements
+ a :class:`~Compiled` class used to compile DDL statements
server_version_info
a tuple containing a version number for the DB backend in use.
- This value is only available for supporting dialects, and only for
+ This value is only available for supporting dialects, and only for
a dialect that's been associated with a connection pool via
create_engine() or otherwise had its ``initialize()`` method called
with a conneciton.
The maximum length of identifier names.
supports_unicode_statements
- Indicate whether the DB-API can receive SQL statements as Python unicode strings
+ Indicate whether the DB-API can receive SQL statements as Python
+ unicode strings
supports_unicode_binds
- Indicate whether the DB-API can receive string bind parameters as Python unicode strings
+ Indicate whether the DB-API can receive string bind parameters
+ as Python unicode strings
supports_sane_rowcount
- Indicate whether the dialect properly implements rowcount for ``UPDATE`` and ``DELETE`` statements.
+ Indicate whether the dialect properly implements rowcount for
+ ``UPDATE`` and ``DELETE`` statements.
supports_sane_multi_rowcount
- Indicate whether the dialect properly implements rowcount for ``UPDATE`` and ``DELETE`` statements
- when executed via executemany.
+ Indicate whether the dialect properly implements rowcount for
+ ``UPDATE`` and ``DELETE`` statements when executed via
+ executemany.
preexecute_pk_sequences
- Indicate if the dialect should pre-execute sequences on primary key
- columns during an INSERT, if it's desired that the new row's primary key
- be available after execution.
+ Indicate if the dialect should pre-execute sequences on primary
+ key columns during an INSERT, if it's desired that the new row's
+ primary key be available after execution.
supports_pk_autoincrement
Indicates if the dialect should allow the database to passively assign
dbapi_type_map
A mapping of DB-API type objects present in this Dialect's
- DB-API implmentation mapped to TypeEngine implementations used
+ DB-API implementation mapped to TypeEngine implementations used
by the dialect.
This is used to apply types to result sets based on the DB-API
types present in cursor.description; it only takes effect for
result sets against textual statements where no explicit
typemap was present.
-
+
colspecs
- A dictionary of TypeEngine classes from sqlalchemy.types mapped to subclasses
- that are specific to the dialect class. This dictionary is class-level only
- and is not accessed from the dialect instance itself.
-
+ A dictionary of TypeEngine classes from sqlalchemy.types mapped
+ to subclasses that are specific to the dialect class. This
+ dictionary is class-level only and is not accessed from the
+ dialect instance itself.
+
supports_default_values
- Indicates if the construct ``INSERT INTO tablename DEFAULT VALUES`` is supported
-
+ Indicates if the construct ``INSERT INTO tablename DEFAULT
+ VALUES`` is supported
"""
def create_connect_args(self, url):
raise NotImplementedError()
-
@classmethod
def type_descriptor(cls, typeobj):
"""Transform a generic type to a dialect-specific type.
Dialect classes will usually use the
:func:`~sqlalchemy.types.adapt_type` method in the types module to
make this job easy.
-
+
The returned result is cached *per dialect class* so can
contain no dialect-instance state.
-
"""
raise NotImplementedError()
-
def initialize(self, connection):
"""Called during strategized creation of the dialect with a connection.
-
+
Allows dialects to configure options based on server version info or
other properties.
-
"""
+
pass
def reflecttable(self, connection, table, include_columns=None):
def get_columns(self, connection, table_name, schema=None, **kw):
"""Return information about columns in `table_name`.
- Given a [sqlalchemy.engine#Connection], a string `table_name`, and an
- optional string `schema`, return column information as a list of
- dictionaries with these keys:
+ Given a :class:`~sqlalchemy.engine.Connection`, a string
+ `table_name`, and an optional string `schema`, return column
+ information as a list of dictionaries with these keys:
name
the column's name
def get_primary_keys(self, connection, table_name, schema=None, **kw):
"""Return information about primary keys in `table_name`.
- Given a [sqlalchemy.engine#Connection], a string `table_name`, and an
- optional string `schema`, return primary key information as a list of
- column names.
-
+ Given a :class:`~sqlalchemy.engine.Connection`, a string
+ `table_name`, and an optional string `schema`, return primary
+ key information as a list of column names.
"""
raise NotImplementedError()
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
"""Return information about foreign_keys in `table_name`.
- Given a [sqlalchemy.engine#Connection], a string `table_name`, and an
- optional string `schema`, return foreign key information as a list of
- dicts with these keys:
+ Given a :class:`~sqlalchemy.engine.Connection`, a string
+ `table_name`, and an optional string `schema`, return foreign
+ key information as a list of dicts with these keys:
name
the constraint's name
schema:
Optional, retrieve names from a non-default schema.
-
"""
raise NotImplementedError()
def get_view_definition(self, connection, view_name, schema=None, **kw):
"""Return view definition.
- Given a [sqlalchemy.engine#Connection], a string `view_name`, and an
- optional string `schema`, return the view definition.
-
+ Given a :class:`~sqlalchemy.engine.Connection`, a string
+ `view_name`, and an optional string `schema`, return the view
+ definition.
"""
raise NotImplementedError()
def get_indexes(self, connection, table_name, schema=None, **kw):
"""Return information about indexes in `table_name`.
- Given a [sqlalchemy.engine#Connection], a string `table_name` and an
- optional string `schema`, return index information as a list of
- dictionaries with these keys:
+ Given a :class:`~sqlalchemy.engine.Connection`, a string
+ `table_name` and an optional string `schema`, return index
+ information as a list of dictionaries with these keys:
name
the index's name
unique
boolean
-
"""
raise NotImplementedError()
class ExecutionContext(object):
"""A messenger object for a Dialect that corresponds to a single execution.
- ExecutionContext should have these datamembers:
+ ExecutionContext should have these data members:
connection
Connection object which can be freely used by default value
True if the statement is an UPDATE.
should_autocommit
- True if the statement is a "committable" statement
+ True if the statement is a "committable" statement.
postfetch_cols
- a list of Column objects for which a server-side default
- or inline SQL expression value was fired off. applies to inserts and updates.
-
-
+ a list of Column objects for which a server-side default or
+ inline SQL expression value was fired off. Applies to inserts
+ and updates.
"""
def create_cursor(self):
def handle_dbapi_exception(self, e):
"""Receive a DBAPI exception which occured upon execute, result fetch, etc."""
-
+
raise NotImplementedError()
-
+
def should_autocommit_text(self, statement):
"""Parse the given textual statement and return True if it refers to a "committable" statement"""
``Compiled`` object be dependent on the actual values of those
bind parameters, even though it may reference those values as
defaults.
-
"""
def __init__(self, dialect, statement, bind=None):
"""Construct a new ``Compiled`` object.
- dialect
- ``Dialect`` to compile against.
+ :param dialect: ``Dialect`` to compile against.
- statement
- ``ClauseElement`` to be compiled.
+ :param statement: ``ClauseElement`` to be compiled.
- bind
- Optional Engine or Connection to compile this statement against.
-
+ :param bind: Optional Engine or Connection to compile this statement against.
"""
+
self.dialect = dialect
self.statement = statement
self.bind = bind
def construct_params(self, params=None):
"""Return the bind params for this compiled object.
- `params` is a dict of string/object pairs whos
- values will override bind values compiled in
- to the statement.
-
+ :param params: a dict of string/object pairs whos values will
+ override bind values compiled in to the
+ statement.
"""
+
raise NotImplementedError()
def execute(self, *multiparams, **params):
return self.execute(*multiparams, **params).scalar()
+
class TypeCompiler(object):
"""Produces DDL specification for TypeEngine objects."""
-
+
def __init__(self, dialect):
self.dialect = dialect
-
+
def process(self, type_):
return type_._compiler_dispatch(self)
-
+
class Connectable(object):
"""Interface for an object which supports execution of SQL constructs.
-
+
The two implementations of ``Connectable`` are :class:`Connection` and
:class:`Engine`.
-
+
Connectable must also implement the 'dialect' member which references a
:class:`Dialect` instance.
-
"""
def contextual_connect(self):
def _execute_clauseelement(self, elem, multiparams=None, params=None):
raise NotImplementedError()
+
class Connection(Connectable):
"""Provides high-level functionality for a wrapped DB-API connection.
.. index::
single: thread safety; Connection
-
"""
def __init__(self, engine, connection=None, close_with_result=False,
Connection objects are typically constructed by an
:class:`~sqlalchemy.engine.Engine`, see the ``connect()`` and
``contextual_connect()`` methods of Engine.
-
"""
self.engine = engine
self.__savepoint_seq = 0
self.__branch = _branch
self.__invalid = False
-
+
def _branch(self):
"""Return a new Connection which references this Connection's
engine and connection; but does not have close_with_result enabled,
This is used to execute "sub" statements within a single execution,
usually an INSERT statement.
-
"""
+
return self.engine.Connection(self.engine, self.__connection, _branch=True)
@property
@property
def closed(self):
- """return True if this connection is closed."""
+ """Return True if this connection is closed."""
return not self.__invalid and '_Connection__connection' not in self.__dict__
@property
def invalidated(self):
- """return True if this connection was invalidated."""
+ """Return True if this connection was invalidated."""
return self.__invalid
def should_close_with_result(self):
"""Indicates if this Connection should be closed when a corresponding
ResultProxy is closed; this is essentially an auto-release mode.
-
"""
+
return self.__close_with_result
@property
def info(self):
"""A collection of per-DB-API connection instance properties."""
+
return self.connection.info
def connect(self):
This ``Connectable`` interface method returns self, allowing
Connections to be used interchangably with Engines in most
situations that require a bind.
-
"""
+
return self
def contextual_connect(self, **kwargs):
This ``Connectable`` interface method returns self, allowing
Connections to be used interchangably with Engines in most
situations that require a bind.
-
"""
+
return self
def invalidate(self, exception=None):
rolled back before a reconnect on this Connection can proceed. This
is to prevent applications from accidentally continuing their transactional
operations in a non-transactional state.
-
"""
+
if self.closed:
raise exc.InvalidRequestError("This Connection is closed")
:class:`~sqlalchemy.interfaces.PoolListener` for a mechanism to modify
connection state when connections leave and return to their
connection pool.
-
"""
+
self.__connection.detach()
def begin(self):
outermost transaction may ``commit``. Calls to ``commit`` on
inner transactions are ignored. Any transaction in the
hierarchy may ``rollback``, however.
-
"""
+
if self.__transaction is None:
self.__transaction = RootTransaction(self)
else:
def begin_twophase(self, xid=None):
"""Begin a two-phase or XA transaction and return a Transaction handle.
- xid
- the two phase transaction id. If not supplied, a random id
- will be generated.
+ :param xid: the two phase transaction id. If not supplied, a random id
+ will be generated.
"""
if self.__transaction is not None:
raise exc.InvalidRequestError("Unexecutable object type: " + str(type(object)))
def __distill_params(self, multiparams, params):
- """given arguments from the calling form *multiparams, **params, return a list
+ """Given arguments from the calling form *multiparams, **params, return a list
of bind parameter structures, usually a list of dictionaries.
- in the case of 'raw' execution which accepts positional parameters,
- it may be a list of tuples or lists."""
+ In the case of 'raw' execution which accepts positional parameters,
+ it may be a list of tuples or lists.
+ """
if not multiparams:
if params:
def _execute_ddl(self, ddl, params, multiparams):
context = self.__create_execution_context(
- compiled_ddl=ddl.compile(dialect=self.dialect),
+ compiled_ddl=ddl.compile(dialect=self.dialect),
parameters=None
)
return self.__execute_context(context)
keys = []
context = self.__create_execution_context(
- compiled_sql=elem.compile(dialect=self.dialect, column_keys=keys, inline=len(params) > 1),
+ compiled_sql=elem.compile(dialect=self.dialect, column_keys=keys, inline=len(params) > 1),
parameters=params
)
return self.__execute_context(context)
"""Execute a sql.Compiled object."""
context = self.__create_execution_context(
- compiled_sql=compiled,
+ compiled_sql=compiled,
parameters=self.__distill_params(multiparams, params)
)
return self.__execute_context(context)
parameters = self.__distill_params(multiparams, params)
context = self.__create_execution_context(statement=statement, parameters=parameters)
return self.__execute_context(context)
-
+
def __execute_context(self, context):
if context.compiled:
context.pre_exec()
if context.should_autocommit and not self.in_transaction():
self._commit_impl()
return context.get_result_proxy()
-
+
def _handle_dbapi_exception(self, e, statement, parameters, cursor, context):
if getattr(self, '_reentrant_error', False):
raise exc.DBAPIError.instance(statement, parameters, e), None, sys.exc_info()[2]
try:
if not isinstance(e, self.dialect.dbapi.Error):
return
-
+
if context:
context.handle_dbapi_exception(e)
-
+
is_disconnect = self.dialect.is_disconnect(e)
if is_disconnect:
self.invalidate(e)
def run_callable(self, callable_):
return callable_(self)
+
class Transaction(object):
"""Represent a Transaction in progress.
.. index::
single: thread safety; Transaction
-
"""
def __init__(self, connection, parent):
self.connection = connection
self._parent = parent or self
self.is_active = True
-
+
def close(self):
"""Close this transaction.
This is used to cancel a Transaction without affecting the scope of
an enclosing transaction.
"""
+
if not self._parent.is_active:
return
if self._parent is self:
else:
self.rollback()
+
class RootTransaction(Transaction):
def __init__(self, connection):
super(RootTransaction, self).__init__(connection, None)
def _do_commit(self):
self.connection._commit_impl()
+
class NestedTransaction(Transaction):
def __init__(self, connection, parent):
super(NestedTransaction, self).__init__(connection, parent)
def _do_commit(self):
self.connection._release_savepoint_impl(self._savepoint, self._parent)
+
class TwoPhaseTransaction(Transaction):
def __init__(self, connection, xid):
super(TwoPhaseTransaction, self).__init__(connection, None)
def commit(self):
self.connection._commit_twophase_impl(self.xid, self._is_prepared)
+
class Engine(Connectable):
"""
- Connects a :class:`~sqlalchemy.pool.Pool` and :class:`~sqlalchemy.engine.base.Dialect`
+ Connects a :class:`~sqlalchemy.pool.Pool` and :class:`~sqlalchemy.engine.base.Dialect`
together to provide a source of database connectivity and behavior.
"""
@property
def name(self):
"String name of the :class:`~sqlalchemy.engine.Dialect` in use by this ``Engine``."
-
+
return self.dialect.name
@property
def table_names(self, schema=None, connection=None):
"""Return a list of all table names available in the database.
- schema:
- Optional, retrieve names from a non-default schema.
+ :param schema: Optional, retrieve names from a non-default schema.
- connection:
- Optional, use a specified connection. Default is the
- ``contextual_connect`` for this ``Engine``.
+ :param connection: Optional, use a specified connection. Default is the
+ ``contextual_connect`` for this ``Engine``.
"""
if connection is None:
return self.pool.unique_connection()
+
def _proxy_connection_cls(cls, proxy):
class ProxyConnection(cls):
def execute(self, object, *multiparams, **params):
return proxy.execute(self, super(ProxyConnection, self).execute, object, *multiparams, **params)
-
+
def _execute_clauseelement(self, elem, multiparams=None, params=None):
return proxy.execute(self, super(ProxyConnection, self).execute, elem, *(multiparams or []), **(params or {}))
-
+
def _cursor_execute(self, cursor, statement, parameters, context=None):
return proxy.cursor_execute(super(ProxyConnection, self)._cursor_execute, cursor, statement, parameters, context, False)
-
+
def _cursor_executemany(self, cursor, statement, parameters, context=None):
return proxy.cursor_execute(super(ProxyConnection, self)._cursor_executemany, cursor, statement, parameters, context, True)
return ProxyConnection
+
class RowProxy(object):
"""Proxy a single cursor row for a parent ResultProxy.
"""
__slots__ = ['__parent', '__row']
-
+
def __init__(self, parent, row):
"""RowProxy objects are constructed by ResultProxy objects."""
yield self.__parent._get_col(self.__row, i)
__hash__ = None
-
+
def __eq__(self, other):
return ((other is self) or
(other == tuple(self.__parent._get_col(self.__row, key)
"""Return the list of keys as strings represented by this RowProxy."""
return self.__parent.keys
-
+
def iterkeys(self):
return iter(self.__parent.keys)
-
+
def values(self):
"""Return the values represented by this RowProxy as a list."""
return list(self)
-
+
def itervalues(self):
return iter(self)
+
class BufferedColumnRow(RowProxy):
def __init__(self, parent, row):
row = [ResultProxy._get_col(parent, row, i) for i in xrange(len(row))]
self.connection = context.root_connection
self._echo = context.engine._should_log_info
self._init_metadata()
-
+
@property
def rowcount(self):
if self._rowcount is None:
self._rowcount = self.context.get_rowcount()
self.close()
return
-
+
self._rowcount = None
self._props = util.populate_column_dict(None)
self._props.creator = self.__key_fallback()
if self.dialect.description_encoding:
colname = colname.decode(self.dialect.description_encoding)
-
+
if '.' in colname:
# sqlite will in some circumstances prepend table name to colnames, so strip
origname = colname
if self._echo:
self.context.engine.logger.debug(
"Col " + repr(tuple(x[0] for x in metadata)))
-
+
def __key_fallback(self):
# create a closure without 'self' to avoid circular references
props = self._props
-
+
def fallback(key):
if isinstance(key, basestring):
key = key.lower()
def close(self):
"""Close this ResultProxy.
-
+
Closes the underlying DBAPI cursor corresponding to the execution.
If this ResultProxy was generated from an implicit execution,
underlying DBAPI connection to the connection pool.)
This method is called automatically when:
-
- * all result rows are exhausted using the fetchXXX() methods.
- * cursor.description is None.
-
+
+ * all result rows are exhausted using the fetchXXX() methods.
+ * cursor.description is None.
"""
+
if not self.closed:
self.closed = True
self.cursor.close()
"""Return ``last_inserted_ids()`` from the underlying ExecutionContext.
See ExecutionContext for details.
-
"""
+
return self.context.last_inserted_ids()
def last_updated_params(self):
"""Return ``last_updated_params()`` from the underlying ExecutionContext.
See ExecutionContext for details.
-
"""
+
return self.context.last_updated_params()
def last_inserted_params(self):
"""Return ``last_inserted_params()`` from the underlying ExecutionContext.
See ExecutionContext for details.
-
"""
+
return self.context.last_inserted_params()
def lastrow_has_defaults(self):
"""Return ``lastrow_has_defaults()`` from the underlying ExecutionContext.
See ExecutionContext for details.
-
"""
+
return self.context.lastrow_has_defaults()
def postfetch_cols(self):
"""Return ``postfetch_cols()`` from the underlying ExecutionContext.
See ExecutionContext for details.
-
"""
+
return self.context.postfetch_cols
-
+
def prefetch_cols(self):
return self.context.prefetch_cols
-
+
def supports_sane_rowcount(self):
"""Return ``supports_sane_rowcount`` from the dialect."""
-
+
return self.dialect.supports_sane_rowcount
def supports_sane_multi_rowcount(self):
def fetchone(self):
"""Fetch one row, just like DB-API ``cursor.fetchone()``."""
+
try:
row = self._fetchone_impl()
if row is not None:
def scalar(self):
"""Fetch the first column of the first row, and close the result set."""
+
try:
row = self._fetchone_impl()
except Exception, e:
self.connection._handle_dbapi_exception(e, None, None, self.cursor, self.context)
raise
-
+
try:
if row is not None:
return self._process_row(self, row)[0]
finally:
self.close()
+
class BufferedRowResultProxy(ResultProxy):
"""A ResultProxy with row buffering behavior.
The pre-fetching behavior fetches only one row initially, and then
grows its buffer size by a fixed amount with each successive need
for additional rows up to a size of 100.
-
"""
def _init_metadata(self):
def _fetchall_impl(self):
return self.__rowbuffer + list(self.cursor.fetchall())
+
class BufferedColumnResultProxy(ResultProxy):
"""A ResultProxy with column buffering behavior.
of scope unless explicitly fetched. Currently this includes just
cx_Oracle LOB objects, but this behavior is known to exist in
other DB-APIs as well (Pygresql, currently unsupported).
-
"""
_process_row = BufferedColumnRow
l.append(row)
return l
+
class DefaultRunner(schema.SchemaVisitor):
"""A visitor which accepts ColumnDefault objects, produces the
dialect-specific SQL corresponding to their execution, and
DefaultRunners are used internally by Engines and Dialects.
Specific database modules should provide their own subclasses of
DefaultRunner to allow database-specific behavior.
-
"""
def __init__(self, context):
def execute_string(self, stmt, params=None):
"""execute a string statement, using the raw cursor, and return a scalar result."""
-
+
conn = self.context._connection
if isinstance(stmt, unicode) and not self.dialect.supports_unicode_statements:
stmt = stmt.encode(self.dialect.encoding)
Only applicable to functions which take no arguments other than a
connection. The memo will be stored in ``connection.info[key]``.
-
"""
+
@util.decorator
def decorated(fn, self, connection):
connection = connection.connect()