pos: Optional[WaitingClient] = None
if self._pool:
# Take a connection ready out of the pool
- conn = self._pool.pop(-1)
+ conn = self._pool.pop(-1)[0]
else:
# No connection available: put the client in the waiting queue
pos = WaitingClient()
self._waiting.append(pos)
+ # If there is space for the pool to grow, let's do it
+ if self._nconns < self.maxconn:
+ logger.debug("growing pool %s", self.name)
+ self._nconns += 1
+ self.add_task(AddConnection(self))
+
# If we are in the waiting queue, wait to be assigned a connection
# (outside the critical section, so only the waiting client is locked)
if pos:
import pytest
import psycopg3
-from psycopg3.pq import TransactionStatus
from psycopg3 import pool
+from psycopg3.pq import TransactionStatus
def test_minconn_maxconn(dsn):
with p.connection() as conn:
conn.execute("select 1")
- while len(p._pool) < p.minconn:
- sleep(0.01)
+ wait_pool_full(p)
ref = weakref.ref(p)
del p
t1.join()
t2.join()
assert len(success) == 2
+
+
+@pytest.mark.slow
+def test_grow(dsn, monkeypatch):
+ p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, num_workers=3)
+ wait_pool_full(p)
+ delay_connection(monkeypatch, 0.1)
+ ts = []
+ results = []
+
+ def worker(n):
+ t0 = time()
+ with p.connection() as conn:
+ conn.execute("select 1 from pg_sleep(0.2)")
+ t1 = time()
+ results.append((n, t1 - t0))
+
+ for i in range(6):
+ t = Thread(target=worker, args=(i,))
+ t.start()
+ ts.append(t)
+
+ for t in ts:
+ t.join()
+
+ deltas = [0.2, 0.2, 0.3, 0.3, 0.4, 0.4]
+ for (_, got), want in zip(results, deltas):
+ assert got == pytest.approx(want, 0.1)
+
+
+def delay_connection(monkeypatch, sec):
+ """
+ Return a _connect_gen function delayed by the amount of seconds
+ """
+ connect_gen_orig = psycopg3.Connection._connect_gen
+
+ def connect_gen_delayed(*args, **kwargs):
+ psycopg3.pool.logger.debug("delaying connection")
+ sleep(sec)
+ rv = yield from connect_gen_orig(*args, **kwargs)
+ return rv
+
+ monkeypatch.setattr(
+ psycopg3.Connection, "_connect_gen", connect_gen_delayed
+ )
+
+
+def wait_pool_full(pool):
+ while len(pool._pool) < pool.minconn:
+ sleep(0.01)