From b64a3dd87a4204ce6d4f2793a7a3f7fbf72eb01f Mon Sep 17 00:00:00 2001 From: Taem Park Date: Tue, 21 Aug 2018 10:54:45 -0400 Subject: [PATCH] Add LIFO for connection pooling Added new "lifo" mode to :class:`.QueuePool`, typically enabled by setting the flag :paramref:`.create_engine.pool_use_lifo` to True. "lifo" mode means the same connection just checked in will be the first to be checked out again, allowing excess connections to be cleaned up from the server side during periods of the pool being only partially utilized. Pull request courtesy Taem Park. Change-Id: Idb5e299c5082b3e6b547bd03022acf65fdc34f35 Pull-request: https://github.com/zzzeek/sqlalchemy/pull/467 --- doc/build/changelog/migration_13.rst | 28 ++++++++ doc/build/changelog/unreleased_13/pr467.rst | 13 ++++ doc/build/core/pooling.rst | 35 ++++++++++ lib/sqlalchemy/engine/__init__.py | 15 ++++ lib/sqlalchemy/engine/strategies.py | 3 +- lib/sqlalchemy/pool/impl.py | 19 ++++- lib/sqlalchemy/util/queue.py | 13 +++- test/engine/test_pool.py | 77 +++++++++++++++++++++ 8 files changed, 198 insertions(+), 5 deletions(-) create mode 100644 doc/build/changelog/unreleased_13/pr467.rst diff --git a/doc/build/changelog/migration_13.rst b/doc/build/changelog/migration_13.rst index 1450fea527..423b69579e 100644 --- a/doc/build/changelog/migration_13.rst +++ b/doc/build/changelog/migration_13.rst @@ -313,6 +313,34 @@ well as for casting decimal bind values for MySQL. :ticket:`3981` +.. _change_pr467: + +New last-in-first-out strategy for QueuePool +--------------------------------------------- + +The connection pool usually used by :func:`.create_engine` is known +as :class:`.QueuePool`. This pool uses an object equivalent to Python's +built-in ``Queue`` class in order to store database connections waiting +to be used. The ``Queue`` features first-in-first-out behavior, which is +intended to provide a round-robin use of the database connections that are +persistently in the pool. However, a potential downside of this is that +when the utilization of the pool is low, the re-use of each connection in series +means that a server-side timeout strategy that attempts to reduce unused +connections is prevented from shutting down these connections. To suit +this use case, a new flag :paramref:`.create_engine.pool_use_lifo` is added +which reverses the ``.get()`` method of the ``Queue`` to pull the connection +from the beginning of the queue instead of the end, essentially turning the +"queue" into a "stack" (adding a whole new pool called ``StackPool`` was +considered, however this was too much verbosity). + +.. seealso:: + + :ref:`pool_use_lifo` + + + + + Key Behavioral Changes - Core ============================= diff --git a/doc/build/changelog/unreleased_13/pr467.rst b/doc/build/changelog/unreleased_13/pr467.rst new file mode 100644 index 0000000000..7e6e9fd029 --- /dev/null +++ b/doc/build/changelog/unreleased_13/pr467.rst @@ -0,0 +1,13 @@ +.. change:: + :tags: feature, engine + + Added new "lifo" mode to :class:`.QueuePool`, typically enabled by setting + the flag :paramref:`.create_engine.pool_use_lifo` to True. "lifo" mode + means the same connection just checked in will be the first to be checked + out again, allowing excess connections to be cleaned up from the server + side during periods of the pool being only partially utilized. Pull request + courtesy Taem Park. + + .. seealso:: + + :ref:`change_pr467` diff --git a/doc/build/core/pooling.rst b/doc/build/core/pooling.rst index 565d8ee1df..e5abf92ff6 100644 --- a/doc/build/core/pooling.rst +++ b/doc/build/core/pooling.rst @@ -396,6 +396,41 @@ a DBAPI connection might be invalidated include: All invalidations which occur will invoke the :meth:`.PoolEvents.invalidate` event. +.. _pool_use_lifo: + +Using FIFO vs. LIFO +------------------- + +The :class:`.QueuePool` class features a flag called +:paramref:`.QueuePool.use_lifo`, which can also be accessed from +:func:`.create_engine` via the flag :paramref:`.create_engine.pool_use_lifo`. +Setting this flag to ``True`` causes the pool's "queue" behavior to instead be +that of a "stack", e.g. the last connection to be returned to the pool is the +first one to be used on the next request. In contrast to the pool's long- +standing behavior of first-in-first-out, which produces a round-robin effect of +using each connection in the pool in series, lifo mode allows excess +connections to remain idle in the pool, allowing server-side timeout schemes to +close these connections out. The difference between FIFO and LIFO is +basically whether or not its desirable for the pool to keep a full set of +connections ready to go even during idle periods:: + + engine = create_engine( + "postgreql://", pool_use_lifo=True, pool_pre_ping=True) + +Above, we also make use of the :paramref:`.create_engine.pool_pre_ping` flag +so that connections which are closed from the server side are gracefully +handled by the connection pool and replaced with a new connection. + +Note that the flag only applies to :class:`.QueuePool` use. + +.. versionadded:: 1.3 + +.. seealso:: + + :ref:`pool_disconnects` + + + Using Connection Pools with Multiprocessing ------------------------------------------- diff --git a/lib/sqlalchemy/engine/__init__.py b/lib/sqlalchemy/engine/__init__.py index b0d765b8ea..d7197fe74c 100644 --- a/lib/sqlalchemy/engine/__init__.py +++ b/lib/sqlalchemy/engine/__init__.py @@ -399,6 +399,21 @@ def create_engine(*args, **kwargs): up on getting a connection from the pool. This is only used with :class:`~sqlalchemy.pool.QueuePool`. + :param pool_use_lifo=False: use LIFO (last-in-first-out) when retrieving + connections from :class:`.QueuePool` instead of FIFO + (first-in-first-out). Using LIFO, a server-side timeout scheme can + reduce the number of connections used during non- peak periods of + use. When planning for server-side timeouts, ensure that a recycle or + pre-ping strategy is in use to gracefully handle stale connections. + + .. versionadded:: 1.3 + + .. seealso:: + + :ref:`pool_use_lifo` + + :ref:`pool_disconnects` + :param plugins: string list of plugin names to load. See :class:`.CreateEnginePlugin` for background. diff --git a/lib/sqlalchemy/engine/strategies.py b/lib/sqlalchemy/engine/strategies.py index 0ec6aa06f5..d4f5185de6 100644 --- a/lib/sqlalchemy/engine/strategies.py +++ b/lib/sqlalchemy/engine/strategies.py @@ -123,7 +123,8 @@ class DefaultEngineStrategy(EngineStrategy): 'events': 'pool_events', 'use_threadlocal': 'pool_threadlocal', 'reset_on_return': 'pool_reset_on_return', - 'pre_ping': 'pool_pre_ping'} + 'pre_ping': 'pool_pre_ping', + 'use_lifo': 'pool_use_lifo'} for k in util.get_cls_kwargs(poolclass): tk = translate.get(k, k) if tk in kwargs: diff --git a/lib/sqlalchemy/pool/impl.py b/lib/sqlalchemy/pool/impl.py index c456618951..3058d62472 100644 --- a/lib/sqlalchemy/pool/impl.py +++ b/lib/sqlalchemy/pool/impl.py @@ -30,7 +30,7 @@ class QueuePool(Pool): """ - def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30, + def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30, use_lifo=False, **kw): r""" Construct a QueuePool. @@ -63,6 +63,21 @@ class QueuePool(Pool): :param timeout: The number of seconds to wait before giving up on returning a connection. Defaults to 30. + :param use_lifo: use LIFO (last-in-first-out) when retrieving + connections instead of FIFO (first-in-first-out). Using LIFO, a + server-side timeout scheme can reduce the number of connections used + during non-peak periods of use. When planning for server-side + timeouts, ensure that a recycle or pre-ping strategy is in use to + gracefully handle stale connections. + + .. versionadded:: 1.3 + + .. seealso:: + + :ref:`pool_use_lifo` + + :ref:`pool_disconnects` + :param \**kw: Other keyword arguments including :paramref:`.Pool.recycle`, :paramref:`.Pool.echo`, :paramref:`.Pool.reset_on_return` and others are passed to the @@ -70,7 +85,7 @@ class QueuePool(Pool): """ Pool.__init__(self, creator, **kw) - self._pool = sqla_queue.Queue(pool_size) + self._pool = sqla_queue.Queue(pool_size, use_lifo=use_lifo) self._overflow = 0 - pool_size self._max_overflow = max_overflow self._timeout = timeout diff --git a/lib/sqlalchemy/util/queue.py b/lib/sqlalchemy/util/queue.py index 1958702c7c..640f70ea95 100644 --- a/lib/sqlalchemy/util/queue.py +++ b/lib/sqlalchemy/util/queue.py @@ -39,10 +39,12 @@ class Full(Exception): class Queue: - def __init__(self, maxsize=0): + def __init__(self, maxsize=0, use_lifo=False): """Initialize a queue object with a given maximum size. If `maxsize` is <= 0, the queue size is infinite. + + If `use_lifo` is True, this Queue acts like a Stack (LIFO). """ self._init(maxsize) @@ -57,6 +59,8 @@ class Queue: # Notify not_full whenever an item is removed from the queue; # a thread waiting to put is notified then. self.not_full = threading.Condition(self.mutex) + # If this queue uses LIFO or FIFO + self.use_lifo = use_lifo def qsize(self): """Return the approximate size of the queue (not reliable!).""" @@ -196,4 +200,9 @@ class Queue: # Get an item from the queue def _get(self): - return self.queue.popleft() + if self.use_lifo: + # LIFO + return self.queue.pop() + else: + # FIFO + return self.queue.popleft() diff --git a/test/engine/test_pool.py b/test/engine/test_pool.py index 61737d253d..99e50f582a 100644 --- a/test/engine/test_pool.py +++ b/test/engine/test_pool.py @@ -1983,6 +1983,83 @@ class QueuePoolTest(PoolTestBase): rec.checkin ) + def test_lifo(self): + c1, c2, c3 = Mock(), Mock(), Mock() + connections = [c1, c2, c3] + + def creator(): + return connections.pop(0) + + p = pool.QueuePool(creator, use_lifo=True) + + pc1 = p.connect() + pc2 = p.connect() + pc3 = p.connect() + + pc1.close() + pc2.close() + pc3.close() + + for i in range(5): + pc1 = p.connect() + is_(pc1.connection, c3) + pc1.close() + + pc1 = p.connect() + is_(pc1.connection, c3) + + pc2 = p.connect() + is_(pc2.connection, c2) + pc2.close() + + pc3 = p.connect() + is_(pc3.connection, c2) + + pc2 = p.connect() + is_(pc2.connection, c1) + + pc2.close() + pc3.close() + pc1.close() + + def test_fifo(self): + c1, c2, c3 = Mock(), Mock(), Mock() + connections = [c1, c2, c3] + + def creator(): + return connections.pop(0) + + p = pool.QueuePool(creator) + + pc1 = p.connect() + pc2 = p.connect() + pc3 = p.connect() + + pc1.close() + pc2.close() + pc3.close() + + pc1 = p.connect() + is_(pc1.connection, c1) + pc1.close() + + pc1 = p.connect() + is_(pc1.connection, c2) + + pc2 = p.connect() + is_(pc2.connection, c3) + pc2.close() + + pc3 = p.connect() + is_(pc3.connection, c1) + + pc2 = p.connect() + is_(pc2.connection, c3) + + pc2.close() + pc3.close() + pc1.close() + class ResetOnReturnTest(PoolTestBase): def _fixture(self, **kw): -- 2.47.2