.. autoclass:: _ConnectionRecord
:members:
-
-Pooling Plain DB-API Connections
---------------------------------
-
-Any :pep:`249` DB-API module can be "proxied" through the connection
-pool transparently. Usage of the DB-API is exactly as before, except
-the ``connect()`` method will consult the pool. Below we illustrate
-this with ``psycopg2``::
-
- import sqlalchemy.pool as pool
- import psycopg2 as psycopg
-
- psycopg = pool.manage(psycopg)
-
- # then connect normally
- connection = psycopg.connect(database='test', username='scott',
- password='tiger')
-
-This produces a :class:`_DBProxy` object which supports the same
-``connect()`` function as the original DB-API module. Upon
-connection, a connection proxy object is returned, which delegates its
-calls to a real DB-API connection object. This connection object is
-stored persistently within a connection pool (an instance of
-:class:`.Pool`) that corresponds to the exact connection arguments sent
-to the ``connect()`` function.
-
-The connection proxy supports all of the methods on the original
-connection object, most of which are proxied via ``__getattr__()``.
-The ``close()`` method will return the connection to the pool, and the
-``cursor()`` method will return a proxied cursor object. Both the
-connection proxy and the cursor proxy will also return the underlying
-connection to the pool after they have both been garbage collected,
-which is detected via weakref callbacks (``__del__`` is not used).
-
-Additionally, when connections are returned to the pool, a
-``rollback()`` is issued on the connection unconditionally. This is
-to release any locks still held by the connection that may have
-resulted from normal activity.
-
-By default, the ``connect()`` method will return the same connection
-that is already checked out in the current thread. This allows a
-particular connection to be used in a given thread without needing to
-pass it around between functions. To disable this behavior, specify
-``use_threadlocal=False`` to the ``manage()`` function.
-
-.. autofunction:: sqlalchemy.pool.manage
-
-.. autofunction:: sqlalchemy.pool.clear_managers
-
pool = poolclass(creator, **pool_args)
else:
- if isinstance(pool, poollib._DBProxy):
+ if isinstance(pool, poollib.dbapi_proxy._DBProxy):
pool = pool.get_pool(*cargs, **cparams)
else:
pool = pool
--- /dev/null
+# sqlalchemy/pool/__init__.py
+# Copyright (C) 2005-2018 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+
+"""Connection pooling for DB-API connections.
+
+Provides a number of connection pool implementations for a variety of
+usage scenarios and thread behavior requirements imposed by the
+application, DB-API or database itself.
+
+Also provides a DB-API 2.0 connection proxying mechanism allowing
+regular DB-API connect() methods to be transparently managed by a
+SQLAlchemy connection pool.
+"""
+
+from .base import _refs # noqa
+from .base import Pool # noqa
+from .impl import ( # noqa
+ QueuePool, StaticPool, NullPool, AssertionPool, SingletonThreadPool)
+from .dbapi_proxy import manage, clear_managers # noqa
+
+from .base import reset_rollback, reset_commit, reset_none # noqa
+
+# as these are likely to be used in various test suites, debugging
+# setups, keep them in the sqlalchemy.pool namespace
+from .base import _ConnectionFairy, _ConnectionRecord, _finalize_fairy # noqa
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""Connection pooling for DB-API connections.
+"""Base constructs for connection pools.
-Provides a number of connection pool implementations for a variety of
-usage scenarios and thread behavior requirements imposed by the
-application, DB-API or database itself.
-
-Also provides a DB-API 2.0 connection proxying mechanism allowing
-regular DB-API connect() methods to be transparently managed by a
-SQLAlchemy connection pool.
"""
+from collections import deque
import time
-import traceback
import weakref
-from . import exc, log, event, interfaces, util
-from .util import queue as sqla_queue
-from .util import threading, memoized_property, \
- chop_traceback
-
-from collections import deque
-proxies = {}
-
-
-def manage(module, **params):
- r"""Return a proxy for a DB-API module that automatically
- pools connections.
-
- Given a DB-API 2.0 module and pool management parameters, returns
- a proxy for the module that will automatically pool connections,
- creating new connection pools for each distinct set of connection
- arguments sent to the decorated module's connect() function.
-
- :param module: a DB-API 2.0 database module
-
- :param poolclass: the class used by the pool module to provide
- pooling. Defaults to :class:`.QueuePool`.
-
- :param \**params: will be passed through to *poolclass*
-
- """
- try:
- return proxies[module]
- except KeyError:
- return proxies.setdefault(module, _DBProxy(module, **params))
+from .. import exc, log, event, interfaces, util
+from ..util import threading
-def clear_managers():
- """Remove all current DB-API 2.0 managers.
-
- All pools and connections are disposed.
- """
-
- for manager in proxies.values():
- manager.close()
- proxies.clear()
-
reset_rollback = util.symbol('reset_rollback')
reset_commit = util.symbol('reset_commit')
reset_none = util.symbol('reset_none')
self._checkin()
-class SingletonThreadPool(Pool):
-
- """A Pool that maintains one connection per thread.
-
- Maintains one connection per each thread, never moving a connection to a
- thread other than the one which it was created in.
-
- .. warning:: the :class:`.SingletonThreadPool` will call ``.close()``
- on arbitrary connections that exist beyond the size setting of
- ``pool_size``, e.g. if more unique **thread identities**
- than what ``pool_size`` states are used. This cleanup is
- non-deterministic and not sensitive to whether or not the connections
- linked to those thread identities are currently in use.
-
- :class:`.SingletonThreadPool` may be improved in a future release,
- however in its current status it is generally used only for test
- scenarios using a SQLite ``:memory:`` database and is not recommended
- for production use.
-
-
- Options are the same as those of :class:`.Pool`, as well as:
-
- :param pool_size: The number of threads in which to maintain connections
- at once. Defaults to five.
-
- :class:`.SingletonThreadPool` is used by the SQLite dialect
- automatically when a memory-based database is used.
- See :ref:`sqlite_toplevel`.
-
- """
-
- def __init__(self, creator, pool_size=5, **kw):
- kw['use_threadlocal'] = True
- Pool.__init__(self, creator, **kw)
- self._conn = threading.local()
- self._all_conns = set()
- self.size = pool_size
-
- def recreate(self):
- self.logger.info("Pool recreating")
- return self.__class__(self._creator,
- pool_size=self.size,
- recycle=self._recycle,
- echo=self.echo,
- logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
- reset_on_return=self._reset_on_return,
- _dispatch=self.dispatch,
- dialect=self._dialect)
-
- def dispose(self):
- """Dispose of this pool."""
-
- for conn in self._all_conns:
- try:
- conn.close()
- except Exception:
- # pysqlite won't even let you close a conn from a thread
- # that didn't create it
- pass
-
- self._all_conns.clear()
-
- def _cleanup(self):
- while len(self._all_conns) >= self.size:
- c = self._all_conns.pop()
- c.close()
-
- def status(self):
- return "SingletonThreadPool id:%d size: %d" % \
- (id(self), len(self._all_conns))
-
- def _do_return_conn(self, conn):
- pass
-
- def _do_get(self):
- try:
- c = self._conn.current()
- if c:
- return c
- except AttributeError:
- pass
- c = self._create_connection()
- self._conn.current = weakref.ref(c)
- if len(self._all_conns) >= self.size:
- self._cleanup()
- self._all_conns.add(c)
- return c
-
-
-class QueuePool(Pool):
-
- """A :class:`.Pool` that imposes a limit on the number of open connections.
-
- :class:`.QueuePool` is the default pooling implementation used for
- all :class:`.Engine` objects, unless the SQLite dialect is in use.
-
- """
-
- def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30,
- **kw):
- r"""
- Construct a QueuePool.
-
- :param creator: a callable function that returns a DB-API
- connection object, same as that of :paramref:`.Pool.creator`.
-
- :param pool_size: The size of the pool to be maintained,
- defaults to 5. This is the largest number of connections that
- will be kept persistently in the pool. Note that the pool
- begins with no connections; once this number of connections
- is requested, that number of connections will remain.
- ``pool_size`` can be set to 0 to indicate no size limit; to
- disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
- instead.
-
- :param max_overflow: The maximum overflow size of the
- pool. When the number of checked-out connections reaches the
- size set in pool_size, additional connections will be
- returned up to this limit. When those additional connections
- are returned to the pool, they are disconnected and
- discarded. It follows then that the total number of
- simultaneous connections the pool will allow is pool_size +
- `max_overflow`, and the total number of "sleeping"
- connections the pool will allow is pool_size. `max_overflow`
- can be set to -1 to indicate no overflow limit; no limit
- will be placed on the total number of concurrent
- connections. Defaults to 10.
-
- :param timeout: The number of seconds to wait before giving up
- on returning a connection. Defaults to 30.
-
- :param \**kw: Other keyword arguments including
- :paramref:`.Pool.recycle`, :paramref:`.Pool.echo`,
- :paramref:`.Pool.reset_on_return` and others are passed to the
- :class:`.Pool` constructor.
-
- """
- Pool.__init__(self, creator, **kw)
- self._pool = sqla_queue.Queue(pool_size)
- self._overflow = 0 - pool_size
- self._max_overflow = max_overflow
- self._timeout = timeout
- self._overflow_lock = threading.Lock()
-
- def _do_return_conn(self, conn):
- try:
- self._pool.put(conn, False)
- except sqla_queue.Full:
- try:
- conn.close()
- finally:
- self._dec_overflow()
-
- def _do_get(self):
- use_overflow = self._max_overflow > -1
-
- try:
- wait = use_overflow and self._overflow >= self._max_overflow
- return self._pool.get(wait, self._timeout)
- except sqla_queue.Empty:
- # don't do things inside of "except Empty", because when we say
- # we timed out or can't connect and raise, Python 3 tells
- # people the real error is queue.Empty which it isn't.
- pass
- if use_overflow and self._overflow >= self._max_overflow:
- if not wait:
- return self._do_get()
- else:
- raise exc.TimeoutError(
- "QueuePool limit of size %d overflow %d reached, "
- "connection timed out, timeout %d" %
- (self.size(), self.overflow(), self._timeout), code="3o7r")
-
- if self._inc_overflow():
- try:
- return self._create_connection()
- except:
- with util.safe_reraise():
- self._dec_overflow()
- else:
- return self._do_get()
-
- def _inc_overflow(self):
- if self._max_overflow == -1:
- self._overflow += 1
- return True
- with self._overflow_lock:
- if self._overflow < self._max_overflow:
- self._overflow += 1
- return True
- else:
- return False
-
- def _dec_overflow(self):
- if self._max_overflow == -1:
- self._overflow -= 1
- return True
- with self._overflow_lock:
- self._overflow -= 1
- return True
-
- def recreate(self):
- self.logger.info("Pool recreating")
- return self.__class__(self._creator, pool_size=self._pool.maxsize,
- max_overflow=self._max_overflow,
- timeout=self._timeout,
- recycle=self._recycle, echo=self.echo,
- logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
- reset_on_return=self._reset_on_return,
- _dispatch=self.dispatch,
- dialect=self._dialect)
-
- def dispose(self):
- while True:
- try:
- conn = self._pool.get(False)
- conn.close()
- except sqla_queue.Empty:
- break
-
- self._overflow = 0 - self.size()
- self.logger.info("Pool disposed. %s", self.status())
-
- def status(self):
- return "Pool size: %d Connections in pool: %d "\
- "Current Overflow: %d Current Checked out "\
- "connections: %d" % (self.size(),
- self.checkedin(),
- self.overflow(),
- self.checkedout())
-
- def size(self):
- return self._pool.maxsize
-
- def checkedin(self):
- return self._pool.qsize()
-
- def overflow(self):
- return self._overflow
-
- def checkedout(self):
- return self._pool.maxsize - self._pool.qsize() + self._overflow
-
-
-class NullPool(Pool):
-
- """A Pool which does not pool connections.
-
- Instead it literally opens and closes the underlying DB-API connection
- per each connection open/close.
-
- Reconnect-related functions such as ``recycle`` and connection
- invalidation are not supported by this Pool implementation, since
- no connections are held persistently.
-
- .. versionchanged:: 0.7
- :class:`.NullPool` is used by the SQlite dialect automatically
- when a file-based database is used. See :ref:`sqlite_toplevel`.
-
- """
-
- def status(self):
- return "NullPool"
-
- def _do_return_conn(self, conn):
- conn.close()
-
- def _do_get(self):
- return self._create_connection()
-
- def recreate(self):
- self.logger.info("Pool recreating")
-
- return self.__class__(self._creator,
- recycle=self._recycle,
- echo=self.echo,
- logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
- reset_on_return=self._reset_on_return,
- _dispatch=self.dispatch,
- dialect=self._dialect)
-
- def dispose(self):
- pass
-
-
-class StaticPool(Pool):
-
- """A Pool of exactly one connection, used for all requests.
-
- Reconnect-related functions such as ``recycle`` and connection
- invalidation (which is also used to support auto-reconnect) are not
- currently supported by this Pool implementation but may be implemented
- in a future release.
-
- """
-
- @memoized_property
- def _conn(self):
- return self._creator()
-
- @memoized_property
- def connection(self):
- return _ConnectionRecord(self)
-
- def status(self):
- return "StaticPool"
-
- def dispose(self):
- if '_conn' in self.__dict__:
- self._conn.close()
- self._conn = None
-
- def recreate(self):
- self.logger.info("Pool recreating")
- return self.__class__(creator=self._creator,
- recycle=self._recycle,
- use_threadlocal=self._use_threadlocal,
- reset_on_return=self._reset_on_return,
- echo=self.echo,
- logging_name=self._orig_logging_name,
- _dispatch=self.dispatch,
- dialect=self._dialect)
-
- def _create_connection(self):
- return self._conn
-
- def _do_return_conn(self, conn):
- pass
-
- def _do_get(self):
- return self.connection
-
-
-class AssertionPool(Pool):
-
- """A :class:`.Pool` that allows at most one checked out connection at
- any given time.
-
- This will raise an exception if more than one connection is checked out
- at a time. Useful for debugging code that is using more connections
- than desired.
-
- .. versionchanged:: 0.7
- :class:`.AssertionPool` also logs a traceback of where
- the original connection was checked out, and reports
- this in the assertion error raised.
-
- """
-
- def __init__(self, *args, **kw):
- self._conn = None
- self._checked_out = False
- self._store_traceback = kw.pop('store_traceback', True)
- self._checkout_traceback = None
- Pool.__init__(self, *args, **kw)
-
- def status(self):
- return "AssertionPool"
-
- def _do_return_conn(self, conn):
- if not self._checked_out:
- raise AssertionError("connection is not checked out")
- self._checked_out = False
- assert conn is self._conn
-
- def dispose(self):
- self._checked_out = False
- if self._conn:
- self._conn.close()
-
- def recreate(self):
- self.logger.info("Pool recreating")
- return self.__class__(self._creator, echo=self.echo,
- logging_name=self._orig_logging_name,
- _dispatch=self.dispatch,
- dialect=self._dialect)
-
- def _do_get(self):
- if self._checked_out:
- if self._checkout_traceback:
- suffix = ' at:\n%s' % ''.join(
- chop_traceback(self._checkout_traceback))
- else:
- suffix = ''
- raise AssertionError("connection is already checked out" + suffix)
-
- if not self._conn:
- self._conn = self._create_connection()
-
- self._checked_out = True
- if self._store_traceback:
- self._checkout_traceback = traceback.format_stack()
- return self._conn
-
-
-class _DBProxy(object):
-
- """Layers connection pooling behavior on top of a standard DB-API module.
-
- Proxies a DB-API 2.0 connect() call to a connection pool keyed to the
- specific connect parameters. Other functions and attributes are delegated
- to the underlying DB-API module.
- """
-
- def __init__(self, module, poolclass=QueuePool, **kw):
- """Initializes a new proxy.
-
- module
- a DB-API 2.0 module
-
- poolclass
- a Pool class, defaulting to QueuePool
-
- Other parameters are sent to the Pool object's constructor.
-
- """
-
- self.module = module
- self.kw = kw
- self.poolclass = poolclass
- self.pools = {}
- self._create_pool_mutex = threading.Lock()
-
- def close(self):
- for key in list(self.pools):
- del self.pools[key]
-
- def __del__(self):
- self.close()
-
- def __getattr__(self, key):
- return getattr(self.module, key)
-
- def get_pool(self, *args, **kw):
- key = self._serialize(*args, **kw)
- try:
- return self.pools[key]
- except KeyError:
- self._create_pool_mutex.acquire()
- try:
- if key not in self.pools:
- kw.pop('sa_pool_key', None)
- pool = self.poolclass(
- lambda: self.module.connect(*args, **kw), **self.kw)
- self.pools[key] = pool
- return pool
- else:
- return self.pools[key]
- finally:
- self._create_pool_mutex.release()
-
- def connect(self, *args, **kw):
- """Activate a connection to the database.
-
- Connect to the database using this DBProxy's module and the given
- connect arguments. If the arguments match an existing pool, the
- connection will be returned from the pool's current thread-local
- connection instance, or if there is no thread-local connection
- instance it will be checked out from the set of pooled connections.
-
- If the pool has no available connections and allows new connections
- to be created, a new database connection will be made.
-
- """
-
- return self.get_pool(*args, **kw).connect()
-
- def dispose(self, *args, **kw):
- """Dispose the pool referenced by the given connect arguments."""
-
- key = self._serialize(*args, **kw)
- try:
- del self.pools[key]
- except KeyError:
- pass
-
- def _serialize(self, *args, **kw):
- if "sa_pool_key" in kw:
- return kw['sa_pool_key']
-
- return tuple(
- list(args) +
- [(k, kw[k]) for k in sorted(kw)]
- )
--- /dev/null
+# sqlalchemy/pool/dbapi_proxy.py
+# Copyright (C) 2005-2018 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+
+"""DBAPI proxy utility.
+
+Provides transparent connection pooling on top of a Python DBAPI.
+
+This is legacy SQLAlchemy functionality that is not typically used
+today.
+
+"""
+
+from .impl import QueuePool
+from ..util import threading
+
+proxies = {}
+
+
+def manage(module, **params):
+ r"""Return a proxy for a DB-API module that automatically
+ pools connections.
+
+ Given a DB-API 2.0 module and pool management parameters, returns
+ a proxy for the module that will automatically pool connections,
+ creating new connection pools for each distinct set of connection
+ arguments sent to the decorated module's connect() function.
+
+ :param module: a DB-API 2.0 database module
+
+ :param poolclass: the class used by the pool module to provide
+ pooling. Defaults to :class:`.QueuePool`.
+
+ :param \**params: will be passed through to *poolclass*
+
+ """
+ try:
+ return proxies[module]
+ except KeyError:
+ return proxies.setdefault(module, _DBProxy(module, **params))
+
+
+def clear_managers():
+ """Remove all current DB-API 2.0 managers.
+
+ All pools and connections are disposed.
+ """
+
+ for manager in proxies.values():
+ manager.close()
+ proxies.clear()
+
+
+class _DBProxy(object):
+
+ """Layers connection pooling behavior on top of a standard DB-API module.
+
+ Proxies a DB-API 2.0 connect() call to a connection pool keyed to the
+ specific connect parameters. Other functions and attributes are delegated
+ to the underlying DB-API module.
+ """
+
+ def __init__(self, module, poolclass=QueuePool, **kw):
+ """Initializes a new proxy.
+
+ module
+ a DB-API 2.0 module
+
+ poolclass
+ a Pool class, defaulting to QueuePool
+
+ Other parameters are sent to the Pool object's constructor.
+
+ """
+
+ self.module = module
+ self.kw = kw
+ self.poolclass = poolclass
+ self.pools = {}
+ self._create_pool_mutex = threading.Lock()
+
+ def close(self):
+ for key in list(self.pools):
+ del self.pools[key]
+
+ def __del__(self):
+ self.close()
+
+ def __getattr__(self, key):
+ return getattr(self.module, key)
+
+ def get_pool(self, *args, **kw):
+ key = self._serialize(*args, **kw)
+ try:
+ return self.pools[key]
+ except KeyError:
+ self._create_pool_mutex.acquire()
+ try:
+ if key not in self.pools:
+ kw.pop('sa_pool_key', None)
+ pool = self.poolclass(
+ lambda: self.module.connect(*args, **kw), **self.kw)
+ self.pools[key] = pool
+ return pool
+ else:
+ return self.pools[key]
+ finally:
+ self._create_pool_mutex.release()
+
+ def connect(self, *args, **kw):
+ """Activate a connection to the database.
+
+ Connect to the database using this DBProxy's module and the given
+ connect arguments. If the arguments match an existing pool, the
+ connection will be returned from the pool's current thread-local
+ connection instance, or if there is no thread-local connection
+ instance it will be checked out from the set of pooled connections.
+
+ If the pool has no available connections and allows new connections
+ to be created, a new database connection will be made.
+
+ """
+
+ return self.get_pool(*args, **kw).connect()
+
+ def dispose(self, *args, **kw):
+ """Dispose the pool referenced by the given connect arguments."""
+
+ key = self._serialize(*args, **kw)
+ try:
+ del self.pools[key]
+ except KeyError:
+ pass
+
+ def _serialize(self, *args, **kw):
+ if "sa_pool_key" in kw:
+ return kw['sa_pool_key']
+
+ return tuple(
+ list(args) +
+ [(k, kw[k]) for k in sorted(kw)]
+ )
--- /dev/null
+# sqlalchemy/pool.py
+# Copyright (C) 2005-2018 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+
+"""Pool implementation classes.
+
+"""
+
+import traceback
+import weakref
+
+from .base import Pool, _ConnectionRecord
+from .. import exc
+from .. import util
+from ..util import queue as sqla_queue
+from ..util import chop_traceback
+from ..util import threading
+
+
+class QueuePool(Pool):
+
+ """A :class:`.Pool` that imposes a limit on the number of open connections.
+
+ :class:`.QueuePool` is the default pooling implementation used for
+ all :class:`.Engine` objects, unless the SQLite dialect is in use.
+
+ """
+
+ def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30,
+ **kw):
+ r"""
+ Construct a QueuePool.
+
+ :param creator: a callable function that returns a DB-API
+ connection object, same as that of :paramref:`.Pool.creator`.
+
+ :param pool_size: The size of the pool to be maintained,
+ defaults to 5. This is the largest number of connections that
+ will be kept persistently in the pool. Note that the pool
+ begins with no connections; once this number of connections
+ is requested, that number of connections will remain.
+ ``pool_size`` can be set to 0 to indicate no size limit; to
+ disable pooling, use a :class:`~sqlalchemy.pool.NullPool`
+ instead.
+
+ :param max_overflow: The maximum overflow size of the
+ pool. When the number of checked-out connections reaches the
+ size set in pool_size, additional connections will be
+ returned up to this limit. When those additional connections
+ are returned to the pool, they are disconnected and
+ discarded. It follows then that the total number of
+ simultaneous connections the pool will allow is pool_size +
+ `max_overflow`, and the total number of "sleeping"
+ connections the pool will allow is pool_size. `max_overflow`
+ can be set to -1 to indicate no overflow limit; no limit
+ will be placed on the total number of concurrent
+ connections. Defaults to 10.
+
+ :param timeout: The number of seconds to wait before giving up
+ on returning a connection. Defaults to 30.
+
+ :param \**kw: Other keyword arguments including
+ :paramref:`.Pool.recycle`, :paramref:`.Pool.echo`,
+ :paramref:`.Pool.reset_on_return` and others are passed to the
+ :class:`.Pool` constructor.
+
+ """
+ Pool.__init__(self, creator, **kw)
+ self._pool = sqla_queue.Queue(pool_size)
+ self._overflow = 0 - pool_size
+ self._max_overflow = max_overflow
+ self._timeout = timeout
+ self._overflow_lock = threading.Lock()
+
+ def _do_return_conn(self, conn):
+ try:
+ self._pool.put(conn, False)
+ except sqla_queue.Full:
+ try:
+ conn.close()
+ finally:
+ self._dec_overflow()
+
+ def _do_get(self):
+ use_overflow = self._max_overflow > -1
+
+ try:
+ wait = use_overflow and self._overflow >= self._max_overflow
+ return self._pool.get(wait, self._timeout)
+ except sqla_queue.Empty:
+ # don't do things inside of "except Empty", because when we say
+ # we timed out or can't connect and raise, Python 3 tells
+ # people the real error is queue.Empty which it isn't.
+ pass
+ if use_overflow and self._overflow >= self._max_overflow:
+ if not wait:
+ return self._do_get()
+ else:
+ raise exc.TimeoutError(
+ "QueuePool limit of size %d overflow %d reached, "
+ "connection timed out, timeout %d" %
+ (self.size(), self.overflow(), self._timeout), code="3o7r")
+
+ if self._inc_overflow():
+ try:
+ return self._create_connection()
+ except:
+ with util.safe_reraise():
+ self._dec_overflow()
+ else:
+ return self._do_get()
+
+ def _inc_overflow(self):
+ if self._max_overflow == -1:
+ self._overflow += 1
+ return True
+ with self._overflow_lock:
+ if self._overflow < self._max_overflow:
+ self._overflow += 1
+ return True
+ else:
+ return False
+
+ def _dec_overflow(self):
+ if self._max_overflow == -1:
+ self._overflow -= 1
+ return True
+ with self._overflow_lock:
+ self._overflow -= 1
+ return True
+
+ def recreate(self):
+ self.logger.info("Pool recreating")
+ return self.__class__(self._creator, pool_size=self._pool.maxsize,
+ max_overflow=self._max_overflow,
+ timeout=self._timeout,
+ recycle=self._recycle, echo=self.echo,
+ logging_name=self._orig_logging_name,
+ use_threadlocal=self._use_threadlocal,
+ reset_on_return=self._reset_on_return,
+ _dispatch=self.dispatch,
+ dialect=self._dialect)
+
+ def dispose(self):
+ while True:
+ try:
+ conn = self._pool.get(False)
+ conn.close()
+ except sqla_queue.Empty:
+ break
+
+ self._overflow = 0 - self.size()
+ self.logger.info("Pool disposed. %s", self.status())
+
+ def status(self):
+ return "Pool size: %d Connections in pool: %d "\
+ "Current Overflow: %d Current Checked out "\
+ "connections: %d" % (self.size(),
+ self.checkedin(),
+ self.overflow(),
+ self.checkedout())
+
+ def size(self):
+ return self._pool.maxsize
+
+ def checkedin(self):
+ return self._pool.qsize()
+
+ def overflow(self):
+ return self._overflow
+
+ def checkedout(self):
+ return self._pool.maxsize - self._pool.qsize() + self._overflow
+
+
+class NullPool(Pool):
+
+ """A Pool which does not pool connections.
+
+ Instead it literally opens and closes the underlying DB-API connection
+ per each connection open/close.
+
+ Reconnect-related functions such as ``recycle`` and connection
+ invalidation are not supported by this Pool implementation, since
+ no connections are held persistently.
+
+ .. versionchanged:: 0.7
+ :class:`.NullPool` is used by the SQlite dialect automatically
+ when a file-based database is used. See :ref:`sqlite_toplevel`.
+
+ """
+
+ def status(self):
+ return "NullPool"
+
+ def _do_return_conn(self, conn):
+ conn.close()
+
+ def _do_get(self):
+ return self._create_connection()
+
+ def recreate(self):
+ self.logger.info("Pool recreating")
+
+ return self.__class__(self._creator,
+ recycle=self._recycle,
+ echo=self.echo,
+ logging_name=self._orig_logging_name,
+ use_threadlocal=self._use_threadlocal,
+ reset_on_return=self._reset_on_return,
+ _dispatch=self.dispatch,
+ dialect=self._dialect)
+
+ def dispose(self):
+ pass
+
+
+class SingletonThreadPool(Pool):
+
+ """A Pool that maintains one connection per thread.
+
+ Maintains one connection per each thread, never moving a connection to a
+ thread other than the one which it was created in.
+
+ .. warning:: the :class:`.SingletonThreadPool` will call ``.close()``
+ on arbitrary connections that exist beyond the size setting of
+ ``pool_size``, e.g. if more unique **thread identities**
+ than what ``pool_size`` states are used. This cleanup is
+ non-deterministic and not sensitive to whether or not the connections
+ linked to those thread identities are currently in use.
+
+ :class:`.SingletonThreadPool` may be improved in a future release,
+ however in its current status it is generally used only for test
+ scenarios using a SQLite ``:memory:`` database and is not recommended
+ for production use.
+
+
+ Options are the same as those of :class:`.Pool`, as well as:
+
+ :param pool_size: The number of threads in which to maintain connections
+ at once. Defaults to five.
+
+ :class:`.SingletonThreadPool` is used by the SQLite dialect
+ automatically when a memory-based database is used.
+ See :ref:`sqlite_toplevel`.
+
+ """
+
+ def __init__(self, creator, pool_size=5, **kw):
+ kw['use_threadlocal'] = True
+ Pool.__init__(self, creator, **kw)
+ self._conn = threading.local()
+ self._all_conns = set()
+ self.size = pool_size
+
+ def recreate(self):
+ self.logger.info("Pool recreating")
+ return self.__class__(self._creator,
+ pool_size=self.size,
+ recycle=self._recycle,
+ echo=self.echo,
+ logging_name=self._orig_logging_name,
+ use_threadlocal=self._use_threadlocal,
+ reset_on_return=self._reset_on_return,
+ _dispatch=self.dispatch,
+ dialect=self._dialect)
+
+ def dispose(self):
+ """Dispose of this pool."""
+
+ for conn in self._all_conns:
+ try:
+ conn.close()
+ except Exception:
+ # pysqlite won't even let you close a conn from a thread
+ # that didn't create it
+ pass
+
+ self._all_conns.clear()
+
+ def _cleanup(self):
+ while len(self._all_conns) >= self.size:
+ c = self._all_conns.pop()
+ c.close()
+
+ def status(self):
+ return "SingletonThreadPool id:%d size: %d" % \
+ (id(self), len(self._all_conns))
+
+ def _do_return_conn(self, conn):
+ pass
+
+ def _do_get(self):
+ try:
+ c = self._conn.current()
+ if c:
+ return c
+ except AttributeError:
+ pass
+ c = self._create_connection()
+ self._conn.current = weakref.ref(c)
+ if len(self._all_conns) >= self.size:
+ self._cleanup()
+ self._all_conns.add(c)
+ return c
+
+
+class StaticPool(Pool):
+
+ """A Pool of exactly one connection, used for all requests.
+
+ Reconnect-related functions such as ``recycle`` and connection
+ invalidation (which is also used to support auto-reconnect) are not
+ currently supported by this Pool implementation but may be implemented
+ in a future release.
+
+ """
+
+ @util.memoized_property
+ def _conn(self):
+ return self._creator()
+
+ @util.memoized_property
+ def connection(self):
+ return _ConnectionRecord(self)
+
+ def status(self):
+ return "StaticPool"
+
+ def dispose(self):
+ if '_conn' in self.__dict__:
+ self._conn.close()
+ self._conn = None
+
+ def recreate(self):
+ self.logger.info("Pool recreating")
+ return self.__class__(creator=self._creator,
+ recycle=self._recycle,
+ use_threadlocal=self._use_threadlocal,
+ reset_on_return=self._reset_on_return,
+ echo=self.echo,
+ logging_name=self._orig_logging_name,
+ _dispatch=self.dispatch,
+ dialect=self._dialect)
+
+ def _create_connection(self):
+ return self._conn
+
+ def _do_return_conn(self, conn):
+ pass
+
+ def _do_get(self):
+ return self.connection
+
+
+class AssertionPool(Pool):
+
+ """A :class:`.Pool` that allows at most one checked out connection at
+ any given time.
+
+ This will raise an exception if more than one connection is checked out
+ at a time. Useful for debugging code that is using more connections
+ than desired.
+
+ .. versionchanged:: 0.7
+ :class:`.AssertionPool` also logs a traceback of where
+ the original connection was checked out, and reports
+ this in the assertion error raised.
+
+ """
+
+ def __init__(self, *args, **kw):
+ self._conn = None
+ self._checked_out = False
+ self._store_traceback = kw.pop('store_traceback', True)
+ self._checkout_traceback = None
+ Pool.__init__(self, *args, **kw)
+
+ def status(self):
+ return "AssertionPool"
+
+ def _do_return_conn(self, conn):
+ if not self._checked_out:
+ raise AssertionError("connection is not checked out")
+ self._checked_out = False
+ assert conn is self._conn
+
+ def dispose(self):
+ self._checked_out = False
+ if self._conn:
+ self._conn.close()
+
+ def recreate(self):
+ self.logger.info("Pool recreating")
+ return self.__class__(self._creator, echo=self.echo,
+ logging_name=self._orig_logging_name,
+ _dispatch=self.dispatch,
+ dialect=self._dialect)
+
+ def _do_get(self):
+ if self._checked_out:
+ if self._checkout_traceback:
+ suffix = ' at:\n%s' % ''.join(
+ chop_traceback(self._checkout_traceback))
+ else:
+ suffix = ''
+ raise AssertionError("connection is already checked out" + suffix)
+
+ if not self._conn:
+ self._conn = self._create_connection()
+
+ self._checked_out = True
+ if self._store_traceback:
+ self._checkout_traceback = traceback.format_stack()
+ return self._conn
for name in [b.name for b in self.buf.buffer]:
assert name in (
'sqlalchemy.engine.base.Engine.%s' % eng_name,
- 'sqlalchemy.pool.%s.%s' %
+ 'sqlalchemy.pool.impl.%s.%s' %
(eng.pool.__class__.__name__, pool_name)
)
for name in [b.name for b in self.buf.buffer]:
assert name in (
'sqlalchemy.engine.base.Engine',
- 'sqlalchemy.pool.%s' % eng.pool.__class__.__name__
+ 'sqlalchemy.pool.impl.%s' % eng.pool.__class__.__name__
)
def _named_engine(self, **kw):
self.assert_(p.checkedout() == 0)
def test_recycle(self):
- with patch("sqlalchemy.pool.time.time") as mock:
+ with patch("sqlalchemy.pool.base.time.time") as mock:
mock.return_value = 10000
p = self._queuepool_fixture(