Pool connection and sizing
--------------------------
-A pool can have a fixed size (specifying no *maxconn* or *maxconn* =
-*minconn*) or a dynamic size (when *maxconn* > *minconn*). In both cases, as
-soon as the pool is created, it will try to acquire *minconn* connections in
+A pool can have a fixed size (specifying no *max_size* or *max_size* =
+*min_size*) or a dynamic size (when *max_size* > *min_size*). In both cases, as
+soon as the pool is created, it will try to acquire *min_size* connections in
background.
If an attempt to create a connection fails, a new attempt will be made soon
alerts or to interrupt the program and allow the rest of your infrastructure
to restart it.
-If more than *minconn* connections are requested concurrently, new ones are
-created, up to *maxconn*. Note that the connections are always created by the
+If more than *min_size* connections are requested concurrently, new ones are
+created, up to *max_size*. Note that the connections are always created by the
background workers, not by the thread asking the connection: if a client
requires a new connection, and a previous client terminates its job before the
new connection is ready, the waiting client will be served the existing
.. __: https://github.com/brettwooldridge/HikariCP/blob/dev/documents/
Welcome-To-The-Jungle.md
-If a pool grows above *minconn*, but its usage decreases afterwards, a number
+If a pool grows above *min_size*, but its usage decreases afterwards, a number
of connections are eventually closed: one each the *max_idle* time specified
in the pool constructor.
======================= =====================================================
Metric Meaning
======================= =====================================================
- ``pool_min`` Current value for `~ConnectionPool.minconn`
- ``pool_max`` Current value for `~ConnectionPool.maxconn`
- ``pool_size`` Current number of connections in the pool, given,
- being prepared
+ ``pool_min`` Current value for `~ConnectionPool.min_size`
+ ``pool_max`` Current value for `~ConnectionPool.max_size`
+ ``pool_size`` Number of connections currently managed by the pool
+ (in the pool, given to clients, being prepared)
``pool_available`` Number of connections currently idle in the pool
``requests_waiting`` Number of requests currently waiting in a queue to
receive a connection
`~psycopg3.Connection.connect()` for details.
:type conninfo: `!str`
- :param minconn: The minimum number of connection the pool will hold. The
+ :param min_size: The minimum number of connections the pool will hold. The
pool will actively try to create new connections if some
are lost (closed, broken) and will try to never go below
- *minconn*. Default: 4
- :type minconn: `!int`
+ *min_size*. Default: 4
+ :type min_size: `!int`
- :param maxconn: The maximum number of connections the pool will hold. If
- `!None`, or equal to *minconn*, the pool will not grow or
- shrink. If larger than *minconn* the pool can grow if more
- than *minconn* connections are requested at the same time
+ :param max_size: The maximum number of connections the pool will hold. If
+ `!None`, or equal to *min_size*, the pool will not grow or
+ shrink. If larger than *min_size* the pool can grow if more
+ than *min_size* connections are requested at the same time
and will shrink back after the extra connections have been
unused for more than *max_idle* seconds. Default: `!None`.
- :type maxconn: `Optional[int]`
+ :type max_size: `Optional[int]`
:param kwargs: Extra arguments to pass to `!connect()`. Note that this is
*one dict argument* of the pool constructor, which is
:param max_idle: Maximum time a connection can be unused in the pool before
being closed, and the pool shrunk. This only happens to
- connections more than *minconn*, if *maxconn* allowed the
+ connections more than *min_size*, if *max_size* allowed the
pool to grow. Default: 10 minutes.
:type max_idle: `!float`
terminate the program (executing `sys.exit()`).
By default don't do anything: restart a new
connection attempt (if the number of connection
- fell below *minconn*).
+ fell below *min_size*).
:type reconnect_failed: ``Callable[[ConnectionPool], None]``
:param num_workers: Number of background worker threads used to maintain the
The name of the pool set on creation, or automatically generated if not
set.
- .. autoproperty:: minconn
- .. autoproperty:: maxconn
+ .. autoproperty:: min_size
+ .. autoproperty:: max_size
The current minimum and maximum size of the pool. Use `resize()` to
change them at runtime.
)
self._workers.append(t)
- # populate the pool with initial minconn connections in background
+ # populate the pool with initial min_size connections in background
for i in range(self._nconns):
self.run_task(AddConnection(self))
- # Schedule a task to shrink the pool if connections over minconn have
+ # Schedule a task to shrink the pool if connections over min_size have
# remained unused.
self.run_task(Schedule(self, ShrinkPool(self), self.max_idle))
# Allow only one thread at time to grow the pool (or returning
# connections might be starved).
- if self._nconns < self._maxconn and not self._growing:
+ if self._nconns < self._max_size and not self._growing:
self._nconns += 1
logger.info(
"growing pool %r to %s", self.name, self._nconns
await self.close()
async def resize(
- self, minconn: int, maxconn: Optional[int] = None
+ self, min_size: int, max_size: Optional[int] = None
) -> None:
- if maxconn is None:
- maxconn = minconn
- if maxconn < minconn:
- raise ValueError("maxconn must be greater or equal than minconn")
+ if max_size is None:
+ max_size = min_size
+ if max_size < min_size:
+ raise ValueError("max_size must be greater or equal than min_size")
- ngrow = max(0, minconn - self._minconn)
+ ngrow = max(0, min_size - self._min_size)
logger.info(
- "resizing %r to minconn=%s maxconn=%s", self.name, minconn, maxconn
+ "resizing %r to min_size=%s max_size=%s",
+ self.name,
+ min_size,
+ max_size,
)
async with self._lock:
- self._minconn = minconn
- self._maxconn = maxconn
+ self._min_size = min_size
+ self._max_size = max_size
self._nconns += ngrow
for i in range(ngrow):
await self._add_to_pool(conn)
if growing:
async with self._lock:
- if self._nconns < self._maxconn and self._waiting:
+ if self._nconns < self._max_size and self._waiting:
self._nconns += 1
logger.info(
"growing pool %r to %s", self.name, self._nconns
self._nconns_min = len(self._pool)
# If the pool can shrink and connections were unused, drop one
- if self._nconns > self._minconn and nconns_min > 0:
+ if self._nconns > self._min_size and nconns_min > 0:
to_close = self._pool.popleft()
self._nconns -= 1
self._nconns_min -= 1
conninfo: str = "",
*,
kwargs: Optional[Dict[str, Any]] = None,
- minconn: int = 4,
- maxconn: Optional[int] = None,
+ min_size: int = 4,
+ max_size: Optional[int] = None,
name: Optional[str] = None,
timeout: float = 30.0,
max_waiting: int = 0,
] = None,
num_workers: int = 3,
):
- if maxconn is None:
- maxconn = minconn
- if maxconn < minconn:
- raise ValueError("maxconn must be greater or equal than minconn")
+ if max_size is None:
+ max_size = min_size
+ if max_size < min_size:
+ raise ValueError("max_size must be greater or equal than min_size")
if not name:
num = BasePool._num_pool = BasePool._num_pool + 1
name = f"pool-{num}"
self._reconnect_failed: Callable[["BasePool[ConnectionType]"], None]
self._reconnect_failed = reconnect_failed or (lambda pool: None)
self.name = name
- self._minconn = minconn
- self._maxconn = maxconn
+ self._min_size = min_size
+ self._max_size = max_size
self.timeout = timeout
self.max_waiting = max_waiting
self.reconnect_timeout = reconnect_timeout
self.max_idle = max_idle
self.num_workers = num_workers
- self._nconns = minconn # currently in the pool, out, being prepared
+ self._nconns = min_size # currently in the pool, out, being prepared
self._pool: Deque[ConnectionType] = deque()
self._stats: "TCounter[str]" = Counter()
# Min number of connections in the pool in a max_idle unit of time.
# It is reset periodically by the ShrinkPool scheduled task.
- # It is used to shrink back the pool if maxcon > minconn and extra
+ # It is used to shrink back the pool if max_size > min_size and extra
# connections have been acquired, if we notice that in the last
# max_idle interval they weren't all used.
- self._nconns_min = minconn
+ self._nconns_min = min_size
# Flag to allow the pool to grow only one connection at time. In case
# of spike, if threads are allowed to grow in parallel and connection
)
@property
- def minconn(self) -> int:
- return self._minconn
+ def min_size(self) -> int:
+ return self._min_size
@property
- def maxconn(self) -> int:
- return self._maxconn
+ def max_size(self) -> int:
+ return self._max_size
@property
def closed(self) -> bool:
Return immediate measures of the pool (not counters).
"""
return {
- self._POOL_MIN: self._minconn,
- self._POOL_MAX: self._maxconn,
+ self._POOL_MIN: self._min_size,
+ self._POOL_MAX: self._max_size,
self._POOL_SIZE: self._nconns,
self._POOL_AVAILABLE: len(self._pool),
}
for t in self._workers:
t.start()
- # populate the pool with initial minconn connections in background
+ # populate the pool with initial min_size connections in background
for i in range(self._nconns):
self.run_task(AddConnection(self))
- # Schedule a task to shrink the pool if connections over minconn have
+ # Schedule a task to shrink the pool if connections over min_size have
# remained unused.
self.schedule_task(ShrinkPool(self), self.max_idle)
def wait(self, timeout: float = 30.0) -> None:
"""
- Wait for the pool to be full (with `minconn` connections) after creation.
+ Wait for the pool to be full (with `min_size` connections) after creation.
Raise `PoolTimeout` if not ready within *timeout* sec.
# If there is space for the pool to grow, let's do it
# Allow only one thread at time to grow the pool (or returning
# connections might be starved).
- if self._nconns < self._maxconn and not self._growing:
+ if self._nconns < self._max_size and not self._growing:
self._nconns += 1
logger.info(
"growing pool %r to %s", self.name, self._nconns
) -> None:
self.close()
- def resize(self, minconn: int, maxconn: Optional[int] = None) -> None:
+ def resize(self, min_size: int, max_size: Optional[int] = None) -> None:
"""Change the size of the pool during runtime."""
- if maxconn is None:
- maxconn = minconn
- if maxconn < minconn:
- raise ValueError("maxconn must be greater or equal than minconn")
+ if max_size is None:
+ max_size = min_size
+ if max_size < min_size:
+ raise ValueError("max_size must be greater or equal than min_size")
- ngrow = max(0, minconn - self._minconn)
+ ngrow = max(0, min_size - self._min_size)
logger.info(
- "resizing %r to minconn=%s maxconn=%s", self.name, minconn, maxconn
+ "resizing %r to min_size=%s max_size=%s",
+ self.name,
+ min_size,
+ max_size,
)
with self._lock:
- self._minconn = minconn
- self._maxconn = maxconn
+ self._min_size = min_size
+ self._max_size = max_size
self._nconns += ngrow
for i in range(ngrow):
self._add_to_pool(conn)
if growing:
with self._lock:
- if self._nconns < self._maxconn and self._waiting:
+ if self._nconns < self._max_size and self._waiting:
self._nconns += 1
logger.info(
"growing pool %r to %s", self.name, self._nconns
self._nconns_min = len(self._pool)
# If the pool can shrink and connections were unused, drop one
- if self._nconns > self._minconn and nconns_min > 0:
+ if self._nconns > self._min_size and nconns_min > 0:
to_close = self._pool.popleft()
self._nconns -= 1
self._nconns_min -= 1
def test_defaults(dsn):
with pool.ConnectionPool(dsn) as p:
- assert p.minconn == p.maxconn == 4
+ assert p.min_size == p.max_size == 4
assert p.timeout == 30
assert p.max_idle == 10 * 60
assert p.max_lifetime == 60 * 60
assert p.num_workers == 3
-def test_minconn_maxconn(dsn):
- with pool.ConnectionPool(dsn, minconn=2) as p:
- assert p.minconn == p.maxconn == 2
+def test_min_size_max_size(dsn):
+ with pool.ConnectionPool(dsn, min_size=2) as p:
+ assert p.min_size == p.max_size == 2
- with pool.ConnectionPool(dsn, minconn=2, maxconn=4) as p:
- assert p.minconn == 2
- assert p.maxconn == 4
+ with pool.ConnectionPool(dsn, min_size=2, max_size=4) as p:
+ assert p.min_size == 2
+ assert p.max_size == 4
with pytest.raises(ValueError):
- pool.ConnectionPool(dsn, minconn=4, maxconn=2)
+ pool.ConnectionPool(dsn, min_size=4, max_size=2)
def test_connection_class(dsn):
class MyConn(psycopg3.Connection):
pass
- with pool.ConnectionPool(dsn, connection_class=MyConn, minconn=1) as p:
+ with pool.ConnectionPool(dsn, connection_class=MyConn, min_size=1) as p:
with p.connection() as conn:
assert isinstance(conn, MyConn)
def test_kwargs(dsn):
- with pool.ConnectionPool(dsn, kwargs={"autocommit": True}, minconn=1) as p:
+ with pool.ConnectionPool(
+ dsn, kwargs={"autocommit": True}, min_size=1
+ ) as p:
with p.connection() as conn:
assert conn.autocommit
def test_its_really_a_pool(dsn):
- with pool.ConnectionPool(dsn, minconn=2) as p:
+ with pool.ConnectionPool(dsn, min_size=2) as p:
with p.connection() as conn:
with conn.execute("select pg_backend_pid()") as cur:
(pid1,) = cur.fetchone()
def test_context(dsn):
- with pool.ConnectionPool(dsn, minconn=1) as p:
+ with pool.ConnectionPool(dsn, min_size=1) as p:
assert not p.closed
assert p.closed
def test_connection_not_lost(dsn):
- with pool.ConnectionPool(dsn, minconn=1) as p:
+ with pool.ConnectionPool(dsn, min_size=1) as p:
with pytest.raises(ZeroDivisionError):
with p.connection() as conn:
pid = conn.pgconn.backend_pid
times = []
t0 = time()
- with pool.ConnectionPool(dsn, minconn=5, num_workers=2) as p:
+ with pool.ConnectionPool(dsn, min_size=5, num_workers=2) as p:
p.wait(1.0)
want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
assert len(times) == len(want_times)
def test_wait_ready(dsn, monkeypatch):
delay_connection(monkeypatch, 0.1)
with pytest.raises(pool.PoolTimeout):
- with pool.ConnectionPool(dsn, minconn=4, num_workers=1) as p:
+ with pool.ConnectionPool(dsn, min_size=4, num_workers=1) as p:
p.wait(0.3)
- with pool.ConnectionPool(dsn, minconn=4, num_workers=1) as p:
+ with pool.ConnectionPool(dsn, min_size=4, num_workers=1) as p:
p.wait(0.5)
- with pool.ConnectionPool(dsn, minconn=4, num_workers=2) as p:
+ with pool.ConnectionPool(dsn, min_size=4, num_workers=2) as p:
p.wait(0.3)
p.wait(0.0001) # idempotent
def test_setup_no_timeout(dsn, proxy):
with pytest.raises(pool.PoolTimeout):
with pool.ConnectionPool(
- proxy.client_dsn, minconn=1, num_workers=1
+ proxy.client_dsn, min_size=1, num_workers=1
) as p:
p.wait(0.2)
- with pool.ConnectionPool(proxy.client_dsn, minconn=1, num_workers=1) as p:
+ with pool.ConnectionPool(proxy.client_dsn, min_size=1, num_workers=1) as p:
sleep(0.5)
assert not p._pool
proxy.start()
with conn.transaction():
conn.execute("set default_transaction_read_only to on")
- with pool.ConnectionPool(minconn=1, configure=configure) as p:
+ with pool.ConnectionPool(min_size=1, configure=configure) as p:
p.wait(timeout=1.0)
with p.connection() as conn:
assert inits == 1
def configure(conn):
conn.execute("select 1")
- with pool.ConnectionPool(minconn=1, configure=configure) as p:
+ with pool.ConnectionPool(min_size=1, configure=configure) as p:
with pytest.raises(pool.PoolTimeout):
p.wait(timeout=0.5)
with conn.transaction():
conn.execute("WAT")
- with pool.ConnectionPool(minconn=1, configure=configure) as p:
+ with pool.ConnectionPool(min_size=1, configure=configure) as p:
with pytest.raises(pool.PoolTimeout):
p.wait(timeout=0.5)
with conn.transaction():
conn.execute("set timezone to utc")
- with pool.ConnectionPool(minconn=1, reset=reset) as p:
+ with pool.ConnectionPool(min_size=1, reset=reset) as p:
with p.connection() as conn:
assert resets == 0
conn.execute("set timezone to '+2:00'")
def reset(conn):
conn.execute("reset all")
- with pool.ConnectionPool(minconn=1, reset=reset) as p:
+ with pool.ConnectionPool(min_size=1, reset=reset) as p:
with p.connection() as conn:
conn.execute("select 1")
pid1 = conn.pgconn.backend_pid
with conn.transaction():
conn.execute("WAT")
- with pool.ConnectionPool(minconn=1, reset=reset) as p:
+ with pool.ConnectionPool(min_size=1, reset=reset) as p:
with p.connection() as conn:
conn.execute("select 1")
pid1 = conn.pgconn.backend_pid
for retry in retries:
with retry:
results = []
- with pool.ConnectionPool(dsn, minconn=2) as p:
+ with pool.ConnectionPool(dsn, min_size=2) as p:
ts = [Thread(target=worker, args=(i,)) for i in range(6)]
[t.start() for t in ts]
[t.join() for t in ts]
errors = []
success = []
- with pool.ConnectionPool(dsn, minconn=1, max_waiting=3) as p:
+ with pool.ConnectionPool(dsn, min_size=1, max_waiting=3) as p:
p.wait()
ev = Event()
t = Thread(target=worker, args=(0.3, ev))
results = []
errors = []
- with pool.ConnectionPool(dsn, minconn=2, timeout=0.1) as p:
+ with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
ts = [Thread(target=worker, args=(i,)) for i in range(4)]
[t.start() for t in ts]
[t.join() for t in ts]
results = []
- with pool.ConnectionPool(dsn, minconn=2) as p:
+ with pool.ConnectionPool(dsn, min_size=2) as p:
ts = [
Thread(target=worker, args=(i, timeout))
for i, timeout in enumerate([0.4, 0.4, 0.1, 0.4, 0.4])
results = []
errors = []
- with pool.ConnectionPool(dsn, minconn=2, timeout=0.1) as p:
+ with pool.ConnectionPool(dsn, min_size=2, timeout=0.1) as p:
ts = [Thread(target=worker, args=(i,)) for i in range(4)]
[t.start() for t in ts]
[t.join() for t in ts]
def test_broken_reconnect(dsn):
- with pool.ConnectionPool(dsn, minconn=1) as p:
+ with pool.ConnectionPool(dsn, min_size=1) as p:
with p.connection() as conn:
with conn.execute("select pg_backend_pid()") as cur:
(pid1,) = cur.fetchone()
def test_intrans_rollback(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg3.pool")
- with pool.ConnectionPool(dsn, minconn=1) as p:
+ with pool.ConnectionPool(dsn, min_size=1) as p:
conn = p.getconn()
pid = conn.pgconn.backend_pid
conn.execute("create table test_intrans_rollback ()")
def test_inerror_rollback(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg3.pool")
- with pool.ConnectionPool(dsn, minconn=1) as p:
+ with pool.ConnectionPool(dsn, min_size=1) as p:
conn = p.getconn()
pid = conn.pgconn.backend_pid
with pytest.raises(psycopg3.ProgrammingError):
def test_active_close(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg3.pool")
- with pool.ConnectionPool(dsn, minconn=1) as p:
+ with pool.ConnectionPool(dsn, min_size=1) as p:
conn = p.getconn()
pid = conn.pgconn.backend_pid
cur = conn.cursor()
def test_fail_rollback_close(dsn, caplog, monkeypatch):
caplog.set_level(logging.WARNING, logger="psycopg3.pool")
- with pool.ConnectionPool(dsn, minconn=1) as p:
+ with pool.ConnectionPool(dsn, min_size=1) as p:
conn = p.getconn()
def bad_rollback():
def test_putconn_no_pool(dsn):
- with pool.ConnectionPool(dsn, minconn=1) as p:
+ with pool.ConnectionPool(dsn, min_size=1) as p:
conn = psycopg3.connect(dsn)
with pytest.raises(ValueError):
p.putconn(conn)
def test_putconn_wrong_pool(dsn):
- with pool.ConnectionPool(dsn, minconn=1) as p1:
- with pool.ConnectionPool(dsn, minconn=1) as p2:
+ with pool.ConnectionPool(dsn, min_size=1) as p1:
+ with pool.ConnectionPool(dsn, min_size=1) as p2:
conn = p1.getconn()
with pytest.raises(ValueError):
p2.putconn(conn)
def test_del_no_warning(dsn, recwarn):
- p = pool.ConnectionPool(dsn, minconn=2)
+ p = pool.ConnectionPool(dsn, min_size=2)
with p.connection() as conn:
conn.execute("select 1")
def test_closed_getconn(dsn):
- p = pool.ConnectionPool(dsn, minconn=1)
+ p = pool.ConnectionPool(dsn, min_size=1)
assert not p.closed
with p.connection():
pass
def test_closed_putconn(dsn):
- p = pool.ConnectionPool(dsn, minconn=1)
+ p = pool.ConnectionPool(dsn, min_size=1)
with p.connection() as conn:
pass
@pytest.mark.slow
def test_closed_queue(dsn):
- p = pool.ConnectionPool(dsn, minconn=1)
+ p = pool.ConnectionPool(dsn, min_size=1)
success = []
def w1():
for retry in retries:
with retry:
with pool.ConnectionPool(
- dsn, minconn=2, maxconn=4, num_workers=3
+ dsn, min_size=2, max_size=4, num_workers=3
) as p:
p.wait(1.0)
results = []
with p.connection() as conn:
conn.execute("select pg_sleep(0.1)")
- with pool.ConnectionPool(dsn, minconn=2, maxconn=4, max_idle=0.2) as p:
+ with pool.ConnectionPool(dsn, min_size=2, max_size=4, max_idle=0.2) as p:
p.wait(5.0)
assert p.max_idle == 0.2
monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
proxy.start()
- with pool.ConnectionPool(proxy.client_dsn, minconn=1) as p:
+ with pool.ConnectionPool(proxy.client_dsn, min_size=1) as p:
p.wait(2.0)
proxy.stop()
with pool.ConnectionPool(
proxy.client_dsn,
name="this-one",
- minconn=1,
+ min_size=1,
reconnect_timeout=1.0,
reconnect_failed=failed,
) as p:
@pytest.mark.slow
def test_uniform_use(dsn):
- with pool.ConnectionPool(dsn, minconn=4) as p:
+ with pool.ConnectionPool(dsn, min_size=4) as p:
counts = Counter()
for i in range(8):
with p.connection() as conn:
size = []
- with pool.ConnectionPool(dsn, minconn=2, max_idle=0.2) as p:
+ with pool.ConnectionPool(dsn, min_size=2, max_idle=0.2) as p:
s = Thread(target=sampler)
s.start()
sleep(0.2)
p.resize(4)
- assert p.minconn == 4
- assert p.maxconn == 4
+ assert p.min_size == 4
+ assert p.max_size == 4
sleep(0.4)
p.resize(2)
- assert p.minconn == 2
- assert p.maxconn == 2
+ assert p.min_size == 2
+ assert p.max_size == 2
sleep(0.6)
@pytest.mark.slow
def test_max_lifetime(dsn):
- with pool.ConnectionPool(dsn, minconn=1, max_lifetime=0.2) as p:
+ with pool.ConnectionPool(dsn, min_size=1, max_lifetime=0.2) as p:
sleep(0.1)
pids = []
for i in range(5):
def test_check(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg3.pool")
- with pool.ConnectionPool(dsn, minconn=4) as p:
+ with pool.ConnectionPool(dsn, min_size=4) as p:
p.wait(1.0)
with p.connection() as conn:
pid = conn.pgconn.backend_pid
with p.connection() as conn:
conn.execute("select pg_sleep(0.2)")
- with pool.ConnectionPool(dsn, minconn=2, maxconn=4) as p:
+ with pool.ConnectionPool(dsn, min_size=2, max_size=4) as p:
p.wait(2.0)
stats = p.get_stats()
except pool.PoolTimeout:
pass
- with pool.ConnectionPool(dsn, minconn=3) as p:
+ with pool.ConnectionPool(dsn, min_size=3) as p:
p.wait(2.0)
ts = [Thread(target=worker, args=(i,)) for i in range(7)]
def test_stats_connect(dsn, proxy, monkeypatch):
proxy.start()
delay_connection(monkeypatch, 0.2)
- with pool.ConnectionPool(proxy.client_dsn, minconn=3) as p:
+ with pool.ConnectionPool(proxy.client_dsn, min_size=3) as p:
p.wait()
stats = p.get_stats()
assert stats["connections_num"] == 3
with p.connection():
sleep(0.002)
- with pool.ConnectionPool(dsn, minconn=5, maxconn=10) as p:
+ with pool.ConnectionPool(dsn, min_size=5, max_size=10) as p:
p.wait()
ts = [Thread(target=worker) for i in range(50)]
async def test_defaults(dsn):
async with pool.AsyncConnectionPool(dsn) as p:
- assert p.minconn == p.maxconn == 4
+ assert p.min_size == p.max_size == 4
assert p.timeout == 30
assert p.max_idle == 10 * 60
assert p.max_lifetime == 60 * 60
assert p.num_workers == 3
-async def test_minconn_maxconn(dsn):
- async with pool.AsyncConnectionPool(dsn, minconn=2) as p:
- assert p.minconn == p.maxconn == 2
+async def test_min_size_max_size(dsn):
+ async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
+ assert p.min_size == p.max_size == 2
- async with pool.AsyncConnectionPool(dsn, minconn=2, maxconn=4) as p:
- assert p.minconn == 2
- assert p.maxconn == 4
+ async with pool.AsyncConnectionPool(dsn, min_size=2, max_size=4) as p:
+ assert p.min_size == 2
+ assert p.max_size == 4
with pytest.raises(ValueError):
- pool.AsyncConnectionPool(dsn, minconn=4, maxconn=2)
+ pool.AsyncConnectionPool(dsn, min_size=4, max_size=2)
async def test_connection_class(dsn):
pass
async with pool.AsyncConnectionPool(
- dsn, connection_class=MyConn, minconn=1
+ dsn, connection_class=MyConn, min_size=1
) as p:
async with p.connection() as conn:
assert isinstance(conn, MyConn)
async def test_kwargs(dsn):
async with pool.AsyncConnectionPool(
- dsn, kwargs={"autocommit": True}, minconn=1
+ dsn, kwargs={"autocommit": True}, min_size=1
) as p:
async with p.connection() as conn:
assert conn.autocommit
async def test_its_really_a_pool(dsn):
- async with pool.AsyncConnectionPool(dsn, minconn=2) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
async with p.connection() as conn:
cur = await conn.execute("select pg_backend_pid()")
(pid1,) = await cur.fetchone()
async def test_context(dsn):
- async with pool.AsyncConnectionPool(dsn, minconn=1) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
assert not p.closed
assert p.closed
async def test_connection_not_lost(dsn):
- async with pool.AsyncConnectionPool(dsn, minconn=1) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
with pytest.raises(ZeroDivisionError):
async with p.connection() as conn:
pid = conn.pgconn.backend_pid
t0 = time()
async with pool.AsyncConnectionPool(
- dsn, minconn=5, num_workers=2
+ dsn, min_size=5, num_workers=2
) as p:
await p.wait(1.0)
want_times = [0.1, 0.1, 0.2, 0.2, 0.3]
delay_connection(monkeypatch, 0.1)
with pytest.raises(pool.PoolTimeout):
async with pool.AsyncConnectionPool(
- dsn, minconn=4, num_workers=1
+ dsn, min_size=4, num_workers=1
) as p:
await p.wait(0.3)
- async with pool.AsyncConnectionPool(dsn, minconn=4, num_workers=1) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=4, num_workers=1) as p:
await p.wait(0.5)
- async with pool.AsyncConnectionPool(dsn, minconn=4, num_workers=2) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=4, num_workers=2) as p:
await p.wait(0.3)
await p.wait(0.0001) # idempotent
async def test_setup_no_timeout(dsn, proxy):
with pytest.raises(pool.PoolTimeout):
async with pool.AsyncConnectionPool(
- proxy.client_dsn, minconn=1, num_workers=1
+ proxy.client_dsn, min_size=1, num_workers=1
) as p:
await p.wait(0.2)
async with pool.AsyncConnectionPool(
- proxy.client_dsn, minconn=1, num_workers=1
+ proxy.client_dsn, min_size=1, num_workers=1
) as p:
await asyncio.sleep(0.5)
assert not p._pool
async with conn.transaction():
await conn.execute("set default_transaction_read_only to on")
- async with pool.AsyncConnectionPool(minconn=1, configure=configure) as p:
+ async with pool.AsyncConnectionPool(min_size=1, configure=configure) as p:
await p.wait(timeout=1.0)
async with p.connection() as conn:
assert inits == 1
async def configure(conn):
await conn.execute("select 1")
- async with pool.AsyncConnectionPool(minconn=1, configure=configure) as p:
+ async with pool.AsyncConnectionPool(min_size=1, configure=configure) as p:
with pytest.raises(pool.PoolTimeout):
await p.wait(timeout=0.5)
async with conn.transaction():
await conn.execute("WAT")
- async with pool.AsyncConnectionPool(minconn=1, configure=configure) as p:
+ async with pool.AsyncConnectionPool(min_size=1, configure=configure) as p:
with pytest.raises(pool.PoolTimeout):
await p.wait(timeout=0.5)
async with conn.transaction():
await conn.execute("set timezone to utc")
- async with pool.AsyncConnectionPool(minconn=1, reset=reset) as p:
+ async with pool.AsyncConnectionPool(min_size=1, reset=reset) as p:
async with p.connection() as conn:
assert resets == 0
await conn.execute("set timezone to '+2:00'")
async def reset(conn):
await conn.execute("reset all")
- async with pool.AsyncConnectionPool(minconn=1, reset=reset) as p:
+ async with pool.AsyncConnectionPool(min_size=1, reset=reset) as p:
async with p.connection() as conn:
await conn.execute("select 1")
pid1 = conn.pgconn.backend_pid
async with conn.transaction():
await conn.execute("WAT")
- async with pool.AsyncConnectionPool(minconn=1, reset=reset) as p:
+ async with pool.AsyncConnectionPool(min_size=1, reset=reset) as p:
async with p.connection() as conn:
await conn.execute("select 1")
pid1 = conn.pgconn.backend_pid
async for retry in retries:
with retry:
results = []
- async with pool.AsyncConnectionPool(dsn, minconn=2) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
ts = [create_task(worker(i)) for i in range(6)]
await asyncio.gather(*ts)
errors = []
success = []
- async with pool.AsyncConnectionPool(dsn, minconn=1, max_waiting=3) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=1, max_waiting=3) as p:
await p.wait()
ev = asyncio.Event()
create_task(worker(0.3, ev))
results = []
errors = []
- async with pool.AsyncConnectionPool(dsn, minconn=2, timeout=0.1) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
ts = [create_task(worker(i)) for i in range(4)]
await asyncio.gather(*ts)
if timeout > 0.2:
raise
- async with pool.AsyncConnectionPool(dsn, minconn=2) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=2) as p:
results = []
ts = [
create_task(worker(i, timeout))
results = []
errors = []
- async with pool.AsyncConnectionPool(dsn, minconn=2, timeout=0.1) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=2, timeout=0.1) as p:
ts = [create_task(worker(i)) for i in range(4)]
await asyncio.gather(*ts)
async def test_broken_reconnect(dsn):
- async with pool.AsyncConnectionPool(dsn, minconn=1) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
async with p.connection() as conn:
cur = await conn.execute("select pg_backend_pid()")
(pid1,) = await cur.fetchone()
async def test_intrans_rollback(dsn, caplog):
- async with pool.AsyncConnectionPool(dsn, minconn=1) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
conn = await p.getconn()
pid = conn.pgconn.backend_pid
await conn.execute("create table test_intrans_rollback ()")
async def test_inerror_rollback(dsn, caplog):
- async with pool.AsyncConnectionPool(dsn, minconn=1) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
conn = await p.getconn()
pid = conn.pgconn.backend_pid
with pytest.raises(psycopg3.ProgrammingError):
async def test_active_close(dsn, caplog):
- async with pool.AsyncConnectionPool(dsn, minconn=1) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
conn = await p.getconn()
pid = conn.pgconn.backend_pid
cur = conn.cursor()
async def test_fail_rollback_close(dsn, caplog, monkeypatch):
- async with pool.AsyncConnectionPool(dsn, minconn=1) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
conn = await p.getconn()
async def bad_rollback():
async def test_putconn_no_pool(dsn):
- async with pool.AsyncConnectionPool(dsn, minconn=1) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=1) as p:
conn = psycopg3.connect(dsn)
with pytest.raises(ValueError):
await p.putconn(conn)
async def test_putconn_wrong_pool(dsn):
- async with pool.AsyncConnectionPool(dsn, minconn=1) as p1:
- async with pool.AsyncConnectionPool(dsn, minconn=1) as p2:
+ async with pool.AsyncConnectionPool(dsn, min_size=1) as p1:
+ async with pool.AsyncConnectionPool(dsn, min_size=1) as p2:
conn = await p1.getconn()
with pytest.raises(ValueError):
await p2.putconn(conn)
async def test_closed_getconn(dsn):
- p = pool.AsyncConnectionPool(dsn, minconn=1)
+ p = pool.AsyncConnectionPool(dsn, min_size=1)
assert not p.closed
async with p.connection():
pass
async def test_closed_putconn(dsn):
- p = pool.AsyncConnectionPool(dsn, minconn=1)
+ p = pool.AsyncConnectionPool(dsn, min_size=1)
async with p.connection() as conn:
pass
@pytest.mark.slow
async def test_closed_queue(dsn):
- p = pool.AsyncConnectionPool(dsn, minconn=1)
+ p = pool.AsyncConnectionPool(dsn, min_size=1)
success = []
async def w1():
async for retry in retries:
with retry:
async with pool.AsyncConnectionPool(
- dsn, minconn=2, maxconn=4, num_workers=3
+ dsn, min_size=2, max_size=4, num_workers=3
) as p:
await p.wait(1.0)
ts = []
await conn.execute("select pg_sleep(0.1)")
async with pool.AsyncConnectionPool(
- dsn, minconn=2, maxconn=4, max_idle=0.2
+ dsn, min_size=2, max_size=4, max_idle=0.2
) as p:
await p.wait(5.0)
assert p.max_idle == 0.2
monkeypatch.setattr(pool.base.ConnectionAttempt, "DELAY_JITTER", 0.0)
proxy.start()
- async with pool.AsyncConnectionPool(proxy.client_dsn, minconn=1) as p:
+ async with pool.AsyncConnectionPool(proxy.client_dsn, min_size=1) as p:
await p.wait(2.0)
proxy.stop()
async with pool.AsyncConnectionPool(
proxy.client_dsn,
name="this-one",
- minconn=1,
+ min_size=1,
reconnect_timeout=1.0,
reconnect_failed=failed,
) as p:
@pytest.mark.slow
async def test_uniform_use(dsn):
- async with pool.AsyncConnectionPool(dsn, minconn=4) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=4) as p:
counts = Counter()
for i in range(8):
async with p.connection() as conn:
size = []
- async with pool.AsyncConnectionPool(dsn, minconn=2, max_idle=0.2) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=2, max_idle=0.2) as p:
s = create_task(sampler())
await asyncio.sleep(0.3)
await asyncio.sleep(0.2)
await p.resize(4)
- assert p.minconn == 4
- assert p.maxconn == 4
+ assert p.min_size == 4
+ assert p.max_size == 4
await asyncio.sleep(0.4)
await p.resize(2)
- assert p.minconn == 2
- assert p.maxconn == 2
+ assert p.min_size == 2
+ assert p.max_size == 2
await asyncio.sleep(0.6)
@pytest.mark.slow
async def test_max_lifetime(dsn):
- async with pool.AsyncConnectionPool(dsn, minconn=1, max_lifetime=0.2) as p:
+ async with pool.AsyncConnectionPool(
+ dsn, min_size=1, max_lifetime=0.2
+ ) as p:
await asyncio.sleep(0.1)
pids = []
for i in range(5):
async def test_check(dsn, caplog):
caplog.set_level(logging.WARNING, logger="psycopg3.pool")
- async with pool.AsyncConnectionPool(dsn, minconn=4) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=4) as p:
await p.wait(1.0)
async with p.connection() as conn:
pid = conn.pgconn.backend_pid
async with p.connection() as conn:
await conn.execute("select pg_sleep(0.2)")
- async with pool.AsyncConnectionPool(dsn, minconn=2, maxconn=4) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=2, max_size=4) as p:
await p.wait(2.0)
stats = p.get_stats()
except pool.PoolTimeout:
pass
- async with pool.AsyncConnectionPool(dsn, minconn=3) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=3) as p:
await p.wait(2.0)
ts = [create_task(worker(i)) for i in range(7)]
async def test_stats_connect(dsn, proxy, monkeypatch):
proxy.start()
delay_connection(monkeypatch, 0.2)
- async with pool.AsyncConnectionPool(proxy.client_dsn, minconn=3) as p:
+ async with pool.AsyncConnectionPool(proxy.client_dsn, min_size=3) as p:
await p.wait()
stats = p.get_stats()
assert stats["connections_num"] == 3
async with p.connection():
await asyncio.sleep(0.002)
- async with pool.AsyncConnectionPool(dsn, minconn=5, maxconn=10) as p:
+ async with pool.AsyncConnectionPool(dsn, min_size=5, max_size=10) as p:
await p.wait()
ts = [create_task(worker()) for i in range(50)]