:maxdepth: 1
:caption: Contents:
+ ../async
+ ../copy
+ ../cursors
../row-factories
+ ../connection-pools
../adaptation
../prepared
- ../copy
- ../async
- ../cursors
../cursor
../sql
../errors
+ ../pool
../types
../pq
# (1,)
+.. index:: disconnections
+
+.. _disconnections:
+
Detecting disconnections
------------------------
--- /dev/null
+.. currentmodule:: psycopg3.pool
+
+.. _connection-pools:
+
+Connection pools
+================
+
+A `connection pool`__ is an object managing a set of connections and allowing
+their use to functions needing one. Because the time to establish a new
+connection can be relatively long, keeping connections open can reduce the
+latency of a program operations.
+
+.. __: https://en.wikipedia.org/wiki/Connection_pool
+
+This page explains a few basic concepts of `!psycopg3` connection pool's
+behaviour. Please refer to the `ConnectionPool` object API for details about
+the pool operations.
+
+
+Pool life cycle
+---------------
+
+A typical way to use the pool is to create a single instance of it, as a
+global object, and to use this object in the rest of the program, allowing
+other functions, modules, threads to use it. This is only a common use
+however, and not the necessary one; in particular the connection pool act as a
+context manager and can be closed automatically at the end of its ``with``
+block::
+
+ with my_pool as ConnectionPool(conninfo, **kwargs):
+ run_app(my_pool)
+
+ # the pool is now closed
+
+If necessary, or convenient, your application may create more than one pool,
+for instance to connect to more than one database or to provide separate
+read-only and read/write connections.
+
+Once a pool is instantiated, the constructor returns immediately, while the
+background workers try to create the required number of connections to fill
+the pool. If your application is misconfigured, or the network is down, it
+means that the pool will be available but threads requesting a connection will
+fail with a `PoolTimeout` after the `~ConnectionPool.connection()` timeout is
+expired. If this behaviour is not desirable you should call the
+`~ConnectionPool.wait()` method after creating the pool, which will block
+until the pool is full or will throw a `PoolTimeout` if the pool isn't ready
+within an allocated time.
+
+The pool background workers create connections according to the parameters
+*conninfo*, *kwargs*, *connection_class* passed to the pool constructor. Once
+a connection is created it is also passed to the *configure()* callback, if
+provided, after which it is put in the pool (or passed to a client requesting
+it, if someone is already knocking at the door). If a connection expires
+(it passes *max_lifetime*), is returned to the pool in broken state, is found
+closed by `~ConnectionPool.check()`, then is disposed of and a new connection
+attempt is started in background.
+
+When the pool is no more to be used you should call the
+`~ConnectionPool.close()` method (unless the ``with`` syntax is used). If the
+pool is a global object it may be unclear how to do so. Missing a call to
+`!close()` shouldn't be a big problem, it should just result in a few warnings
+printed. However, if you think that's sloppy, you can use the `atexit` module
+to have the `!close()` function called at the end of the program.
+
+
+Using connections from the pool
+-------------------------------
+
+The pool can be used to request connection from multiple threads - it is
+hardly useful otherwise! If more connections than the ones available in the
+pool are requested, the requesting threads are queued and are served a
+connection as soon as one is available again: either because another client
+has finished using it or because the pool is allowed to grow and a new
+connection is ready.
+
+The main way to use the pool is to obtain a connection using the
+`~ConnectionPool.connection()` context, which return a `~psycopg3.Connection`
+or subclass::
+
+ with my_pool.connection() as conn:
+ conn.execute("what you want")
+
+At the end of the block the connection is returned to the pool and shouldn't
+be used anymore by the code which obtained it. If a *reset()* function is
+specified in the pool constructor it is called on the connection before
+returning it to the pool. Note that the *reset()* function is called in a
+working thread, so that the thread which used the connection can keep its
+execution witout being slowed down.
+
+
+Pool connection and sizing
+--------------------------
+
+A pool can have a fixed size (specifying no *maxconn* or *maxconn* =
+*minconn*) or a dynamic size (when *maxconn* > *minconn*). In both cases, as
+soon as the pool is created, it will try to acquire *minconn* connections in
+background.
+
+If an attempt to create a connection fails, a new attempt will be made soon
+after, using an exponential backoff to increase the time between attempts,
+until a maximum of *reconnect_timeout* is reached. When that happens, the pool
+will call the *reconnect_failed()* function, if provided to the pool, and just
+start a new connection attempt. You can use this function either to send
+alerts or to interrupt the program and allow the rest of your infrastructure
+to restart it.
+
+If more than *minconn* connections are requested concurrently, new ones are
+created, up to *maxconn*. Note that the connections are always created by the
+background workers, not by the thread asking the connection: if a client
+requires a new connection, and a previous client terminates its job before the
+new connection is ready, the waiting client will be served the existing
+connection. This is especially useful in scenarios where the time to connect
+is longer than the time the connection is used (see `this analysis`__, for
+instance).
+
+.. __: https://github.com/brettwooldridge/HikariCP/blob/dev/documents/
+ Welcome-To-The-Jungle.md
+
+If a pool grows above *minconn*, but its usage decreases afterwards, a number
+of connections are eventually closed: one each the *max_idle* time specified
+in the pool constructor.
+
+
+What's the right size for the pool
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Big question. Who knows. However, probably not as large as you imagine. Please
+take a look at `this this analysis`__ for some ideas.
+
+.. __: https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing
+
+Something useful you can do is probably to use the
+`~ConnectionPool.get_stats()` method and monitor the behaviour of your
+program, eventually adjusting the size of the pool using the
+`~ConnectionPool.resize()` method.
+
+
+Connections quality
+-------------------
+
+The state of the connection is verified when a connection is returned to the
+pool: if a connection is broken during its usage it will be discarded on
+return and a new connection will be created.
+
+.. warning::
+
+ The health of the connection is not checked when the pool gives it to a
+ client.
+
+Why not? Because doing so would require an extra network roundtrip: we want to
+save you from its latency. Before getting too angry about it, think that the
+connection can be lost any moment while your program is using it. As your
+program should be already able to cope with a loss of a connection during its
+process it should be able to tolerate to be served a broken connection:
+unpleasant but not the end of the world.
+
+.. warning::
+
+ The health of the connection is not checked when the connection is in the
+ pool.
+
+Does the pool keep a watchful eye on the quality of the connections inside it?
+No, it doesn't. Why not? Because you will do it for us! Your program is only
+a big ruse to make sure the connections are kept alive...
+
+Not (entirely) trolling: if you are using a connection pool, we assume that
+you are using and returning connections at a good pace. If the pool had to
+check for the quality of a broken connection before your program notices it,
+it should be polling each connection even faster than your program uses them.
+Your database server wouldn't be amused...
+
+Can you do something better than that? Of course you can: there is always a
+better way than polling. You can use the same recipe of :ref:`disconnections`:
+you can dedicate a thread (and a connection) to listen for activity on the
+connection. If any activity is detected you can call the pool
+`~ConnectionPool.check()` method, which will make every connection in the pool
+briefly unavailable and run a quick check on them, returning them to the pool
+if they are still working or creating a new connection if they aren't.
+
+If you set up a similar check in your program, in case the database connection
+is temporarily lost, we cannot do anything for the thread which had taken
+already a connection from the pool, but no other thread should be served a
+broken connection, because `!check()` would empty the pool and refill it with
+working connection, as soon as they are available.
+
+Faster than you can say poll. Or pool.
+
+
+.. _pool-stats:
+
+Pool stats
+----------
+
+The pool can return information about its usage using the methods
+`~ConnectionPool.get_stats()` or `~ConnectionPool.pop_stats()`. Both methods
+return the same values, but the latter reset the counters after its use. The
+values can be send to a monitoring system such as Graphite_ or Prometheus_.
+
+.. _Graphite: https://graphiteapp.org/
+.. _Prometheus: https://prometheus.io/
+
+The following values should be provided, but please don't consider them as a
+rigid interface: they may change. Keys whose value is 0 may be not returned.
+
+
+======================= =====================================================
+Metric Meaning
+======================= =====================================================
+ ``pool_min`` Current value for `~ConnectionPool.minconn`
+ ``pool_max`` Current value for `~ConnectionPool.maxconn`
+ ``pool_size`` Current number of connections in the pool, given,
+ being prepared
+ ``pool_available`` Number of connections currently idle in the pool
+ ``queue_length`` Number of client in the queue waiting for a
+ connection
+ ``usage_ms`` Total usage time of the connections outside the pool
+ ``requests_num`` Number of connections requested to the pool
+ ``requests_queued`` Number of requests queued because a connection wasn't
+ immedately available
+ ``requests_wait_ms`` Total time in the queue for the clients waiting
+ ``requests_timeouts`` Number of waiting clients whose request timed out
+ ``returns_bad`` Number of connections returned to the pool in a bad
+ state
+ ``connections_num`` Number of connection attempts made by the pool to the
+ server
+ ``connections_ms`` Total time spent to establish connections with the
+ server
+ ``connections_errors`` Number of failed connection attempts
+ ``connections_lost`` Number of connections lost identified by
+ `~ConnectionPool.check()`
+======================= =====================================================
.. automethod:: close
- .. note:: You can use :ref:`with connect(): ...<usage>` to
+ .. note:: You can use :ref:`with connect(): ...<with-connection>` to
close the connection automatically when the block is exited.
.. autoattribute:: closed
.. index::
single: Error; Class
-.. module:: psycopg3.errors
+.. module:: psycopg3
This module exposes objects to represent and examine database errors.
.. autoattribute:: diag
+
+.. module:: psycopg3.errors
+
.. autoclass:: Diagnostic()
The object is returned by the `Error.diag` attribute and is passed to the
server versions.
+.. currentmodule:: psycopg3
+
.. index::
single: Exceptions; DB-API
\|__ `ProgrammingError`
\|__ `NotSupportedError`
-These classes are also exposed by the ``psycopg3`` module.
+These classes are also exposed both by the `!psycopg3` and the
+`!psycopg3.errors` module.
.. autoexception:: Warning()
.. autoexception:: InterfaceError()
.. autoexception:: NotSupportedError()
+.. currentmodule:: psycopg3.errors
+
.. index::
single: Exceptions; PostgreSQL
--- /dev/null
+`pool` -- Connection pool implementations
+=========================================
+
+.. index::
+ double: Connection; Pool
+
+.. module:: psycopg3.pool
+
+The package contains two connection pool implementations. A connection pool
+creates and maintains a limited amount of `~psycopg3.Connection` instances and
+allows a larger number of users to use them. See :ref:`connection-pools` for
+more details and usage pattern.
+
+There package implement two connection pools: `ConnectionPool` is a
+synchronous connection pool yielding `~psycopg3.Connection` objects and can be
+used by multithread applications. `AsyncConnectionPool` has a similar
+interface, but with `asyncio` functions replacing blocking functions, and
+yields `~psycopg3.AsyncConnection` instances.
+
+The intended use (but not mandatory) is to create a single connection pool, as
+a global object exposed by a module in your application, and use the same
+instance from the rest of the code (especially the
+`~ConnectionPool.connection()` method.
+
+
+The `!ConnectionPool` class
+---------------------------
+
+.. autoclass:: ConnectionPool(conninfo, *, **arguments)
+
+ This class implements a connection pool serving `~psycopg2.Connection`
+ instances (or subclasses).
+
+ :param conninfo: The connection string. See
+ `~psycopg3.Connection.connect()` for details.
+ :type conninfo: `!str`
+
+ :param minconn: The minimum number of connection the pool will hold. The
+ pool will actively try to create new connections if some
+ are lost (closed, broken) and will try to never go below
+ *minconn*. Default: 4
+ :type minconn: `!int`
+
+ :param maxconn: The maximum number of connections the pool will hold. If
+ `!None`, or equal to *minconn*, the pool will not grow or
+ shrink. If larger than *minconn* the pool can grow if more
+ than *minconn* connections are requested at the same time
+ and will shrink back after the extra connections have been
+ unused for more than *max_idle* seconds. Default: `!None`.
+ :type maxconn: `Optional[int]`
+
+ :param kwargs: Extra arguments to pass to `!connect()`. Note that this is
+ *one dict argument* of the pool constructor, which is
+ expanded as `connect()` keyword parameters.
+
+ :type kwargs: `!dict`
+
+ :param configure: A callback to configure a connection after creation.
+ Useful, for instance, to configure its adapters. If the
+ connection is used to run internal queries (to inspect the
+ database) make sure to close an eventual transaction
+ before leaving the function.
+ :type configure: `Callable[[Connection], None]`
+
+ :param reset: A callback to reset a function after it has been returned to
+ the pool. The connection is guaranteed to be passed to the
+ *reset()* function in "idle" state (no transaction). When
+ leaving the *reset()* function the connection must be left in
+ *idle* state, otherwise is discarded.
+ :type reset: `Callable[[Connection], None]`
+
+ :param connection_class: The class of the connections to serve. Default:
+ `~psycopg3.Connection`. It should be a
+ `!Connection` subclass.
+ :type connection_class: ``Type[Connection]``
+
+ :param name: An optional name to give to the pool, useful, for instance, to
+ identify it in the logs if more than one pool is used. If
+ `!None` (default) pick a sequential name such as ``pool-1``,
+ ``pool-2`` etc.
+ :type name: `!str`
+
+ :param timeout: The default maximum time in seconts that a client can wait
+ to receive a connection from the pool (using `connection()`
+ or `getconn()`). Note that these methods allow to override
+ the *timeout* default. Default: 30 seconds.
+ :type timeout: `!float`
+
+ :param max_lifetime: The maximum lifetime of a connection in the pool, in
+ seconds. Connections used for longer get closed and
+ replaced by a new one. The amount is reduced by a
+ random 10% to avoid mass eviction. Default: one hour.
+ :type max_lifetime: `!float`
+
+ :param max_idle: Maximum time a connection can be unused in the pool before
+ being closed, and the pool shrunk. This only happens to
+ connections more than *minconn*, if *maxconn* allowed the
+ pool to grow. Default: 10 minutes.
+ :type max_idle: `!float`
+
+ :param reconnect_timeout: Maximum time in seconds the pool will try to
+ create a connection. If a connection attempt
+ fails, the pool will try to reconnect a few
+ times, using an exponential backoff and some
+ random factor to avoid mass attempts. If repeated
+ attempt fails, after *reconnect_timeout* second
+ the attempt is aborted and the *reconnect_failed*
+ callback invoked. Default: 5 minutes.
+ :type reconnect_timeout: `!float`
+
+ :param reconnect_failed: Callback invoked if an attempt to create a new
+ connection fails for more than *reconnect_timeout*
+ seconds. The user may decide, for instance, to
+ terminate the program (executing `sys.exit()`).
+ By default don't do anything: restart a new
+ connection attempt (if the number of connection
+ fell below *minconn*).
+ :type reconnect_failed: ``Callable[[ConnectionPool], None]``
+
+ :param num_workers: Number of background worker threads used to maintain the
+ pool state. Background workers are used for example to
+ create new connections and to clean up connections when
+ they are returned to the pool. Default: 3.
+ :type num_workers: `!int`
+
+ .. automethod:: wait
+ .. automethod:: connection
+
+ .. code:: python
+
+ with my_pool.connection() as conn:
+ conn.execute(...)
+
+ # the connection is now back in the pool
+
+ .. automethod:: close
+
+ .. note::
+
+ The pool can be used as context manager too, in which case it will
+ be closed at the end of the block:
+
+ .. code:: python
+
+ with ConnectionPool(...) as pool:
+ # code using the pool
+
+ .. attribute:: name
+ :type: str
+
+ The name of the pool set on creation, or automatically generated if not
+ set.
+
+ .. autoproperty:: minconn
+ .. autoproperty:: maxconn
+
+ The current minimum and maximum size of the pool. Use `resize()` to
+ change them at runtime.
+
+ .. automethod:: resize
+ .. automethod:: check
+ .. automethod:: get_stats
+ .. automethod:: pop_stats
+
+ See :ref:`pool-stats` for the metrics returned.
+
+ .. rubric:: Functionalities you may not need
+
+ .. automethod:: getconn
+ .. automethod:: putconn
+
+
+.. autoclass:: PoolTimeout()
+
+ Subclass of `~psycopg3.OperationalError`
+
+.. autoclass:: PoolClosed()
+
+ Subclass of `~psycopg3.OperationalError`
+
+
+The `!AsyncConnectionPool` class
+--------------------------------
+
+`!AsyncConnectionPool` has a very similar interface to the `ConnectionPool`
+class but its blocking method are implemented as `async` coroutines. It
+returns `AsyncConnection` instances, or its subclasses if specified so in the
+*connection_class* parameter.
+
+Only the function with different signature from `!ConnectionPool` are
+listed here.
+
+.. autoclass:: AsyncConnectionPool(conninfo, *, **arguments)
+
+ All the other parameters are the same.
+
+ :param configure: A callback to configure a connection after creation.
+ :type configure: `async Callable[[AsyncConnection], None]`
+
+ :param reset: A callback to reset a function after it has been returned to
+ the pool.
+ :type reset: `async Callable[[AsyncConnection], None]`
+
+ :param connection_class: The class of the connections to serve. Default:
+ `~psycopg3.AsyncConnection`. It should be an
+ `!AsyncConnection` subclass.
+ :type connection_class: ``Type[AsyncConnection]``
+
+ .. automethod:: wait
+ .. automethod:: connection
+
+ .. code:: python
+
+ async with my_pool.connection() as conn:
+ await conn.execute(...)
+
+ # the connection is now back in the pool
+
+ .. automethod:: close
+
+ .. note::
+
+ The pool can be used as context manager too, in which case it will
+ be closed at the end of the block:
+
+ .. code:: python
+
+ async with AsyncConnectionPool(...) as pool:
+ # code using the pool
+
+ .. automethod:: resize
+ .. automethod:: check
+ .. automethod:: getconn
+ .. automethod:: putconn
- :ref:`query-parameters`.
- :ref:`types-adaptation`.
- :ref:`transactions`.
+
+
+.. index::
+ pair: Connection; ``with``
+
+.. _with-connection:
+
+Connection context
+------------------
+
+`!psycopg3` `Connection` can be used as a context manager:
+
+.. code:: python
+
+ with psycopg3.connect() as conn:
+ ... # use the connection
+
+ # the connection is now closed
+
+When the block is exited, if there is a transaction open, it will be
+committed. If an exception is raised within the block the transaction is
+rolled back. In either case the connection is closed.
+
+`AsyncConnection` can be also used as context manager, using ``async with``,
+but be careful about its quirkiness: see :ref:`async-with` for details.
self.run_task(Schedule(self, ShrinkPool(self), self.max_idle))
async def wait(self, timeout: float = 30.0) -> None:
- """
- Wait for the pool to be full after init.
-
- Raise `PoolTimeout` if not ready within *timeout* sec.
- """
async with self._lock:
assert not self._pool_full_event
if len(self._pool) >= self._nconns:
async def connection(
self, timeout: Optional[float] = None
) -> AsyncIterator[AsyncConnection]:
- """Context manager to obtain a connection from the pool.
-
- Returned the connection immediately if available, otherwise wait up to
- *timeout* or `self.timeout` and throw `PoolTimeout` if a connection is
- not available in time.
-
- Upon context exit, return the connection to the pool. Apply the normal
- connection context behaviour (commit/rollback the transaction in case
- of success/error). If the connection is no more in working state
- replace it with a new one.
- """
conn = await self.getconn(timeout=timeout)
t0 = monotonic()
try:
async def getconn(
self, timeout: Optional[float] = None
) -> AsyncConnection:
- """Obtain a contection from the pool.
-
- You should preferrably use `connection()`. Use this function only if
- it is not possible to use the connection as context manager.
-
- After using this function you *must* call a corresponding `putconn()`:
- failing to do so will deplete the pool. A depleted pool is a sad pool:
- you don't want a depleted pool.
- """
logger.info("connection requested to %r", self.name)
self._stats[self._REQUESTS_NUM] += 1
# Critical section: decide here if there's a connection ready
return conn
async def putconn(self, conn: AsyncConnection) -> None:
- """Return a connection to the loving hands of its pool.
-
- Use this function only paired with a `getconn()`. You don't need to use
- it if you use the much more comfortable `connection()` context manager.
- """
# Quick check to discard the wrong connection
pool = getattr(conn, "_pool", None)
if pool is not self:
await self._return_connection(conn)
async def close(self, timeout: float = 5.0) -> None:
- """Close the pool and make it unavailable to new clients.
-
- All the waiting and future client will fail to acquire a connection
- with a `PoolClosed` exception. Currently used connections will not be
- closed until returned to the pool.
-
- Wait *timeout* for threads to terminate their job, if positive.
- """
if self._closed:
return
self.run_task(AddConnection(self))
async def check(self) -> None:
- """Verify the state of the connections currently in the pool.
-
- Test each connection: if it works return it to the pool, otherwise
- dispose of it and create a new one.
- """
async with self._lock:
conns = list(self._pool)
self._pool.clear()
def wait(self, timeout: float = 30.0) -> None:
"""
- Wait for the pool to be full after init.
+ Wait for the pool to be full (with `minconn` connections) after creation.
Raise `PoolTimeout` if not ready within *timeout* sec.
+
+ Calling this method is not mandatory: you can try and use the pool
+ immediately after its creation. The first client will be served as soon
+ as a connection is ready. You can use this method if you prefer your
+ program to terminate in case the environment is not configured
+ properly, rather than trying to stay up the harder it can.
"""
with self._lock:
assert not self._pool_full_event
not available in time.
Upon context exit, return the connection to the pool. Apply the normal
- connection context behaviour (commit/rollback the transaction in case
- of success/error). If the connection is no more in working state
- replace it with a new one.
+ :ref:`connection context behaviour <with-connection>` (commit/rollback
+ the transaction in case of success/error). If the connection is no more
+ in working state replace it with a new one.
+
"""
conn = self.getconn(timeout=timeout)
t0 = monotonic()
else:
self._return_connection(conn)
- def close(self, timeout: float = 1.0) -> None:
+ def close(self, timeout: float = 5.0) -> None:
"""Close the pool and make it unavailable to new clients.
All the waiting and future client will fail to acquire a connection
with a `PoolClosed` exception. Currently used connections will not be
closed until returned to the pool.
- Wait *timeout* for threads to terminate their job, if positive.
+ Wait *timeout* for threads to terminate their job, if positive. If
+ timeout expires the pool is closed anyway, although it may raise some
+ warnings on exit.
"""
if self._closed:
return
self.close()
def resize(self, minconn: int, maxconn: Optional[int] = None) -> None:
+ """Change the size of the pool during runtime."""
if maxconn is None:
maxconn = minconn
if maxconn < minconn: