minconn: int = 4,
maxconn: Optional[int] = None,
name: Optional[str] = None,
- timeout_sec: float = 30.0,
- max_idle_sec: float = 10 * 60.0,
+ timeout: float = 30.0,
+ max_idle: float = 10 * 60.0,
reconnect_timeout: float = 5 * 60.0,
reconnect_failed: Optional[Callable[["ConnectionPool"], None]] = None,
num_workers: int = 3,
self.name = name
self.minconn = minconn
self.maxconn = maxconn
- self.timeout_sec = timeout_sec
+ self.timeout = timeout
self.reconnect_timeout = reconnect_timeout
- self.max_idle_sec = max_idle_sec
+ self.max_idle = max_idle
self.num_workers = num_workers
self._nconns = minconn # currently in the pool, out, being prepared
self.add_task(AddInitialConnection(self, event))
# Wait for the pool to be full or throw an error
- if not event.wait(timeout=timeout_sec):
+ if not event.wait(timeout=timeout):
self.close() # stop all the threads
raise PoolTimeout(
- f"pool initialization incomplete after {timeout_sec} sec"
+ f"pool initialization incomplete after {timeout} sec"
)
def __repr__(self) -> str:
@contextmanager
def connection(
- self, timeout_sec: Optional[float] = None
+ self, timeout: Optional[float] = None
) -> Iterator[Connection]:
"""Context manager to obtain a connection from the pool.
Returned the connection immediately if available, otherwise wait up to
- *timeout_sec* or `self.timeout_sec` and throw `PoolTimeout` if a
+ *timeout* or `self.timeout` and throw `PoolTimeout` if a
connection is available in time.
Upon context exit, return the connection to the pool. Apply the normal
of success/error). If the connection is no more in working state
replace it with a new one.
"""
- conn = self.getconn(timeout_sec=timeout_sec)
+ conn = self.getconn(timeout=timeout)
try:
with conn:
yield conn
finally:
self.putconn(conn)
- def getconn(self, timeout_sec: Optional[float] = None) -> Connection:
+ def getconn(self, timeout: Optional[float] = None) -> Connection:
"""Obtain a contection from the pool.
You should preferrably use `connection()`. Use this function only if
# If we are in the waiting queue, wait to be assigned a connection
# (outside the critical section, so only the waiting client is locked)
if pos:
- if timeout_sec is None:
- timeout_sec = self.timeout_sec
- conn = pos.wait(timeout=timeout_sec)
+ if timeout is None:
+ timeout = self.timeout
+ conn = pos.wait(timeout=timeout)
# Tell the connection it belongs to a pool to avoid closing on __exit__
# Note that this property shouldn't be set while the connection is in
# Also check if it's time to shrink the pool
if (
self._nconns > self.minconn
- and now - self._pool[0][1] > self.max_idle_sec
+ and now - self._pool[0][1] > self.max_idle
):
to_close, t0 = self._pool.popleft()
logger.debug(
def test_defaults(dsn):
p = pool.ConnectionPool(dsn)
assert p.minconn == p.maxconn == 4
- assert p.timeout_sec == 30
- assert p.max_idle_sec == 600
+ assert p.timeout == 30
+ assert p.max_idle == 600
assert p.num_workers == 3
def test_init_timeout(dsn, monkeypatch):
delay_connection(monkeypatch, 0.1)
with pytest.raises(pool.PoolTimeout):
- pool.ConnectionPool(dsn, minconn=4, num_workers=1, timeout_sec=0.3)
+ pool.ConnectionPool(dsn, minconn=4, num_workers=1, timeout=0.3)
- p = pool.ConnectionPool(dsn, minconn=4, num_workers=1, timeout_sec=0.5)
+ p = pool.ConnectionPool(dsn, minconn=4, num_workers=1, timeout=0.5)
p.close()
- p = pool.ConnectionPool(dsn, minconn=4, num_workers=2, timeout_sec=0.3)
+ p = pool.ConnectionPool(dsn, minconn=4, num_workers=2, timeout=0.3)
p.close()
@pytest.mark.slow
def test_queue_timeout(dsn):
- p = pool.ConnectionPool(dsn, minconn=2, timeout_sec=0.1)
+ p = pool.ConnectionPool(dsn, minconn=2, timeout=0.1)
results = []
errors = []
@pytest.mark.slow
def test_queue_timeout_override(dsn):
- p = pool.ConnectionPool(dsn, minconn=2, timeout_sec=0.1)
+ p = pool.ConnectionPool(dsn, minconn=2, timeout=0.1)
results = []
errors = []
t0 = time()
timeout = 0.25 if n == 3 else None
try:
- with p.connection(timeout_sec=timeout) as conn:
+ with p.connection(timeout=timeout) as conn:
(pid,) = conn.execute(
"select pg_backend_pid() from pg_sleep(0.2)"
).fetchone()
@pytest.mark.slow
def test_shrink(dsn, monkeypatch):
p = pool.ConnectionPool(
- dsn, minconn=2, maxconn=4, num_workers=3, max_idle_sec=0.2
+ dsn, minconn=2, maxconn=4, num_workers=3, max_idle=0.2
)
- assert p.max_idle_sec == 0.2
+ assert p.max_idle == 0.2
def worker(n):
with p.connection() as conn:
monkeypatch.setattr(pool.AddConnection, "DELAY_JITTER", 0.0)
proxy.start()
- p = pool.ConnectionPool(proxy.client_dsn, minconn=1, timeout_sec=2)
+ p = pool.ConnectionPool(proxy.client_dsn, minconn=1, timeout=2)
proxy.stop()
with pytest.raises(psycopg3.OperationalError):
proxy.client_dsn,
name="this-one",
minconn=1,
- timeout_sec=2,
+ timeout=2,
reconnect_timeout=1.0,
reconnect_failed=failed,
)