pool.ConnectionPool(dsn, minconn=4, maxconn=2, num_workers=0)
-def test_pool(dsn):
- p = pool.ConnectionPool(dsn, minconn=2, timeout_sec=1.0)
+def test_its_really_a_pool(dsn):
+ p = pool.ConnectionPool(dsn, minconn=2)
with p.connection() as conn:
with conn.execute("select pg_backend_pid()") as cur:
(pid1,) = cur.fetchone()
assert conn.pgconn.backend_pid in (pid1, pid2)
+def test_connection_not_lost(dsn):
+ p = pool.ConnectionPool(dsn, minconn=1)
+ with pytest.raises(ZeroDivisionError):
+ with p.connection() as conn:
+ pid = conn.pgconn.backend_pid
+ 1 / 0
+
+ with p.connection() as conn2:
+ assert conn2.pgconn.backend_pid == pid
+
+
@pytest.mark.slow
def test_queue(dsn):
p = pool.ConnectionPool(dsn, minconn=2)