]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Add basic pool functionality test for queuing and timeout
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 13 Feb 2021 17:02:08 +0000 (18:02 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 12 Mar 2021 04:07:25 +0000 (05:07 +0100)
psycopg3/psycopg3/pool.py
tests/test_pool.py

index aced55ba76bc38cd2b67e1b70bdc48cc28a3e284..d3e081ba124ea34c1e3d2ac2cdf38f6e871f2c88 100644 (file)
@@ -37,7 +37,7 @@ class ConnectionPool:
         maxconn: Optional[int] = None,
         name: Optional[str] = None,
         timeout_sec: float = 30.0,
-        nworkers: int = 1,
+        num_workers: int = 1,
     ):
         if maxconn is None:
             maxconn = minconn
@@ -58,7 +58,7 @@ class ConnectionPool:
         self.minconn = minconn
         self.maxconn = maxconn
         self.timeout_sec = timeout_sec
-        self.nworkers = nworkers
+        self.num_workers = num_workers
 
         self._nconns = 0  # currently in the pool, out, being prepared
         self._pool: List[Connection] = []
@@ -67,7 +67,7 @@ class ConnectionPool:
 
         self._wqueue: "Queue[MaintenanceTask]" = Queue()
         self._workers: List[threading.Thread] = []
-        for i in range(nworkers):
+        for i in range(num_workers):
             t = threading.Thread(target=self.worker, args=(self._wqueue,))
             t.daemon = True
             t.start()
index b716597b166d5376b313555edd6b54efcdb4a0fb..febe018c5e589bcd05989df08b75b4847220a1cf 100644 (file)
@@ -1,6 +1,26 @@
+from time import time
+from threading import Thread
+
+import pytest
+
 from psycopg3 import pool
 
 
+def test_minconn_maxconn(dsn):
+    p = pool.ConnectionPool(dsn, num_workers=0)
+    assert p.minconn == p.maxconn == 4
+
+    p = pool.ConnectionPool(dsn, minconn=2, num_workers=0)
+    assert p.minconn == p.maxconn == 2
+
+    p = pool.ConnectionPool(dsn, minconn=2, maxconn=4, num_workers=0)
+    assert p.minconn == 2
+    assert p.maxconn == 4
+
+    with pytest.raises(ValueError):
+        pool.ConnectionPool(dsn, minconn=4, maxconn=2, num_workers=0)
+
+
 def test_pool(dsn):
     p = pool.ConnectionPool(dsn, minconn=2, timeout_sec=1.0)
     with p.connection() as conn:
@@ -13,3 +33,103 @@ def test_pool(dsn):
 
     with p.connection() as conn:
         assert conn.pgconn.backend_pid in (pid1, pid2)
+
+
+@pytest.mark.slow
+def test_queue(dsn):
+    p = pool.ConnectionPool(dsn, minconn=2)
+    results = []
+
+    def worker(n):
+        t0 = time()
+        with p.connection() as conn:
+            (pid,) = conn.execute(
+                "select pg_backend_pid() from pg_sleep(0.2)"
+            ).fetchone()
+        t1 = time()
+        results.append((n, t1 - t0, pid))
+
+    ts = []
+    for i in range(6):
+        t = Thread(target=worker, args=(i,))
+        t.start()
+        ts.append(t)
+
+    for t in ts:
+        t.join()
+
+    assert len([r for r in results if 0.2 < r[1] < 0.35]) == 2
+    assert len([r for r in results if 0.4 < r[1] < 0.55]) == 2
+    assert len([r for r in results if 0.5 < r[1] < 0.75]) == 2
+    assert len(set(r[2] for r in results)) == 2
+
+
+@pytest.mark.slow
+def test_queue_timeout(dsn):
+    p = pool.ConnectionPool(dsn, minconn=2, timeout_sec=0.1)
+    results = []
+    errors = []
+
+    def worker(n):
+        t0 = time()
+        try:
+            with p.connection() as conn:
+                (pid,) = conn.execute(
+                    "select pg_backend_pid() from pg_sleep(0.2)"
+                ).fetchone()
+        except pool.PoolTimeout as e:
+            t1 = time()
+            errors.append((n, t1 - t0, e))
+        else:
+            t1 = time()
+            results.append((n, t1 - t0, pid))
+
+    ts = []
+    for i in range(4):
+        t = Thread(target=worker, args=(i,))
+        t.start()
+        ts.append(t)
+
+    for t in ts:
+        t.join()
+
+    assert len(results) == 2
+    assert len(errors) == 2
+    for e in errors:
+        assert 0.1 < e[1] < 0.15
+
+
+@pytest.mark.slow
+def test_queue_timeout_override(dsn):
+    p = pool.ConnectionPool(dsn, minconn=2, timeout_sec=0.1)
+    results = []
+    errors = []
+
+    def worker(n):
+        t0 = time()
+        timeout = 0.25 if n == 3 else None
+        try:
+            with p.connection(timeout_sec=timeout) as conn:
+                (pid,) = conn.execute(
+                    "select pg_backend_pid() from pg_sleep(0.2)"
+                ).fetchone()
+        except pool.PoolTimeout as e:
+            t1 = time()
+            errors.append((n, t1 - t0, e))
+        else:
+            t1 = time()
+            results.append((n, t1 - t0, pid))
+
+    ts = []
+    for i in range(4):
+        t = Thread(target=worker, args=(i,))
+        t.start()
+        ts.append(t)
+
+    for t in ts:
+        t.join()
+
+    assert len(results) == 3
+    assert len(errors) == 1
+    for e in errors:
+        assert 0.1 < e[1] < 0.15