default_strategy = 'plain'
+
def create_engine(*args, **kwargs):
"""Create a new :class:`.Engine` instance.
opened above and beyond the pool_size setting, which defaults
to five. this is only used with :class:`~sqlalchemy.pool.QueuePool`.
- :param module=None: reference to a Python module object (the module itself, not
- its string name). Specifies an alternate DBAPI module to be used
- by the engine's dialect. Each sub-dialect references a specific DBAPI which
- will be imported before first connect. This parameter causes the
- import to be bypassed, and the given module to be used instead.
- Can be used for testing of DBAPIs as well as to inject "mock"
- DBAPI implementations into the :class:`.Engine`.
+ :param module=None: reference to a Python module object (the module
+ itself, not its string name). Specifies an alternate DBAPI module to
+ be used by the engine's dialect. Each sub-dialect references a
+ specific DBAPI which will be imported before first connect. This
+ parameter causes the import to be bypassed, and the given module to
+ be used instead. Can be used for testing of DBAPIs as well as to
+ inject "mock" DBAPI implementations into the :class:`.Engine`.
:param pool=None: an already-constructed instance of
:class:`~sqlalchemy.pool.Pool`, such as a
'create_engine',
'engine_from_config',
)
-
-
from .util import _distill_params
import contextlib
+
class Connection(Connectable):
"""Provides high-level functionality for a wrapped DB-API connection.
result = connection.execution_options(stream_results=True).\\
execute(stmt)
- Note that any key/value can be passed to :meth:`.Connection.execution_options`,
- and it will be stored in the ``_execution_options`` dictionary of
- the :class:`.Connnection`. It is suitable for usage by end-user
- schemes to communicate with event listeners, for example.
+ Note that any key/value can be passed to
+ :meth:`.Connection.execution_options`, and it will be stored in the
+ ``_execution_options`` dictionary of the :class:`.Connnection`. It
+ is suitable for usage by end-user schemes to communicate with
+ event listeners, for example.
The keywords that are currently recognized by SQLAlchemy itself
include all those listed under :meth:`.Executable.execution_options`,
del self.__connection
self.__invalid = True
-
def detach(self):
"""Detach the underlying DB-API connection from its connection pool.
:meth:`.Connection.begin_twophase` - use a two phase /XID transaction
- :meth:`.Engine.begin` - context manager available from :class:`.Engine`.
+ :meth:`.Engine.begin` - context manager available from
+ :class:`.Engine`.
"""
The returned object is an instance of :class:`.TwoPhaseTransaction`,
which in addition to the methods provided by
- :class:`.Transaction`, also provides a :meth:`~.TwoPhaseTransaction.prepare`
- method.
+ :class:`.Transaction`, also provides a
+ :meth:`~.TwoPhaseTransaction.prepare` method.
:param xid: the two phase transaction id. If not supplied, a
random id will be generated.
return self.execute(object, *multiparams, **params).scalar()
def execute(self, object, *multiparams, **params):
- """Executes the a SQL statement construct and returns a :class:`.ResultProxy`.
+ """Executes the a SQL statement construct and returns a
+ :class:`.ResultProxy`.
:param object: The statement to be executed. May be
one of:
"Unexecutable object type: %s" %
type(object))
-
def _execute_function(self, func, multiparams, params):
"""Execute a sql.FunctionElement object."""
dialect=dialect, column_keys=keys,
inline=len(distilled_params) > 1)
-
ret = self._execute_context(
dialect,
dialect.execution_ctx_cls._init_compiled,
context)
raise
-
if self._has_events:
self.dispatch.after_cursor_execute(self, cursor,
statement,
context.handle_dbapi_exception(e)
is_disconnect = isinstance(e, self.dialect.dbapi.Error) and \
- self.dialect.is_disconnect(e, self.__connection, cursor)
-
+ self.dialect.is_disconnect(e, self.__connection, cursor)
if is_disconnect:
dbapi_conn_wrapper = self.connection
basestring: _execute_text
}
-
def default_schema_name(self):
return self.engine.dialect.get_default_schema_name(self)
else:
self.rollback()
+
class RootTransaction(Transaction):
def __init__(self, connection):
super(RootTransaction, self).__init__(connection, None)
shard1 = primary_engine.execution_options(shard_id="shard1")
shard2 = primary_engine.execution_options(shard_id="shard2")
- Above, the ``shard1`` engine serves as a factory for :class:`.Connection`
- objects that will contain the execution option ``shard_id=shard1``,
- and ``shard2`` will produce :class:`.Connection` objects that contain
- the execution option ``shard_id=shard2``.
+ Above, the ``shard1`` engine serves as a factory for
+ :class:`.Connection` objects that will contain the execution option
+ ``shard_id=shard1``, and ``shard2`` will produce :class:`.Connection`
+ objects that contain the execution option ``shard_id=shard2``.
An event handler can consume the above execution option to perform
a schema switch or other operation, given a connection. Below
"""
self.pool = self.pool._replace()
-
def _execute_default(self, default):
with self.contextual_connect() as conn:
return conn._execute_default(default, (), {})
if not self.close_with_result:
self.conn.close()
-
def begin(self, close_with_result=False):
"""Return a context manager delivering a :class:`.Connection`
with a :class:`.Transaction` established.
The ``close_with_result`` flag is normally ``False``, and indicates
that the :class:`.Connection` will be closed when the operation
- is complete. When set to ``True``, it indicates the :class:`.Connection`
- is in "single use" mode, where the :class:`.ResultProxy`
- returned by the first call to :meth:`.Connection.execute` will
- close the :class:`.Connection` when that :class:`.ResultProxy`
- has exhausted all result rows.
+ is complete. When set to ``True``, it indicates the
+ :class:`.Connection` is in "single use" mode, where the
+ :class:`.ResultProxy` returned by the first call to
+ :meth:`.Connection.execute` will close the :class:`.Connection` when
+ that :class:`.ResultProxy` has exhausted all result rows.
.. versionadded:: 0.7.6
def connect(self, **kwargs):
"""Return a new :class:`.Connection` object.
- The :class:`.Connection` object is a facade that uses a DBAPI connection internally
- in order to communicate with the database. This connection is procured
- from the connection-holding :class:`.Pool` referenced by this :class:`.Engine`.
- When the :meth:`~.Connection.close` method of the :class:`.Connection` object is called,
- the underlying DBAPI connection is then returned to the connection pool,
- where it may be used again in a subsequent call to :meth:`~.Engine.connect`.
+ The :class:`.Connection` object is a facade that uses a DBAPI
+ connection internally in order to communicate with the database. This
+ connection is procured from the connection-holding :class:`.Pool`
+ referenced by this :class:`.Engine`. When the
+ :meth:`~.Connection.close` method of the :class:`.Connection` object
+ is called, the underlying DBAPI connection is then returned to the
+ connection pool, where it may be used again in a subsequent call to
+ :meth:`~.Engine.connect`.
"""
return self._connection_cls(self, **kwargs)
def contextual_connect(self, close_with_result=False, **kwargs):
- """Return a :class:`.Connection` object which may be part of some ongoing context.
+ """Return a :class:`.Connection` object which may be part of some
+ ongoing context.
By default, this method does the same thing as :meth:`.Engine.connect`.
Subclasses of :class:`.Engine` may override this method
to provide contextual behavior.
- :param close_with_result: When True, the first :class:`.ResultProxy` created
- by the :class:`.Connection` will call the :meth:`.Connection.close` method
- of that connection as soon as any pending result rows are exhausted.
- This is used to supply the "connectionless execution" behavior provided
- by the :meth:`.Engine.execute` method.
+ :param close_with_result: When True, the first :class:`.ResultProxy`
+ created by the :class:`.Connection` will call the
+ :meth:`.Connection.close` method of that connection as soon as any
+ pending result rows are exhausted. This is used to supply the
+ "connectionless execution" behavior provided by the
+ :meth:`.Engine.execute` method.
"""
return self.pool.unique_connection()
+
class OptionEngine(Engine):
def __init__(self, proxied, execution_options):
self._proxied = proxied
def __init__(self, connection):
self.connection = connection
+
class SchemaGenerator(DDLBase):
- def __init__(self, dialect, connection, checkfirst=False, tables=None, **kwargs):
+
+ def __init__(self, dialect, connection, checkfirst=False,
+ tables=None, **kwargs):
super(SchemaGenerator, self).__init__(connection, **kwargs)
self.checkfirst = checkfirst
self.tables = tables and set(tables) or None
class SchemaDropper(DDLBase):
- def __init__(self, dialect, connection, checkfirst=False, tables=None, **kwargs):
+
+ def __init__(self, dialect, connection, checkfirst=False,
+ tables=None, **kwargs):
super(SchemaDropper, self).__init__(connection, **kwargs)
self.checkfirst = checkfirst
self.tables = tables
tables = self.tables
else:
tables = metadata.tables.values()
- collection = [t for t in reversed(sql_util.sort_tables(tables))
- if self._can_drop_table(t)]
- seq_coll = [s for s in metadata._sequences.values()
- if s.column is None and self._can_drop_sequence(s)]
- metadata.dispatch.before_drop(metadata, self.connection,
- tables=collection,
- checkfirst=self.checkfirst,
- _ddl_runner=self)
+ collection = [
+ t
+ for t in reversed(sql_util.sort_tables(tables))
+ if self._can_drop_table(t)
+ ]
+
+ seq_coll = [
+ s
+ for s in metadata._sequences.values()
+ if s.column is None and self._can_drop_sequence(s)
+ ]
+
+ metadata.dispatch.before_drop(
+ metadata, self.connection, tables=collection,
+ checkfirst=self.checkfirst, _ddl_runner=self)
for table in collection:
self.traverse_single(table, drop_ok=True)
for seq in seq_coll:
self.traverse_single(seq, drop_ok=True)
- metadata.dispatch.after_drop(metadata, self.connection,
- tables=collection,
- checkfirst=self.checkfirst,
- _ddl_runner=self)
+ metadata.dispatch.after_drop(
+ metadata, self.connection, tables=collection,
+ checkfirst=self.checkfirst, _ddl_runner=self)
def _can_drop_table(self, table):
self.dialect.validate_identifier(table.name)
postfetch_lastrowid = True
implicit_returning = False
-
supports_native_enum = False
supports_native_boolean = False
description_encoding = 'use_encoding'
# end Py2K
-
name = 'default'
# length at which to truncate
# the configured default of this dialect.
self.set_isolation_level(dbapi_conn, self.default_isolation_level)
+
class DefaultExecutionContext(interfaces.ExecutionContext):
isinsert = False
isupdate = False
from .. import util, event, events
+
class Dialect(object):
"""Define the behavior of a specific database and DB-API combination.
raise NotImplementedError()
- def do_execute_no_params(self, cursor, statement, parameters, context=None):
+ def do_execute_no_params(self, cursor, statement, parameters,
+ context=None):
"""Provide an implementation of ``cursor.execute(statement)``.
The parameter collection should not be sent.
class Connectable(object):
"""Interface for an object which supports execution of SQL constructs.
- The two implementations of :class:`.Connectable` are :class:`.Connection` and
- :class:`.Engine`.
+ The two implementations of :class:`.Connectable` are
+ :class:`.Connection` and :class:`.Engine`.
Connectable must also implement the 'dialect' member which references a
:class:`.Dialect` instance.
dispatch = event.dispatcher(events.ConnectionEvents)
-
def connect(self, **kwargs):
"""Return a :class:`.Connection` object.
raise NotImplementedError()
- @util.deprecated("0.7", "Use the create() method on the given schema "
- "object directly, i.e. :meth:`.Table.create`, "
- ":meth:`.Index.create`, :meth:`.MetaData.create_all`")
+ @util.deprecated("0.7",
+ "Use the create() method on the given schema "
+ "object directly, i.e. :meth:`.Table.create`, "
+ ":meth:`.Index.create`, :meth:`.MetaData.create_all`")
def create(self, entity, **kwargs):
"""Emit CREATE statements for the given schema entity."""
raise NotImplementedError()
- @util.deprecated("0.7", "Use the drop() method on the given schema "
- "object directly, i.e. :meth:`.Table.drop`, "
- ":meth:`.Index.drop`, :meth:`.MetaData.drop_all`")
+ @util.deprecated("0.7",
+ "Use the drop() method on the given schema "
+ "object directly, i.e. :meth:`.Table.drop`, "
+ ":meth:`.Index.drop`, :meth:`.MetaData.drop_all`")
def drop(self, entity, **kwargs):
"""Emit DROP statements for the given schema entity."""
def _execute_clauseelement(self, elem, multiparams=None, params=None):
raise NotImplementedError()
-
from .. import inspection
from .base import Connectable
+
@util.decorator
def cache(fn, self, con, *args, **kw):
info_cache = kw.get('info_cache', None)
@classmethod
def from_engine(cls, bind):
- """Construct a new dialect-specific Inspector object from the given engine or connection.
+ """Construct a new dialect-specific Inspector object from the given
+ engine or connection.
:param bind: a :class:`~sqlalchemy.engine.base.Connectable`,
which is typically an instance of
:class:`~sqlalchemy.engine.Engine` or
:class:`~sqlalchemy.engine.Connection`.
- This method differs from direct a direct constructor call of :class:`.Inspector`
- in that the :class:`~sqlalchemy.engine.base.Dialect` is given a chance to provide
- a dialect-specific :class:`.Inspector` instance, which may provide additional
- methods.
+ This method differs from direct a direct constructor call of
+ :class:`.Inspector` in that the
+ :class:`~sqlalchemy.engine.base.Dialect` is given a chance to provide
+ a dialect-specific :class:`.Inspector` instance, which may provide
+ additional methods.
See the example at :class:`.Inspector`.
return tnames
def get_table_options(self, table_name, schema=None, **kw):
- """Return a dictionary of options specified when the table of the given name was created.
+ """Return a dictionary of options specified when the table of the
+ given name was created.
This currently includes some options that apply to MySQL tables.
"""
if hasattr(self.dialect, 'get_table_options'):
- return self.dialect.get_table_options(self.bind, table_name, schema,
- info_cache=self.info_cache,
- **kw)
+ return self.dialect.get_table_options(
+ self.bind, table_name, schema,
+ info_cache=self.info_cache, **kw)
return {}
def get_view_names(self, schema=None):
info_cache=self.info_cache,
**kw)
-
def get_foreign_keys(self, table_name, schema=None, **kw):
"""Return information about foreign_keys in `table_name`.
info_cache=self.info_cache, **kw)
def reflecttable(self, table, include_columns, exclude_columns=()):
- """Given a Table object, load its internal constructs based on introspection.
+ """Given a Table object, load its internal constructs based on
+ introspection.
This is the underlying method used by most dialects to produce
table reflection. Direct usage is like::
colargs = []
if col_d.get('default') is not None:
- # the "default" value is assumed to be a literal SQL expression,
- # so is wrapped in text() so that no quoting occurs on re-issuance.
+ # the "default" value is assumed to be a literal SQL
+ # expression, so is wrapped in text() so that no quoting
+ # occurs on re-issuance.
colargs.append(
sa_schema.DefaultClause(
sql.text(col_d['default']), _reflected=True
# Primary keys
pk_cons = self.get_pk_constraint(table_name, schema, **tblkw)
if pk_cons:
- pk_cols = [table.c[pk]
- for pk in pk_cons['constrained_columns']
- if pk in table.c and pk not in exclude_columns
- ] + [pk for pk in table.primary_key if pk.key in exclude_columns]
- primary_key_constraint = sa_schema.PrimaryKeyConstraint(name=pk_cons.get('name'),
+ pk_cols = [
+ table.c[pk]
+ for pk in pk_cons['constrained_columns']
+ if pk in table.c and pk not in exclude_columns
+ ]
+ pk_cols += [
+ pk
+ for pk in table.primary_key
+ if pk.key in exclude_columns
+ ]
+ primary_key_constraint = sa_schema.PrimaryKeyConstraint(
+ name=pk_cons.get('name'),
*pk_cols
)
else:
return None
+
class BufferedRowResultProxy(ResultProxy):
"""A ResultProxy with row buffering behavior.
self.__rowbuffer = collections.deque()
return ret
+
class BufferedColumnRow(RowProxy):
def __init__(self, parent, row, processors, keymap):
# preprocess row
try:
return dialect.connect(*cargs, **cparams)
except Exception, e:
+ invalidated = dialect.is_disconnect(e, None, None)
# Py3K
#raise exc.DBAPIError.instance(None, None,
- # e, dialect.dbapi.Error,
- # connection_invalidated=
- # dialect.is_disconnect(e, None, None)
- # ) from e
+ # e, dialect.dbapi.Error,
+ # connection_invalidated=invalidated
+ #) from e
# Py2K
import sys
raise exc.DBAPIError.instance(
- None, None, e, dialect.dbapi.Error,
- connection_invalidated=
- dialect.is_disconnect(e, None, None)), \
- None, sys.exc_info()[2]
+ None, None, e, dialect.dbapi.Error,
+ connection_invalidated=invalidated
+ ), None, sys.exc_info()[2]
# end Py2K
creator = kwargs.pop('creator', connect)
'echo': 'echo_pool',
'timeout': 'pool_timeout',
'recycle': 'pool_recycle',
- 'events':'pool_events',
- 'use_threadlocal':'pool_threadlocal',
- 'reset_on_return':'pool_reset_on_return'}
+ 'events': 'pool_events',
+ 'use_threadlocal': 'pool_threadlocal',
+ 'reset_on_return': 'pool_reset_on_return'}
for k in util.get_cls_kwargs(poolclass):
tk = translate.get(k, k)
if tk in kwargs:
do_on_connect = dialect.on_connect()
if do_on_connect:
def on_connect(dbapi_connection, connection_record):
- conn = getattr(dbapi_connection, '_sqla_unwrap', dbapi_connection)
+ conn = getattr(
+ dbapi_connection, '_sqla_unwrap', dbapi_connection)
if conn is None:
return
do_on_connect(conn)
kwargs['checkfirst'] = False
from sqlalchemy.engine import ddl
- ddl.SchemaGenerator(self.dialect, self, **kwargs).traverse_single(entity)
+ ddl.SchemaGenerator(
+ self.dialect, self, **kwargs).traverse_single(entity)
def drop(self, entity, **kwargs):
kwargs['checkfirst'] = False
from sqlalchemy.engine import ddl
- ddl.SchemaDropper(self.dialect, self, **kwargs).traverse_single(entity)
+ ddl.SchemaDropper(
+ self.dialect, self, **kwargs).traverse_single(entity)
def _run_visitor(self, visitorcallable, element,
connection=None,
"""Provides a thread-local transactional wrapper around the root Engine class.
-The ``threadlocal`` module is invoked when using the ``strategy="threadlocal"`` flag
-with :func:`~sqlalchemy.engine.create_engine`. This module is semi-private and is
-invoked automatically when the threadlocal engine strategy is used.
+The ``threadlocal`` module is invoked when using the
+``strategy="threadlocal"`` flag with :func:`~sqlalchemy.engine.create_engine`.
+This module is semi-private and is invoked automatically when the threadlocal
+engine strategy is used.
"""
-from .. import util, event
+from .. import util
from . import base
import weakref
+
class TLConnection(base.Connection):
+
def __init__(self, *arg, **kw):
super(TLConnection, self).__init__(*arg, **kw)
self.__opencount = 0
self.__opencount = 0
base.Connection.close(self)
+
class TLEngine(base.Engine):
- """An Engine that includes support for thread-local managed transactions."""
+ """An Engine that includes support for thread-local managed
+ transactions.
+ """
_tl_connection_cls = TLConnection
def __init__(self, *args, **kwargs):
super(TLEngine, self).__init__(*args, **kwargs)
self._connections = util.threading.local()
-
def contextual_connect(self, **kw):
if not hasattr(self._connections, 'conn'):
connection = None
if connection is None or connection.closed:
# guards against pool-level reapers, if desired.
# or not connection.connection.is_valid:
- connection = self._tl_connection_cls(self, self.pool.connect(), **kw)
- self._connections.conn = conn = weakref.ref(connection)
+ connection = self._tl_connection_cls(
+ self, self.pool.connect(), **kw)
+ self._connections.conn = weakref.ref(connection)
return connection._increment_connect()
def begin_twophase(self, xid=None):
if not hasattr(self._connections, 'trans'):
self._connections.trans = []
- self._connections.trans.append(self.contextual_connect().begin_twophase(xid=xid))
+ self._connections.trans.append(
+ self.contextual_connect().begin_twophase(xid=xid))
return self
def begin_nested(self):
if not hasattr(self._connections, 'trans'):
self._connections.trans = []
- self._connections.trans.append(self.contextual_connect().begin_nested())
+ self._connections.trans.append(
+ self.contextual_connect().begin_nested())
return self
def begin(self):
"""Provides the :class:`~sqlalchemy.engine.url.URL` class which encapsulates
information about a database connection specification.
-The URL object is created automatically when :func:`~sqlalchemy.engine.create_engine` is called
-with a string argument; alternatively, the URL is a public-facing construct which can
+The URL object is created automatically when
+:func:`~sqlalchemy.engine.create_engine` is called with a string
+argument; alternatively, the URL is a public-facing construct which can
be used directly and is also accepted directly by ``create_engine()``.
"""
:param \**kw: Optional, alternate key names for url attributes.
- :param names: Deprecated. Same purpose as the keyword-based alternate names,
- but correlates the name to the original positionally.
+ :param names: Deprecated. Same purpose as the keyword-based alternate
+ names, but correlates the name to the original positionally.
"""
translated = {}
translated[name] = getattr(self, sname)
return translated
+
def make_url(name_or_url):
"""Given a string or unicode instance, produce a new URL instance.
else:
return name_or_url
+
def _parse_rfc1738_args(name):
pattern = re.compile(r'''
(?P<name>[\w\+]+)://
(?::(?P<port>[^/]*))?
)?
(?:/(?P<database>.*))?
- '''
- , re.X)
+ ''', re.X)
m = pattern.match(name)
if m is not None:
components['query'] = query
if components['password'] is not None:
- components['password'] = urllib.unquote_plus(components['password'])
+ components['password'] = \
+ urllib.unquote_plus(components['password'])
name = components.pop('name')
return URL(name, **components)
raise exc.ArgumentError(
"Could not parse rfc1738 URL from string '%s'" % name)
+
def _parse_keyvalue_args(name):
- m = re.match( r'(\w+)://(.*)', name)
+ m = re.match(r'(\w+)://(.*)', name)
if m is not None:
(name, args) = m.group(1, 2)
- opts = dict( util.parse_qsl( args ) )
+ opts = dict(util.parse_qsl(args))
return URL(name, *opts)
else:
return None
from .. import util
+
def _coerce_config(configuration, prefix):
"""Convert configuration values to expected types."""
util.coerce_kw_type(options, option, type_)
return options
+
def connection_memoize(key):
"""Decorator, memoize a function in a connection.info stash.
return decorated
+
def py_fallback():
def _distill_params(multiparams, params):
"""Given arguments from the calling form *multiparams, **params,
from sqlalchemy.cutils import _distill_params
except ImportError:
globals().update(py_fallback())
-
-