From: Daniele Varrazzo Date: Thu, 25 Feb 2021 20:57:20 +0000 (+0100) Subject: Add pool.wait_ready, drop connection_timeout on init X-Git-Tag: 3.0.dev0~87^2~44 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=882ed791e814c12ad8d1b6ee3208536901918daa;p=thirdparty%2Fpsycopg.git Add pool.wait_ready, drop connection_timeout on init This way init should never block. Always fill the connection in background but a client can decide if they want to wait for it to be ready. --- diff --git a/psycopg3/psycopg3/pool/pool.py b/psycopg3/psycopg3/pool/pool.py index b6718f9f2..ce6282ed3 100644 --- a/psycopg3/psycopg3/pool/pool.py +++ b/psycopg3/psycopg3/pool/pool.py @@ -39,7 +39,6 @@ class ConnectionPool: maxconn: Optional[int] = None, name: Optional[str] = None, timeout: float = 30.0, - setup_timeout: float = 30.0, max_idle: float = 10 * 60.0, reconnect_timeout: float = 5 * 60.0, reconnect_failed: Optional[Callable[["ConnectionPool"], None]] = None, @@ -87,6 +86,9 @@ class ConnectionPool: # max_idle interval they weren't all used. self._nconns_min = minconn + # to notify that the pool is full + self._pool_full_event: Optional[threading.Event] = None + self._tasks: "Queue[tasks.MaintenanceTask]" = Queue() self._workers: List[threading.Thread] = [] for i in range(num_workers): @@ -108,22 +110,9 @@ class ConnectionPool: for t in self._workers: t.start() - # Populate the pool with initial minconn connections - # Block if setup_timeout is > 0, otherwise fill the pool in background - if setup_timeout > 0: - event = threading.Event() - for i in range(self._nconns): - self.run_task(tasks.AddInitialConnection(self, event)) - - # Wait for the pool to be full or throw an error - if not event.wait(timeout=setup_timeout): - self.close() # stop all the threads - raise PoolTimeout( - f"pool initialization incomplete after {setup_timeout} sec" - ) - else: - for i in range(self._nconns): - self.run_task(tasks.AddConnection(self)) + # Populate the pool with initial minconn connections in background + for i in range(self._nconns): + self.run_task(tasks.AddConnection(self)) # Schedule a task to shrink the pool if connections over minconn have # remained unused. However if the pool cannot't grow don't bother. @@ -142,6 +131,27 @@ class ConnectionPool: if hasattr(self, "_closed"): self.close(timeout=0) + def wait_ready(self, timeout: float = 30.0) -> None: + """ + Wait for the pool to be full after init. + + Raise `PoolTimeout` if not ready within *timeout* sec. + """ + with self._lock: + assert not self._pool_full_event + if len(self._pool) >= self._nconns: + return + self._pool_full_event = threading.Event() + + if not self._pool_full_event.wait(timeout): + self.close() # stop all the threads + raise PoolTimeout( + f"pool initialization incomplete after {timeout} sec" + ) + + with self._lock: + self._pool_full_event = None + @contextmanager def connection( self, timeout: Optional[float] = None @@ -355,24 +365,6 @@ class ConnectionPool: conn._pool = self return conn - def _add_initial_connection(self, event: threading.Event) -> None: - """Create a new connection at the beginning of the pool life. - - Trigger *event* if all the connections necessary have been added. - """ - conn = self._connect() - conn._pool = None # avoid a reference loop - - with self._lock: - assert ( - not self._waiting - ), "clients waiting in a pool being initialised" - self._pool.append(conn) - trigger_event = len(self._pool) >= self._nconns - - if trigger_event: - event.set() - def _add_connection(self, attempt: Optional[ConnectionAttempt]) -> None: """Try to connect and add the connection to the pool. @@ -450,6 +442,11 @@ class ConnectionPool: # No client waiting for a connection: put it back into the pool self._pool.append(conn) + # If we have been asked to wait for pool init, notify the + # waiter if the pool is full. + if self._pool_full_event and len(self._pool) >= self._nconns: + self._pool_full_event.set() + def _reset_connection(self, conn: Connection) -> None: """ Bring a connection to IDLE state or close it. diff --git a/psycopg3/psycopg3/pool/tasks.py b/psycopg3/psycopg3/pool/tasks.py index fe1a11ba4..5b0690183 100644 --- a/psycopg3/psycopg3/pool/tasks.py +++ b/psycopg3/psycopg3/pool/tasks.py @@ -5,7 +5,6 @@ Maintenance tasks for the connection pools. # Copyright (C) 2021 The Psycopg Team import logging -import threading from abc import ABC, abstractmethod from typing import Optional, TYPE_CHECKING from weakref import ref @@ -69,20 +68,6 @@ class StopWorker(MaintenanceTask): pass -class AddInitialConnection(MaintenanceTask): - """Add a new connection into to the pool. - - If the desired number of connections is reached notify the event. - """ - - def __init__(self, pool: "ConnectionPool", event: threading.Event): - super().__init__(pool) - self.event = event - - def _run(self, pool: "ConnectionPool") -> None: - pool._add_initial_connection(self.event) - - class AddConnection(MaintenanceTask): def __init__( self, diff --git a/tests/pool/test_pool.py b/tests/pool/test_pool.py index 93c4cb37c..87309b8ee 100644 --- a/tests/pool/test_pool.py +++ b/tests/pool/test_pool.py @@ -68,17 +68,16 @@ def test_concurrent_filling(dsn, monkeypatch): t0 = time() times = [] - add_orig = pool.ConnectionPool._add_initial_connection + add_orig = pool.ConnectionPool._add_to_pool - def add_time(self, event): - add_orig(self, event) + def add_time(self, conn): times.append(time() - t0) + add_orig(self, conn) - monkeypatch.setattr( - pool.ConnectionPool, "_add_initial_connection", add_time - ) + monkeypatch.setattr(pool.ConnectionPool, "_add_to_pool", add_time) - pool.ConnectionPool(dsn, minconn=5, num_workers=2) + p = pool.ConnectionPool(dsn, minconn=5, num_workers=2) + p.wait_ready(5.0) want_times = [0.1, 0.1, 0.2, 0.2, 0.3] assert len(times) == len(want_times) for got, want in zip(times, want_times): @@ -86,27 +85,28 @@ def test_concurrent_filling(dsn, monkeypatch): @pytest.mark.slow -def test_setup_timeout(dsn, monkeypatch): +def test_wait_ready(dsn, monkeypatch): delay_connection(monkeypatch, 0.1) with pytest.raises(pool.PoolTimeout): - pool.ConnectionPool(dsn, minconn=4, num_workers=1, setup_timeout=0.3) + p = pool.ConnectionPool(dsn, minconn=4, num_workers=1) + p.wait_ready(0.3) - p = pool.ConnectionPool(dsn, minconn=4, num_workers=1, setup_timeout=0.5) + p = pool.ConnectionPool(dsn, minconn=4, num_workers=1) + p.wait_ready(0.5) p.close() - p = pool.ConnectionPool(dsn, minconn=4, num_workers=2, setup_timeout=0.3) + p = pool.ConnectionPool(dsn, minconn=4, num_workers=2) + p.wait_ready(0.3) + p.wait_ready(0.0001) # idempotent p.close() @pytest.mark.slow def test_setup_no_timeout(dsn, proxy): with pytest.raises(pool.PoolTimeout): - pool.ConnectionPool( - proxy.client_dsn, minconn=1, num_workers=1, setup_timeout=0.2 - ) + p = pool.ConnectionPool(proxy.client_dsn, minconn=1, num_workers=1) + p.wait_ready(0.2) - p = pool.ConnectionPool( - proxy.client_dsn, minconn=1, num_workers=1, setup_timeout=0 - ) + p = pool.ConnectionPool(proxy.client_dsn, minconn=1, num_workers=1) sleep(0.5) assert not p._pool proxy.start() @@ -382,8 +382,7 @@ def test_del_no_warning(dsn, recwarn): with p.connection() as conn: conn.execute("select 1") - wait_pool_full(p) - + p.wait_ready() ref = weakref.ref(p) del p assert not ref() @@ -458,6 +457,7 @@ def test_closed_queue(dsn): @pytest.mark.slow def test_grow(dsn, monkeypatch): p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, num_workers=3) + p.wait_ready(5.0) delay_connection(monkeypatch, 0.1) ts = [] results = [] @@ -500,6 +500,7 @@ def test_shrink(dsn, monkeypatch): monkeypatch.setattr(ShrinkPool, "_run", run_hacked) p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, max_idle=0.2) + p.wait_ready(5.0) assert p.max_idle == 0.2 def worker(n): @@ -529,7 +530,8 @@ def test_reconnect(proxy, caplog, monkeypatch): monkeypatch.setattr(pool.pool.ConnectionAttempt, "DELAY_JITTER", 0.0) proxy.start() - p = pool.ConnectionPool(proxy.client_dsn, minconn=1, setup_timeout=2.0) + p = pool.ConnectionPool(proxy.client_dsn, minconn=1) + p.wait_ready(2.0) proxy.stop() with pytest.raises(psycopg3.OperationalError): @@ -538,7 +540,7 @@ def test_reconnect(proxy, caplog, monkeypatch): sleep(1.0) proxy.start() - wait_pool_full(p) + p.wait_ready() with p.connection() as conn: conn.execute("select 1") @@ -569,10 +571,10 @@ def test_reconnect_failure(proxy): proxy.client_dsn, name="this-one", minconn=1, - setup_timeout=2.0, reconnect_timeout=1.0, reconnect_failed=failed, ) + p.wait_ready(2.0) proxy.stop() with pytest.raises(psycopg3.OperationalError): @@ -620,8 +622,3 @@ def delay_connection(monkeypatch, sec): return rv monkeypatch.setattr(psycopg3.Connection, "connect", connect_delay) - - -def wait_pool_full(pool): - while len(pool._pool) < pool._nconns: - sleep(0.01)